You are viewing a plain text version of this content. The canonical link for it is on the original mailing-list archive page (the hyperlink was lost in this plain-text copy).
Posted to commits@hbase.apache.org by en...@apache.org on 2015/07/11 01:06:27 UTC

hbase git commit: HBASE-13997 ScannerCallableWithReplicas cause Infinitely blocking (Zephyr Guo and Enis)

Repository: hbase
Updated Branches:
  refs/heads/master c16bbf47c -> 5e708746b


HBASE-13997 ScannerCallableWithReplicas cause Infinitely blocking (Zephyr Guo and Enis)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5e708746
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5e708746
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5e708746

Branch: refs/heads/master
Commit: 5e708746b8d301c2fb22a85b8756129147012374
Parents: c16bbf4
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Jul 10 16:06:29 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jul 10 16:06:29 2015 -0700

----------------------------------------------------------------------
 .../client/ScannerCallableWithReplicas.java     | 55 +++++----------
 .../hadoop/hbase/client/TestClientScanner.java  | 73 +++++++++++++++++++-
 2 files changed, 88 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5e708746/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 1708efe..586db8c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefor
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -158,15 +156,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
         new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
-            new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
+            RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
             rl.size() * 5);
 
-    List<ExecutionException> exceptions = null;
-    int submitted = 0, completed = 0;
     AtomicBoolean done = new AtomicBoolean(false);
     replicaSwitched.set(false);
     // submit call for the primary replica.
-    submitted += addCallsForCurrentReplica(cs, rl);
+    addCallsForCurrentReplica(cs, rl);
     try {
       // wait for the timeout to see whether the primary responds back
       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
@@ -179,11 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
         return r == null ? null : r.getFirst(); //great we got a response
       }
     } catch (ExecutionException e) {
-      // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
-      // but the secondaries might still succeed. Continue on the replica RPCs.
-      exceptions = new ArrayList<ExecutionException>(rl.size());
-      exceptions.add(e);
-      completed++;
+      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
     } catch (CancellationException e) {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
@@ -191,24 +183,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     }
     // submit call for the all of the secondaries at once
     // TODO: this may be an overkill for large region replication
-    submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+    addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
     try {
-      while (completed < submitted) {
-        try {
-          Future<Pair<Result[], ScannerCallable>> f = cs.take();
-          Pair<Result[], ScannerCallable> r = f.get();
-          if (r != null && r.getSecond() != null) {
-            updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
-          }
-          return r == null ? null : r.getFirst(); // great we got an answer
-        } catch (ExecutionException e) {
-          // if not cancel or interrupt, wait until all RPC's are done
-          // one of the tasks failed. Save the exception for later.
-          if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
-          exceptions.add(e);
-          completed++;
-        }
+      Future<Pair<Result[], ScannerCallable>> f = cs.take();
+      Pair<Result[], ScannerCallable> r = f.get();
+      if (r != null && r.getSecond() != null) {
+        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
       }
+      return r == null ? null : r.getFirst(); // great we got an answer
+    } catch (ExecutionException e) {
+      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
     } catch (CancellationException e) {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
@@ -218,11 +202,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       // calls succeeded or failed. In all case, we stop all our tasks.
       cs.cancelAll();
     }
-
-    if (exceptions != null && !exceptions.isEmpty()) {
-      RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
-          retries); // just rethrow the first exception for now.
-    }
     return null; // unreachable
   }
 
@@ -283,19 +262,18 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
   }
 
-  private int addCallsForCurrentReplica(
+  private void addCallsForCurrentReplica(
       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
     outstandingCallables.add(currentScannerCallable);
     cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
-    return 1;
   }
 
-  private int addCallsForOtherReplicas(
+  private void addCallsForOtherReplicas(
       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
       int min, int max) {
     if (scan.getConsistency() == Consistency.STRONG) {
-      return 0; // not scheduling on other replicas for strong consistency
+      return; // not scheduling on other replicas for strong consistency
     }
     for (int id = min; id <= max; id++) {
       if (currentScannerCallable.id == id) {
@@ -307,7 +285,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       RetryingRPC retryingOnReplica = new RetryingRPC(s);
       cs.submit(retryingOnReplica, scannerTimeout, id);
     }
-    return max - min + 1;
   }
 
   /**
@@ -354,8 +331,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       // and we can't invoke it multiple times at the same time)
       this.caller = ScannerCallableWithReplicas.this.caller;
       if (scan.getConsistency() == Consistency.TIMELINE) {
-        this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
-            <Result[]>newCaller();
+        this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
+            .<Result[]>newCaller();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5e708746/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 3f406df..6d7cc7f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -21,9 +21,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -32,6 +37,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -496,4 +502,69 @@ public class TestClientScanner {
       assertFalse(cs.advance());
     }
   }
+
+  /**
+   * Tests the case where all replicas of a region throw an exception. It should not cause a hang
+   * but the exception should propagate to the client.
+   */
+  @Test (timeout = 30000)
+  public void testExceptionsFromReplicasArePropagated() throws IOException {
+    scan.setConsistency(Consistency.TIMELINE); // non-STRONG: secondary replicas are also scanned
+
+    // Mock a caller which calls the callable for ScannerCallableWithReplicas,
+    // but throws an exception for the actual scanner calls via callWithRetries.
+    rpcFactory = new MockRpcRetryingCallerFactory(conf);
+    conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
+      MockRpcRetryingCallerFactory.class.getName());
+
+    // mock 3 replica locations
+    when(clusterConn.locateRegion((TableName)any(), (byte[])any(), anyBoolean(),
+      anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null));
+
+    try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
+      clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+      Iterator<Result> iter = scanner.iterator();
+      while (iter.hasNext()) {
+        iter.next();
+      }
+      fail("Should have failed with RetriesExhaustedException");
+    } catch (RetriesExhaustedException expected) {
+      // Expected: the replica failures reached the client instead of blocking forever.
+    }
+  }
+  /** Caller factory whose callers always fail {@code callWithRetries} with an IOException. */
+  public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
+
+    public MockRpcRetryingCallerFactory(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public <T> RpcRetryingCaller<T> newCaller() {
+      return new RpcRetryingCaller<T>() {
+        @Override
+        public void cancel() { // no-op in this mock
+        }
+        @Override
+        public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+            throws IOException, RuntimeException {
+          throw new IOException("Scanner exception");
+        }
+        // Runs the callable once; IOExceptions pass through, other exceptions are wrapped.
+        @Override
+        public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+            throws IOException, RuntimeException {
+          try {
+            return callable.call(callTimeout);
+          } catch (IOException e) {
+            throw e;
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    }
+
+  }
+
 }