You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/07/25 20:32:55 UTC
svn commit: r1365690 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hb...
Author: mbautin
Date: Wed Jul 25 18:32:54 2012
New Revision: 1365690
URL: http://svn.apache.org/viewvc?rev=1365690&view=rev
Log:
[HBASE-6451] Try to make the put path handle errors similar to getRegionServerWithRetries
Author: aaiyer
Summary:
Prakash has changed getRegionServerWithRetries to handle retries and
failures better. This change tries to bring similar improvements to the put
path.
Test Plan: Run on MR. Test out on dev cluster.
Reviewers: pkhemani, liyintang
Reviewed By: pkhemani
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D508747
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Jul 25 18:32:54 2012
@@ -35,7 +35,6 @@ public final class HConstants {
public enum OperationStatusCode {
NOT_RUN,
SUCCESS,
- SANITY_CHECK_FAILURE,
FAILURE;
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed Jul 25 18:32:54 2012
@@ -322,8 +322,10 @@ public interface HConnection extends Clo
* Delete the cached location
* @param tableName
* @param row
+ * @param oldLoc
*/
- public void deleteCachedLocation(final byte [] tableName, final byte [] row);
+ public void deleteCachedLocation(final byte [] tableName, final byte [] row,
+ HServerAddress oldLoc);
/**
* Enable or disable region cache prefetch for the table. It will be
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Jul 25 18:32:54 2012
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.HServerAd
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
@@ -882,7 +883,7 @@ public class HConnectionManager {
return location;
}
} else {
- deleteCachedLocation(tableName, row);
+ deleteCachedLocation(tableName, row, null);
}
// Query the root or meta region for the location of the meta region
@@ -1033,10 +1034,11 @@ public class HConnectionManager {
}
/*
- * Delete a cached location, if it satisfies the table name and row
- * requirements.
+ * Delete a cached location for the specified table name and row
+ * if it is located on the (optionally) specified old location.
*/
- public void deleteCachedLocation(final byte [] tableName, final byte [] row) {
+ public void deleteCachedLocation(final byte [] tableName,
+ final byte [] row, HServerAddress oldServer) {
synchronized (this.cachedRegionLocations) {
Map<byte [], HRegionLocation> tableLocations =
getTableLocations(tableName);
@@ -1046,6 +1048,11 @@ public class HConnectionManager {
if (!tableLocations.isEmpty()) {
HRegionLocation rl = getCachedLocation(tableName, row);
if (rl != null) {
+ // If oldServer is specified, delete the cached location only if it is still on that server.
+ if (oldServer != null
+ && !oldServer.equals(rl.getServerAddress()))
+ return; // perhaps somebody else already cleared and repopulated the entry.
+
tableLocations.remove(rl.getRegionInfo().getStartKey());
if (LOG.isDebugEnabled()) {
LOG.debug("Removed " +
@@ -1289,29 +1296,21 @@ public class HConnectionManager {
callable.instantiateRegionLocation(false /* reload cache? */);
for(int tries = 0; ; tries++) {
try {
- callable.instantiateServer();
- return callable.call();
- } catch (Throwable t) {
- boolean isLocalException = !(t instanceof RemoteException);
- // translateException throws DoNotRetryException or any
- // non-IOException.
- t = translateException(t);
- if (isLocalException && (t instanceof SocketTimeoutException ||
- t instanceof ConnectException ||
- t instanceof ClosedChannelException ||
- t instanceof SyncFailedException ||
- t instanceof EOFException)) {
- // XXX this list covers most connectivity exceptions but not all.
- // For example, in SocketOutputStream a plain IOException is thrown
- // at times when the channel is closed.
-
- // if thrown these exceptions, we clear all the cache entries that
- // map to that slow/dead server; otherwise, let cache miss and ask
- // .META. again to find the new location
- HRegionLocation hrl = callable.location;
- clearCachedLocationForServer(hrl.getServerAddress().toString());
+ return getRegionServerWithoutRetries(callable, false);
+ } catch (DoNotRetryIOException ioe) {
+ // clear cache if needed
+ if (ioe.getCause() instanceof NotServingRegionException) {
+ HRegionLocation prevLoc = callable.location;
+ if (prevLoc.getRegionInfo() != null) {
+ deleteCachedLocation(callable.tableName,
+ prevLoc.getRegionInfo().getStartKey(),
+ prevLoc.getServerAddress());
+ }
}
+ // We are not supposed to retry this exception; let it pass through.
+ throw ioe;
+ } catch (Throwable t) {
exceptions.add(t);
if (tries == numRetries - 1) {
throw new RetriesExhaustedException(callable.getServerName(),
@@ -1319,8 +1318,14 @@ public class HConnectionManager {
}
HRegionLocation prevLoc = callable.location;
+ if (prevLoc.getRegionInfo() != null) {
+ deleteCachedLocation(callable.tableName,
+ prevLoc.getRegionInfo().getStartKey(),
+ prevLoc.getServerAddress());
+ }
// do not retry if getting the location throws exception
- callable.instantiateRegionLocation(true /* reload cache ? */);
+ callable.instantiateRegionLocation(false /* reload cache ? */);
+
if (prevLoc.getServerAddress().
equals(callable.location.getServerAddress())) {
long pauseTime = getPauseTime(tries);
@@ -1339,11 +1344,11 @@ public class HConnectionManager {
throw new InterruptedIOException();
}
// do not reload cache. While we were sleeping hopefully the cache
- // has been re-populated. We had anyway invalidated it earlier
- // before going to sleep.
+ // has been re-populated.
callable.instantiateRegionLocation(false);
} else {
LOG.debug("getRegionServerWithRetries failed, " +
+ "region moved from " + prevLoc + " to " + callable.location +
"retrying immediately tries=" + tries, t);
}
}
@@ -1353,12 +1358,45 @@ public class HConnectionManager {
@Override
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
+ return getRegionServerWithoutRetries(callable, true);
+ }
+
+ /**
+ * Pass in a ServerCallable with your particular bit of logic defined and
+ * this method will pass it to the defined region server.
+ * @param <T> the type of the return value
+ * @param callable callable to run
+ * @return an object of type T
+ * @throws IOException if a remote or network exception occurs
+ * @throws RuntimeException other unspecified error
+ */
+ private <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
+ boolean instantiateRegionLocation)
+ throws IOException, RuntimeException {
try {
- callable.instantiateRegionLocation(false);
+ if (instantiateRegionLocation) callable.instantiateRegionLocation(false);
callable.instantiateServer();
return callable.call();
- } catch (Throwable t) {
- Throwable t2 = translateException(t);
+ } catch (Throwable t1) {
+ Throwable t2 = translateException(t1);
+ boolean isLocalException = !(t2 instanceof RemoteException);
+ // translateException throws DoNotRetryException or any
+ // non-IOException.
+ if (isLocalException && (t2 instanceof SocketTimeoutException ||
+ t2 instanceof ConnectException ||
+ t2 instanceof ClosedChannelException ||
+ t2 instanceof SyncFailedException ||
+ t2 instanceof EOFException)) {
+ // XXX this list covers most connectivity exceptions but not all.
+ // For example, in SocketOutputStream a plain IOException is thrown
+ // at times when the channel is closed.
+
+ // if thrown these exceptions, we clear all the cache entries that
+ // map to that slow/dead server; otherwise, let cache miss and ask
+ // .META. again to find the new location
+ clearCachedLocationForServer(callable.location.getServerAddress().toString());
+ }
+
if (t2 instanceof IOException) {
throw (IOException)t2;
} else {
@@ -1381,7 +1419,10 @@ public class HConnectionManager {
@Override
public void instantiateRegionLocation(boolean reload) throws IOException {
- // do nothing.
+ // we don't need to locate the region, since we have been given the address of the
+ // server. But, let us store the information in this.location, so that
+ // we can handle failures (i.e. clear cache) if we fail to connect to the RS.
+ this.location = new HRegionLocation(null, address);
}
@Override
@@ -1503,19 +1544,20 @@ public class HConnectionManager {
} catch (InterruptedException ie) {
throw ie;
} catch (ExecutionException e) {
- translateException(e.getCause());
+ if(e.getCause() instanceof DoNotRetryIOException)
+ throw (DoNotRetryIOException)e.getCause();
}
// If we got a response. Let us go through the responses from each region and
// process the Puts and Deletes.
// If the response is null, we will add it to newWorkingList here.
if (request.deletes != null) {
- newWorkingList = processMutationResponseFromOneRegionServer(tableName, resp,
- request.deletes, newWorkingList, true);
+ newWorkingList = processMutationResponseFromOneRegionServer(tableName, address,
+ resp, request.deletes, newWorkingList, true);
}
if (request.puts != null) {
- newWorkingList = processMutationResponseFromOneRegionServer(tableName, resp,
- request.puts, newWorkingList, false);
+ newWorkingList = processMutationResponseFromOneRegionServer(tableName, address,
+ resp, request.puts, newWorkingList, false);
}
}
return newWorkingList;
@@ -1546,7 +1588,8 @@ public class HConnectionManager {
} catch (InterruptedException ie) {
throw ie;
} catch (ExecutionException e) {
- translateException(e.getCause());
+ if(e.getCause() instanceof DoNotRetryIOException)
+ throw (DoNotRetryIOException)e.getCause();
}
if (resp == null) {
@@ -1554,13 +1597,15 @@ public class HConnectionManager {
LOG.debug("Failed all for server: " + address + ", removing from cache");
}
- newWorkingList = processGetResponseFromOneRegionServer(tableName, resp, request, orig_list,
- newWorkingList, results);
+ newWorkingList = processGetResponseFromOneRegionServer(tableName, address,
+ request, resp, orig_list, newWorkingList, results);
}
return newWorkingList;
}
- private <R extends Mutation> List<Mutation> processMutationResponseFromOneRegionServer(byte[] tableName,
+ private <R extends Mutation> List<Mutation> processMutationResponseFromOneRegionServer(
+ byte[] tableName,
+ HServerAddress address,
MultiResponse resp,
Map<byte[], List<R>> map,
List<Mutation> newWorkingList,
@@ -1593,7 +1638,7 @@ public class HConnectionManager {
newWorkingList.addAll(regionOps);
// enough to remove from cache one of the rows from the region
- deleteCachedLocation(tableName, regionOps.get(0).getRow());
+ deleteCachedLocation(tableName, regionOps.get(0).getRow(), address);
}
}
@@ -1601,7 +1646,9 @@ public class HConnectionManager {
}
private List<Get> processGetResponseFromOneRegionServer(byte[] tableName,
- MultiResponse resp, MultiAction request,
+ HServerAddress address,
+ MultiAction request,
+ MultiResponse resp,
List<Get> orig_list, List<Get> newWorkingList,
Result[] results) throws IOException {
@@ -1631,7 +1678,7 @@ public class HConnectionManager {
}
// enough to clear this once for a region
- deleteCachedLocation(tableName, regionGets.get(0).getRow());
+ deleteCachedLocation(tableName, regionGets.get(0).getRow(), address);
}
}
@@ -1963,57 +2010,7 @@ public class HConnectionManager {
}
/**
- * Process a single put request
- * @param put The put request
- * @param failed The failed request list
- * @param tableName The table name for the put request
- * @return The cause of exceptions when processing the put request.
- * @throws IOException
- */
- private Throwable processSinglePut(Put put,
- List<Put> failed, final byte[] tableName, HBaseRPCOptions options) throws IOException {
- // XXX error handling should mirror getRegionServerWithRetries()
- // Get server address
- byte [] row = put.getRow();
- HRegionLocation loc = locateRegion(tableName, row, true);
- HServerAddress address = loc.getServerAddress();
- byte [] regionName = loc.getRegionInfo().getRegionName();
-
- // Convert the single put to the multiput
- MultiPut multiPut = new MultiPut(address);
- multiPut.add(regionName, put);
-
- // Create the multiPutCallable
- Callable<MultiPutResponse> multiPutCallable =
- createPutCallable(multiPut.address, multiPut, tableName, options);
-
- try {
- // Get the MultiPutResponse
- MultiPutResponse response = multiPutCallable.call();
-
- // verify the result
- Integer result = response.getAnswer(regionName);
- if (result == null || result >= 0) {
- LOG.debug("Failed all for region: " +
- Bytes.toStringBinary(regionName));
- failed.add(put);
- }
- } catch (Exception e) {
- // process exceptions
- LOG.debug("Failed all from " + address, e);
- failed.add(put);
-
- if (e.getCause() instanceof DoNotRetryIOException) {
- // Just give up, leaving the batch put list in an
- // untouched/semi-committed state
- throw (DoNotRetryIOException) e.getCause();
- }
- return e.getCause();
- }
- return null;
- }
-
- /**
+ * Try to make the put path handle errors similar to getRegionServerWithRetries
* Process a list of puts from a shared HTable thread pool.
* @param list The input put request list
* @param failed The failed put request list
@@ -2050,9 +2047,19 @@ public class HConnectionManager {
List<Future<MultiPutResponse>> futures =
new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
+ boolean singleServer = (multiPuts.size() == 1);
for ( MultiPut put : multiPuts ) {
- futures.add(HTable.multiActionThreadPool.submit(
- createPutCallable(put.address, put, tableName, options)));
+ Callable<MultiPutResponse> callable = createPutCallable(put.address,
+ put, tableName, options);
+ Future<MultiPutResponse> task;
+ if (singleServer) {
+ task = new FutureTask<MultiPutResponse>(callable);
+ ((FutureTask<MultiPutResponse>)task).run();
+ }
+ else {
+ task = HTable.multiActionThreadPool.submit(callable);
+ }
+ futures.add(task);
}
// step 3:
@@ -2060,37 +2067,38 @@ public class HConnectionManager {
for (int i = 0; i < futures.size(); i++ ) {
Future<MultiPutResponse> future = futures.get(i);
MultiPut request = multiPuts.get(i);
+ MultiPutResponse resp = null;
try {
- MultiPutResponse resp = future.get();
-
- // For each region
- for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
- Integer result = resp.getAnswer(e.getKey());
- if (result == null) {
- // failed
- LOG.debug("Failed all for region: " +
- Bytes.toStringBinary(e.getKey()) + ", removing from cache");
- failed.addAll(e.getValue());
- } else if (result >= 0) {
- // some failures
- List<Put> lst = e.getValue();
- failed.addAll(lst.subList(result, lst.size()));
- LOG.debug("Failed past " + result + " for region: " +
- Bytes.toStringBinary(e.getKey()) + ", removing from cache");
- }
- }
+ resp = future.get();
} catch (InterruptedException e) {
- // go into the failed list.
- LOG.debug("Failed all from " + request.address, e);
- failed.addAll(request.allPuts());
- } catch (ExecutionException e) {
- // all go into the failed list.
- LOG.debug("Failed all from " + request.address, e);
- failed.addAll(request.allPuts());
-
- // Just give up, leaving the batch put list in an untouched/semi-committed state
- if (e.getCause() instanceof DoNotRetryIOException) {
- throw (DoNotRetryIOException) e.getCause();
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException(e.getMessage());
+ } catch (ExecutionException ex) {
+ // retry, unless it is not to be retried.
+ if (ex.getCause() instanceof DoNotRetryIOException)
+ throw (DoNotRetryIOException)ex.getCause();
+ }
+
+ // For each region
+ for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
+ byte[] region = e.getKey();
+ List<Put> lst = e.getValue();
+ Integer result = null;
+ if (resp != null)
+ result = resp.getAnswer(region);
+
+ if (result == null) {
+ // failed
+ LOG.debug("Failed all for region: " +
+ Bytes.toStringBinary(region) + ", removing from cache");
+ deleteCachedLocation(tableName, lst.get(0).getRow(), request.address);
+
+ failed.addAll(e.getValue());
+ } else if (result != HConstants.MULTIPUT_SUCCESS) {
+ // some failures
+ failed.addAll(lst.subList(result, lst.size()));
+ LOG.debug("Failed past " + result + " for region: " +
+ Bytes.toStringBinary(region) + ", removing from cache");
}
}
}
@@ -2117,12 +2125,14 @@ public class HConnectionManager {
}
failedPuts.addAll(e.getValue());
- } else if (result >= 0) {
+ } else if (result != HConstants.MULTIPUT_SUCCESS) {
// Prepared the failed puts for retry
if (failedPuts == null) {
failedPuts = new ArrayList<Put>();
}
List<Put> lst = e.getValue();
+ // In case of a failure the returned result is the index (non-inclusive) up
+ // to which the operations succeeded.
failedPuts.addAll(lst.subList(result, lst.size()));
}
}
@@ -2154,47 +2164,44 @@ public class HConnectionManager {
*/
public void processBatchOfPuts(List<Put> list, final byte[] tableName, HBaseRPCOptions options)
throws IOException {
- boolean singletonList = list.size() == 1;
- Throwable singleRowCause = null;
- for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
+ long callStartTime;
+ callStartTime = System.currentTimeMillis();
+
+ int tries;
+ for ( tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
List<Put> failed = new ArrayList<Put>();
- if (singletonList) {
- singleRowCause = this.processSinglePut(list.get(0), failed, tableName, options);
- } else {
- this.processBatchOfMultiPut(list, failed, tableName, options);
- }
+ this.processBatchOfMultiPut(list, failed, tableName, options);
list.clear();
if (!failed.isEmpty()) {
- // clear the cache for failed puts
- for (Put failedPut: failed) {
- deleteCachedLocation(tableName, failedPut.getRow());
- }
-
// retry the failed ones after sleep
list.addAll(failed);
- long sleepTime = getPauseTime(tries);
- LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
+
+ // Do not sleep the first time. The region might have moved.
+ if (tries <= 1) continue;
+
+ long pauseTime = getPauseTime(tries);
+ if ((System.currentTimeMillis() - callStartTime + pauseTime) >
+ rpcRetryTimeout) {
+ break; // we will throw a RetriesExhaustedException
+ }
+ LOG.debug("processBatchOfPuts had some failures, sleeping for " + pauseTime +
" ms!");
try {
- Thread.sleep(sleepTime);
+ Thread.sleep(pauseTime);
} catch (InterruptedException ignored) {
- // ignore here
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
}
}
}
// Get exhausted after the retries
if (!list.isEmpty()) {
- if (singletonList && singleRowCause != null) {
- throw new IOException("Exhaused retrying " + numRetries +
- " and the last exception caught is " + singleRowCause);
- }
-
// ran out of retries and didnt succeed everything!
throw new RetriesExhaustedException("Still had " + list.size() +
" puts left after retrying " +
- numRetries + " times.");
+ tries + " times, in " + (System.currentTimeMillis() - callStartTime) + " ms.");
}
}
@@ -2214,7 +2221,12 @@ public class HConnectionManager {
@Override
public void instantiateRegionLocation(boolean reload)
throws IOException {
- // location is already cached
+ // we don't need to locate the region, since we have been given the address of the
+ // server. But, let us store the information in this.location, so that
+ // we can handle failures (i.e. clear cache) if we fail to connect to the RS.
+ // (see HConnectionManager.getRegionServerWithRetries for how it is used to
+ // call deleteCachedLocation.)
+ this.location = new HRegionLocation(null, address);
}
@Override
public void instantiateServer() throws IOException {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Wed Jul 25 18:32:54 2012
@@ -341,12 +341,12 @@ public class HTableMultiplexer {
return queue.size() + currentProcessingPutCount.get();
}
- private boolean resubmitFailedPut(PutStatus failedPutStatus) throws IOException{
+ private boolean resubmitFailedPut(PutStatus failedPutStatus, HServerAddress oldLoc) throws IOException{
Put failedPut = failedPutStatus.getPut();
// The currentPut is failed. So get the table name for the currentPut.
byte[] tableName = failedPutStatus.getRegionInfo().getTableDesc().getName();
// Clear the cached location for the failed puts
- this.connection.deleteCachedLocation(tableName, failedPut.getRow());
+ this.connection.deleteCachedLocation(tableName, failedPut.getRow(), oldLoc);
// Decrease the retry count
int retryCount = failedPutStatus.getRetryCount() - 1;
@@ -409,14 +409,15 @@ public class HTableMultiplexer {
if (failed.size() == processingList.size()) {
// All the puts for this region server are failed. Going to retry it later
for (PutStatus putStatus: processingList) {
- if (!resubmitFailedPut(putStatus)) {
+ if (!resubmitFailedPut(putStatus, this.addr)) {
failedCount++;
}
}
} else {
Set<Put> failedPutSet = new HashSet<Put>(failed);
for (PutStatus putStatus: processingList) {
- if (failedPutSet.contains(putStatus.getPut()) && !resubmitFailedPut(putStatus)) {
+ if (failedPutSet.contains(putStatus.getPut())
+ && !resubmitFailedPut(putStatus, this.addr)) {
failedCount++;
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Wed Jul 25 18:32:54 2012
@@ -37,7 +37,6 @@ import java.io.IOException;
*/
public class ScannerCallable extends ServerCallable<Result[]> {
private long scannerId = -1L;
- private boolean instantiated = false;
private boolean closed = false;
private Scan scan;
private int caching = 1;
@@ -53,25 +52,6 @@ public class ScannerCallable extends Ser
this.scan = scan;
}
- @Override
- public void instantiateRegionLocation(boolean reload) throws IOException {
- if (!instantiated || reload) {
- super.instantiateRegionLocation(reload);
- instantiated = false;
- }
- }
- /**
- * @param reload force reload of server location
- * @throws IOException
- */
- @Override
- public void instantiateServer() throws IOException {
- if (!instantiated) {
- super.instantiateServer();
- instantiated = true;
- }
- }
-
/**
* @see java.util.concurrent.Callable#call()
*/
@@ -137,7 +117,7 @@ public class ScannerCallable extends Ser
* @return the HRegionInfo for the current region
*/
public HRegionInfo getHRegionInfo() {
- if (!instantiated) {
+ if (location == null) {
return null;
}
return location.getRegionInfo();
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jul 25 18:32:54 2012
@@ -2016,15 +2016,8 @@ public class HRegion implements HeapSize
// Check the families in the put. If bad, skip this one.
if (op instanceof Put) {
- try {
- checkFamilies(op.getFamilyMap().keySet());
- checkTimestamps(op, now);
- } catch (DoNotRetryIOException dnrioe) {
- LOG.warn("Sanity check error in batch processing", dnrioe);
- batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.SANITY_CHECK_FAILURE;
- lastIndexExclusive++;
- continue;
- }
+ checkFamilies(op.getFamilyMap().keySet());
+ checkTimestamps(op, now);
}
// If we haven't got any rows in our batch, we should block to
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Jul 25 18:32:54 2012
@@ -58,7 +58,7 @@ public class TestHCM {
HConnectionManager.TableServers conn =
(HConnectionManager.TableServers) table.getConnection();
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
- conn.deleteCachedLocation(TABLE_NAME, ROW);
+ conn.deleteCachedLocation(TABLE_NAME, ROW, null);
HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);
assertNull("What is this location?? " + rl, rl);
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Jul 25 18:32:54 2012
@@ -376,14 +376,18 @@ public class TestHRegion extends HBaseTe
LOG.info("Next a batch put with one invalid family");
puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
- codes = this.region.put(puts);
- assertEquals(10, codes.length);
- for (int i = 0; i < 10; i++) {
- assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
- OperationStatusCode.SUCCESS, codes[i]);
+ try {
+ codes = this.region.put(puts);
+
+ // put should throw an exception
+ assertTrue(false);
+ } catch(IOException e) {
+ // continue
}
- assertEquals(1, HLog.getSyncTime().count);
+ // let's make the puts good again.
+ puts[5] = new Put(Bytes.toBytes("row_" + 5));
+ puts[5].add(cf, qual, val);
LOG.info("Next a batch put that has to break into two batches to avoid a lock");
Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
@@ -417,8 +421,7 @@ public class TestHRegion extends HBaseTe
assertEquals(1, HLog.getSyncTime().count);
codes = retFromThread.get();
for (int i = 0; i < 10; i++) {
- assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
- OperationStatusCode.SUCCESS, codes[i]);
+ assertEquals(OperationStatusCode.SUCCESS, codes[i]);
}
LOG.info("Nexta, a batch put which uses an already-held lock");
@@ -434,8 +437,7 @@ public class TestHRegion extends HBaseTe
codes = region.batchMutateWithLocks(putsAndLocks.toArray(new Pair[0]), "multiput_");
LOG.info("...performed put");
for (int i = 0; i < 10; i++) {
- assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
- OperationStatusCode.SUCCESS, codes[i]);
+ assertEquals(OperationStatusCode.SUCCESS, codes[i]);
}
// Make sure we didn't do an extra batch
assertEquals(1, HLog.getSyncTime().count);
@@ -2775,24 +2777,24 @@ public class TestHRegion extends HBaseTe
int version = 0;
for (int f =0 ; f < num_storefiles; f++) {
- for (int i = 0; i < duplicate_multiplier; i ++) {
- for (int j = 0; j < num_unique_rows; j++) {
- Put put = new Put(Bytes.toBytes("row" + j));
- put.add(fam1, qf1, version++, val1);
+ for (int i = 0; i < duplicate_multiplier; i ++) {
+ for (int j = 0; j < num_unique_rows; j++) {
+ Put put = new Put(Bytes.toBytes("row" + j));
+ put.add(fam1, qf1, version++, val1);
region.put(put);
- }
- }
- region.flushcache();
+ }
+ }
+ region.flushcache();
}
//before compaction
Store store = region.getStore(fam1);
List<StoreFile> storeFiles = store.getStorefiles();
for (StoreFile storefile : storeFiles) {
- StoreFile.Reader reader = storefile.getReader();
- reader.loadFileInfo();
+ StoreFile.Reader reader = storefile.getReader();
+ reader.loadFileInfo();
reader.loadBloomfilter();
assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
- assertEquals(num_unique_rows, reader.getFilterEntries());
+ assertEquals(num_unique_rows, reader.getFilterEntries());
}
region.compactStores(true);
@@ -2800,10 +2802,10 @@ public class TestHRegion extends HBaseTe
//after compaction
storeFiles = store.getStorefiles();
for (StoreFile storefile : storeFiles) {
- StoreFile.Reader reader = storefile.getReader();
- reader.loadFileInfo();
+ StoreFile.Reader reader = storefile.getReader();
+ reader.loadFileInfo();
reader.loadBloomfilter();
- assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles,
+ assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles,
reader.getEntries());
assertEquals(num_unique_rows, reader.getFilterEntries());
}