You are viewing a plain-text version of this content. The canonical version is available at the original archive link, which was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/07/11 01:06:27 UTC
hbase git commit: HBASE-13997 ScannerCallableWithReplicas cause Infinitely blocking (Zephyr Guo and Enis)
Repository: hbase
Updated Branches:
refs/heads/master c16bbf47c -> 5e708746b
HBASE-13997 ScannerCallableWithReplicas cause Infinitely blocking (Zephyr Guo and Enis)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5e708746
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5e708746
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5e708746
Branch: refs/heads/master
Commit: 5e708746b8d301c2fb22a85b8756129147012374
Parents: c16bbf4
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Jul 10 16:06:29 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jul 10 16:06:29 2015 -0700
----------------------------------------------------------------------
.../client/ScannerCallableWithReplicas.java | 55 +++++----------
.../hadoop/hbase/client/TestClientScanner.java | 73 +++++++++++++++++++-
2 files changed, 88 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e708746/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 1708efe..586db8c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -22,9 +22,7 @@ import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefor
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
@@ -158,15 +156,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// We want to accomodate some RPCs for redundant replica scans (but are still in progress)
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
- new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
+ RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
rl.size() * 5);
- List<ExecutionException> exceptions = null;
- int submitted = 0, completed = 0;
AtomicBoolean done = new AtomicBoolean(false);
replicaSwitched.set(false);
// submit call for the primary replica.
- submitted += addCallsForCurrentReplica(cs, rl);
+ addCallsForCurrentReplica(cs, rl);
try {
// wait for the timeout to see whether the primary responds back
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
@@ -179,11 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
return r == null ? null : r.getFirst(); //great we got a response
}
} catch (ExecutionException e) {
- // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
- // but the secondaries might still succeed. Continue on the replica RPCs.
- exceptions = new ArrayList<ExecutionException>(rl.size());
- exceptions.add(e);
- completed++;
+ RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
@@ -191,24 +183,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
// submit call for the all of the secondaries at once
// TODO: this may be an overkill for large region replication
- submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+ addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
try {
- while (completed < submitted) {
- try {
- Future<Pair<Result[], ScannerCallable>> f = cs.take();
- Pair<Result[], ScannerCallable> r = f.get();
- if (r != null && r.getSecond() != null) {
- updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
- }
- return r == null ? null : r.getFirst(); // great we got an answer
- } catch (ExecutionException e) {
- // if not cancel or interrupt, wait until all RPC's are done
- // one of the tasks failed. Save the exception for later.
- if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
- exceptions.add(e);
- completed++;
- }
+ Future<Pair<Result[], ScannerCallable>> f = cs.take();
+ Pair<Result[], ScannerCallable> r = f.get();
+ if (r != null && r.getSecond() != null) {
+ updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
+ return r == null ? null : r.getFirst(); // great we got an answer
+ } catch (ExecutionException e) {
+ RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
@@ -218,11 +202,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// calls succeeded or failed. In all case, we stop all our tasks.
cs.cancelAll();
}
-
- if (exceptions != null && !exceptions.isEmpty()) {
- RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
- retries); // just rethrow the first exception for now.
- }
return null; // unreachable
}
@@ -283,19 +262,18 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
}
- private int addCallsForCurrentReplica(
+ private void addCallsForCurrentReplica(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable);
cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
- return 1;
}
- private int addCallsForOtherReplicas(
+ private void addCallsForOtherReplicas(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
int min, int max) {
if (scan.getConsistency() == Consistency.STRONG) {
- return 0; // not scheduling on other replicas for strong consistency
+ return; // not scheduling on other replicas for strong consistency
}
for (int id = min; id <= max; id++) {
if (currentScannerCallable.id == id) {
@@ -307,7 +285,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica, scannerTimeout, id);
}
- return max - min + 1;
}
/**
@@ -354,8 +331,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// and we can't invoke it multiple times at the same time)
this.caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) {
- this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
- <Result[]>newCaller();
+ this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
+ .<Result[]>newCaller();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e708746/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 3f406df..6d7cc7f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -21,9 +21,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -32,6 +37,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -496,4 +502,69 @@ public class TestClientScanner {
assertFalse(cs.advance());
}
}
+
+ /**
+ * Tests the case where all replicas of a region throw an exception. It should not cause a hang
+ * but the exception should propagate to the client
+ */
+ @Test (timeout = 30000)
+ public void testExceptionsFromReplicasArePropagated() throws IOException {
+ scan.setConsistency(Consistency.TIMELINE);
+
+ // Mock a caller which calls the callable for ScannerCallableWithReplicas,
+ // but throws an exception for the actual scanner calls via callWithRetries.
+ rpcFactory = new MockRpcRetryingCallerFactory(conf);
+ conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
+ MockRpcRetryingCallerFactory.class.getName());
+
+ // mock 3 replica locations
+ when(clusterConn.locateRegion((TableName)any(), (byte[])any(), anyBoolean(),
+ anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null));
+
+ try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
+ clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+ Iterator<Result> iter = scanner.iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ }
+ fail("Should have failed with RetriesExhaustedException");
+ } catch (RetriesExhaustedException expected) {
+
+ }
+ }
+
+ public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
+
+ public MockRpcRetryingCallerFactory(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public <T> RpcRetryingCaller<T> newCaller() {
+ return new RpcRetryingCaller<T>() {
+ @Override
+ public void cancel() {
+ }
+ @Override
+ public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+ throws IOException, RuntimeException {
+ throw new IOException("Scanner exception");
+ }
+
+ @Override
+ public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+ throws IOException, RuntimeException {
+ try {
+ return callable.call(callTimeout);
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ }
+
}