You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@hbase.apache.org by nk...@apache.org on 2014/06/14 08:46:26 UTC

git commit: HBASE-11347 For some errors, the client can retry infinitely

Repository: hbase
Updated Branches:
  refs/heads/master 84ed7cf64 -> 3fa92647d


HBASE-11347 For some errors, the client can retry infinitely


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3fa92647
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3fa92647
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3fa92647

Branch: refs/heads/master
Commit: 3fa92647d24dc87cc958b33c5becd8dda16d1326
Parents: 84ed7cf
Author: Nicolas Liochon <li...@gmail.com>
Authored: Sat Jun 14 08:45:07 2014 +0200
Committer: Nicolas Liochon <li...@gmail.com>
Committed: Sat Jun 14 08:45:07 2014 +0200

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  12 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   | 138 +++++++++++++------
 2 files changed, 98 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3fa92647/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 433322f..fb3612f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -522,9 +522,6 @@ class AsyncProcess {
     private final Object[] results;
     private final long nonceGroup;
 
-    @VisibleForTesting
-    protected AtomicInteger hardRetryLimit = null; // used for tests to stop retries.
-
     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
         ExecutorService pool, boolean needResults, Object[] results,
         Batch.Callback<CResult> callback) {
@@ -558,7 +555,7 @@ class AsyncProcess {
       final Map<ServerName, MultiAction<Row>> actionsByServer =
           new HashMap<ServerName, MultiAction<Row>>();
 
-      HRegionLocation loc = null;
+      HRegionLocation loc;
       for (Action<Row> action : currentActions) {
         try {
           loc = findDestLocation(tableName, action.getAction());
@@ -661,10 +658,6 @@ class AsyncProcess {
         canRetry = false;
       }
 
-      if (canRetry && hardRetryLimit != null) {
-        canRetry = hardRetryLimit.decrementAndGet() >= 0;
-      }
-
       if (!canRetry) {
         // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
         errors.add(throwable, row, server);
@@ -692,11 +685,12 @@ class AsyncProcess {
       byte[] row = rsActions.actions.values().iterator().next().get(0).getAction().getRow();
       hConnection.updateCachedLocations(tableName, row, null, server);
       errorsByServer.reportServerError(server);
+      boolean canRetry = errorsByServer.canRetryMore(numAttempt);
 
       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
         for (Action<Row> action : e.getValue()) {
-          if (manageError(action.getOriginalIndex(), action.getAction(), true, t, server)) {
+          if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) {
             toReplay.add(action);
           }
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3fa92647/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index c31e451..edffd18 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
@@ -45,6 +46,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -77,10 +79,32 @@ public class TestAsyncProcess {
   private static final String success = "success";
   private static Exception failure = new Exception("failure");
 
+  private static int NB_RETRIES = 3;
+
+  @BeforeClass
+  public static void beforeClass(){
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
+  }
+
+  static class CountingThreadFactory implements ThreadFactory {
+    final AtomicInteger nbThreads;
+    ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
+    @Override
+    public Thread newThread(Runnable r) {
+      nbThreads.incrementAndGet();
+      return realFactory.newThread(r);
+    }
+
+    CountingThreadFactory(AtomicInteger nbThreads){
+      this.nbThreads = nbThreads;
+    }
+  }
+
   static class MyAsyncProcess extends AsyncProcess {
     final AtomicInteger nbMultiResponse = new AtomicInteger();
     final AtomicInteger nbActions = new AtomicInteger();
     public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
+    public AtomicInteger callsCt = new AtomicInteger();
 
     @Override
     protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
@@ -89,36 +113,11 @@ public class TestAsyncProcess {
       // Test HTable has tableName of null, so pass DUMMY_TABLE
       AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
           DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
-      r.hardRetryLimit = new AtomicInteger(1);
       allReqs.add(r);
+      callsCt.incrementAndGet();
       return r;
     }
 
-    @SuppressWarnings("unchecked")
-    public long getRetriesRequested() {
-      long result = 0;
-      for (AsyncRequestFuture ars : allReqs) {
-        if (ars instanceof AsyncProcess.AsyncRequestFutureImpl) {
-          result += (1 - ((AsyncRequestFutureImpl<?>)ars).hardRetryLimit.get());
-        }
-      }
-      return result;
-    }
-
-    static class CountingThreadFactory implements ThreadFactory {
-      final AtomicInteger nbThreads;
-      ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
-      @Override
-      public Thread newThread(Runnable r) {
-        nbThreads.incrementAndGet();
-        return realFactory.newThread(r);
-      }
-
-      CountingThreadFactory(AtomicInteger nbThreads){
-        this.nbThreads = nbThreads;
-      }
-    }
-
     public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
       this(hc, conf, new AtomicInteger());
     }
@@ -136,6 +135,17 @@ public class TestAsyncProcess {
           new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
     }
 
+    public MyAsyncProcess(
+        ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
+      super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
+              new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
+        public void execute(Runnable command) {
+          throw new RejectedExecutionException("test under failure");
+        }
+      },
+          new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
+    }
+
     @Override
     public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
         boolean atLeastOne, Callback<Res> callback, boolean needResults)
@@ -146,6 +156,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+      callsCt.incrementAndGet();
       final MultiResponse mr = createMultiResponse(
           callable.getMulti(), nbMultiResponse, nbActions);
       return new RpcRetryingCaller<MultiResponse>(100, 10) {
@@ -166,6 +177,33 @@ public class TestAsyncProcess {
     }
   }
 
+  static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
+
+    public CallerWithFailure() {
+      super(100, 100);
+    }
+
+    @Override
+    public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
+        throws IOException, RuntimeException {
+      throw new IOException("test");
+    }
+  }
+
+  static class AsyncProcessWithFailure extends MyAsyncProcess {
+
+    public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) {
+      super(hc, conf, true);
+      serverTrackerTimeout = 1;
+    }
+
+    @Override
+    protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+      callsCt.incrementAndGet();
+      return new CallerWithFailure();
+    }
+  }
+
   static MultiResponse createMultiResponse(
       final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
     final MultiResponse mr = new MultiResponse();
@@ -188,15 +226,7 @@ public class TestAsyncProcess {
    */
   static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
     final AtomicInteger nbThreads = new AtomicInteger(0);
-    final static Configuration c = new Configuration();
-
-    static {
-      c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
-    }
 
-    protected MyConnectionImpl() {
-      super(c);
-    }
 
     protected MyConnectionImpl(Configuration conf) {
       super(conf);
@@ -217,7 +247,7 @@ public class TestAsyncProcess {
     final boolean usedRegions[];
 
     protected MyConnectionImpl2(List<HRegionLocation> hrl) {
-      super(c);
+      super(conf);
       this.hrl = hrl;
       this.usedRegions = new boolean[hrl.size()];
     }
@@ -320,7 +350,7 @@ public class TestAsyncProcess {
     Assert.assertEquals(0, puts.size());
     ars.waitUntilDone();
     verifyResult(ars, false);
-    Assert.assertEquals(2L, ap.getRetriesRequested());
+    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
 
     Assert.assertEquals(1, ars.getErrors().exceptions.size());
     Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
@@ -386,7 +416,8 @@ public class TestAsyncProcess {
     Assert.assertTrue(puts.isEmpty());
     ars.waitUntilDone();
     verifyResult(ars, false, true, true);
-    Assert.assertEquals(2, ap.getRetriesRequested());
+    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
+    ap.callsCt.set(0);
     Assert.assertEquals(1, ars.getErrors().actions.size());
 
     puts.add(createPut(1, true));
@@ -395,7 +426,7 @@ public class TestAsyncProcess {
     ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
     Assert.assertEquals(0, puts.size());
     ars.waitUntilDone();
-    Assert.assertEquals(2, ap.getRetriesRequested());
+    Assert.assertEquals(2, ap.callsCt.get());
     verifyResult(ars, true);
   }
 
@@ -411,7 +442,7 @@ public class TestAsyncProcess {
     AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
     ars.waitUntilDone();
     verifyResult(ars, false, true, true);
-    Assert.assertEquals(2, ap.getRetriesRequested());
+    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
 
     Assert.assertEquals(1, ars.getFailedOperations().size());
   }
@@ -608,7 +639,7 @@ public class TestAsyncProcess {
   @Test
   public void testBatch() throws IOException, InterruptedException {
     HTable ht = new HTable();
-    ht.connection = new MyConnectionImpl();
+    ht.connection = new MyConnectionImpl(conf);
     ht.multiAp = new MyAsyncProcess(ht.connection, conf, false);
 
     List<Put> puts = new ArrayList<Put>();
@@ -641,12 +672,11 @@ public class TestAsyncProcess {
     HTable ht = new HTable();
     Configuration configuration = new Configuration(conf);
     configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
-    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
     // set default writeBufferSize
     ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
 
     ht.connection = new MyConnectionImpl(configuration);
-    MyAsyncProcess ap = new MyAsyncProcess(ht.connection, conf, true);
+    MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true);
     ht.ap = ap;
 
     Assert.assertNotNull(ht.ap.createServerErrorTracker());
@@ -663,7 +693,29 @@ public class TestAsyncProcess {
     } catch (RetriesExhaustedWithDetailsException expected) {
     }
     // Checking that the ErrorsServers came into play and didn't make us stop immediately
-    Assert.assertEquals(2, ap.getRetriesRequested());
+    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
+  }
+
+  @Test
+  public void testGlobalErrors() throws IOException {
+    HTable ht = new HTable();
+    ht.connection = new MyConnectionImpl(conf);
+    AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf);
+    ht.ap = ap;
+
+    Assert.assertNotNull(ht.ap.createServerErrorTracker());
+
+    Put p = createPut(1, true);
+    ht.setAutoFlush(false, false);
+    ht.put(p);
+
+    try {
+      ht.flushCommits();
+      Assert.fail();
+    } catch (RetriesExhaustedWithDetailsException expected) {
+    }
+    // Checking that the ErrorsServers came into play and didn't make us stop immediately
+    Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
   }
 
   /**