You are viewing a plain-text version of this content; the link to the canonical version ("here") was not preserved in this copy.
Posted to commits@hbase.apache.org by nk...@apache.org on 2014/06/14 08:46:26 UTC
git commit: HBASE-11347 For some errors, the client can retry infinitely
Repository: hbase
Updated Branches:
refs/heads/master 84ed7cf64 -> 3fa92647d
HBASE-11347 For some errors, the client can retry infinitely
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3fa92647
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3fa92647
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3fa92647
Branch: refs/heads/master
Commit: 3fa92647d24dc87cc958b33c5becd8dda16d1326
Parents: 84ed7cf
Author: Nicolas Liochon <li...@gmail.com>
Authored: Sat Jun 14 08:45:07 2014 +0200
Committer: Nicolas Liochon <li...@gmail.com>
Committed: Sat Jun 14 08:45:07 2014 +0200
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 12 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 138 +++++++++++++------
2 files changed, 98 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fa92647/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 433322f..fb3612f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -522,9 +522,6 @@ class AsyncProcess {
private final Object[] results;
private final long nonceGroup;
- @VisibleForTesting
- protected AtomicInteger hardRetryLimit = null; // used for tests to stop retries.
-
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
Batch.Callback<CResult> callback) {
@@ -558,7 +555,7 @@ class AsyncProcess {
final Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
- HRegionLocation loc = null;
+ HRegionLocation loc;
for (Action<Row> action : currentActions) {
try {
loc = findDestLocation(tableName, action.getAction());
@@ -661,10 +658,6 @@ class AsyncProcess {
canRetry = false;
}
- if (canRetry && hardRetryLimit != null) {
- canRetry = hardRetryLimit.decrementAndGet() >= 0;
- }
-
if (!canRetry) {
// Batch.Callback<Res> was not called on failure in 0.94. We keep this.
errors.add(throwable, row, server);
@@ -692,11 +685,12 @@ class AsyncProcess {
byte[] row = rsActions.actions.values().iterator().next().get(0).getAction().getRow();
hConnection.updateCachedLocations(tableName, row, null, server);
errorsByServer.reportServerError(server);
+ boolean canRetry = errorsByServer.canRetryMore(numAttempt);
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
for (Action<Row> action : e.getValue()) {
- if (manageError(action.getOriginalIndex(), action.getAction(), true, t, server)) {
+ if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) {
toReplay.add(action);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3fa92647/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index c31e451..edffd18 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@@ -45,6 +46,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -77,10 +79,32 @@ public class TestAsyncProcess {
private static final String success = "success";
private static Exception failure = new Exception("failure");
+ private static int NB_RETRIES = 3;
+
+ @BeforeClass
+ public static void beforeClass(){
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
+ }
+
+ static class CountingThreadFactory implements ThreadFactory {
+ final AtomicInteger nbThreads;
+ ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
+ @Override
+ public Thread newThread(Runnable r) {
+ nbThreads.incrementAndGet();
+ return realFactory.newThread(r);
+ }
+
+ CountingThreadFactory(AtomicInteger nbThreads){
+ this.nbThreads = nbThreads;
+ }
+ }
+
static class MyAsyncProcess extends AsyncProcess {
final AtomicInteger nbMultiResponse = new AtomicInteger();
final AtomicInteger nbActions = new AtomicInteger();
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
+ public AtomicInteger callsCt = new AtomicInteger();
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
@@ -89,36 +113,11 @@ public class TestAsyncProcess {
// Test HTable has tableName of null, so pass DUMMY_TABLE
AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
- r.hardRetryLimit = new AtomicInteger(1);
allReqs.add(r);
+ callsCt.incrementAndGet();
return r;
}
- @SuppressWarnings("unchecked")
- public long getRetriesRequested() {
- long result = 0;
- for (AsyncRequestFuture ars : allReqs) {
- if (ars instanceof AsyncProcess.AsyncRequestFutureImpl) {
- result += (1 - ((AsyncRequestFutureImpl<?>)ars).hardRetryLimit.get());
- }
- }
- return result;
- }
-
- static class CountingThreadFactory implements ThreadFactory {
- final AtomicInteger nbThreads;
- ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
- @Override
- public Thread newThread(Runnable r) {
- nbThreads.incrementAndGet();
- return realFactory.newThread(r);
- }
-
- CountingThreadFactory(AtomicInteger nbThreads){
- this.nbThreads = nbThreads;
- }
- }
-
public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
this(hc, conf, new AtomicInteger());
}
@@ -136,6 +135,17 @@ public class TestAsyncProcess {
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
}
+ public MyAsyncProcess(
+ ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
+ super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
+ public void execute(Runnable command) {
+ throw new RejectedExecutionException("test under failure");
+ }
+ },
+ new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
+ }
+
@Override
public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
boolean atLeastOne, Callback<Res> callback, boolean needResults)
@@ -146,6 +156,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+ callsCt.incrementAndGet();
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions);
return new RpcRetryingCaller<MultiResponse>(100, 10) {
@@ -166,6 +177,33 @@ public class TestAsyncProcess {
}
}
+ static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
+
+ public CallerWithFailure() {
+ super(100, 100);
+ }
+
+ @Override
+ public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
+ throws IOException, RuntimeException {
+ throw new IOException("test");
+ }
+ }
+
+ static class AsyncProcessWithFailure extends MyAsyncProcess {
+
+ public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) {
+ super(hc, conf, true);
+ serverTrackerTimeout = 1;
+ }
+
+ @Override
+ protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+ callsCt.incrementAndGet();
+ return new CallerWithFailure();
+ }
+ }
+
static MultiResponse createMultiResponse(
final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
final MultiResponse mr = new MultiResponse();
@@ -188,15 +226,7 @@ public class TestAsyncProcess {
*/
static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
final AtomicInteger nbThreads = new AtomicInteger(0);
- final static Configuration c = new Configuration();
-
- static {
- c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
- }
- protected MyConnectionImpl() {
- super(c);
- }
protected MyConnectionImpl(Configuration conf) {
super(conf);
@@ -217,7 +247,7 @@ public class TestAsyncProcess {
final boolean usedRegions[];
protected MyConnectionImpl2(List<HRegionLocation> hrl) {
- super(c);
+ super(conf);
this.hrl = hrl;
this.usedRegions = new boolean[hrl.size()];
}
@@ -320,7 +350,7 @@ public class TestAsyncProcess {
Assert.assertEquals(0, puts.size());
ars.waitUntilDone();
verifyResult(ars, false);
- Assert.assertEquals(2L, ap.getRetriesRequested());
+ Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
Assert.assertEquals(1, ars.getErrors().exceptions.size());
Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
@@ -386,7 +416,8 @@ public class TestAsyncProcess {
Assert.assertTrue(puts.isEmpty());
ars.waitUntilDone();
verifyResult(ars, false, true, true);
- Assert.assertEquals(2, ap.getRetriesRequested());
+ Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
+ ap.callsCt.set(0);
Assert.assertEquals(1, ars.getErrors().actions.size());
puts.add(createPut(1, true));
@@ -395,7 +426,7 @@ public class TestAsyncProcess {
ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
Assert.assertEquals(0, puts.size());
ars.waitUntilDone();
- Assert.assertEquals(2, ap.getRetriesRequested());
+ Assert.assertEquals(2, ap.callsCt.get());
verifyResult(ars, true);
}
@@ -411,7 +442,7 @@ public class TestAsyncProcess {
AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
ars.waitUntilDone();
verifyResult(ars, false, true, true);
- Assert.assertEquals(2, ap.getRetriesRequested());
+ Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
Assert.assertEquals(1, ars.getFailedOperations().size());
}
@@ -608,7 +639,7 @@ public class TestAsyncProcess {
@Test
public void testBatch() throws IOException, InterruptedException {
HTable ht = new HTable();
- ht.connection = new MyConnectionImpl();
+ ht.connection = new MyConnectionImpl(conf);
ht.multiAp = new MyAsyncProcess(ht.connection, conf, false);
List<Put> puts = new ArrayList<Put>();
@@ -641,12 +672,11 @@ public class TestAsyncProcess {
HTable ht = new HTable();
Configuration configuration = new Configuration(conf);
configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
- configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
// set default writeBufferSize
ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
ht.connection = new MyConnectionImpl(configuration);
- MyAsyncProcess ap = new MyAsyncProcess(ht.connection, conf, true);
+ MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true);
ht.ap = ap;
Assert.assertNotNull(ht.ap.createServerErrorTracker());
@@ -663,7 +693,29 @@ public class TestAsyncProcess {
} catch (RetriesExhaustedWithDetailsException expected) {
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
- Assert.assertEquals(2, ap.getRetriesRequested());
+ Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
+ }
+
+ @Test
+ public void testGlobalErrors() throws IOException {
+ HTable ht = new HTable();
+ ht.connection = new MyConnectionImpl(conf);
+ AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf);
+ ht.ap = ap;
+
+ Assert.assertNotNull(ht.ap.createServerErrorTracker());
+
+ Put p = createPut(1, true);
+ ht.setAutoFlush(false, false);
+ ht.put(p);
+
+ try {
+ ht.flushCommits();
+ Assert.fail();
+ } catch (RetriesExhaustedWithDetailsException expected) {
+ }
+ // Checking that the ErrorsServers came into play and didn't make us stop immediately
+ Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
}
/**