You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2016/02/12 07:49:06 UTC
hbase git commit: HBASE-14812 Fix ResultBoundedCompletionService deadlock
Repository: hbase
Updated Branches:
refs/heads/branch-1.1 360a5782c -> e2f5ec2b0
HBASE-14812 Fix ResultBoundedCompletionService deadlock
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e2f5ec2b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e2f5ec2b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e2f5ec2b
Branch: refs/heads/branch-1.1
Commit: e2f5ec2b0daf0092f8854619170cf6ba3e4338d7
Parents: 360a578
Author: Elliott Clark <ec...@apache.org>
Authored: Fri Nov 13 18:28:12 2015 -0800
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Thu Feb 11 22:45:50 2016 -0800
----------------------------------------------------------------------
.../client/ResultBoundedCompletionService.java | 26 +++++++++++++-------
.../client/ScannerCallableWithReplicas.java | 22 ++++++++++++-----
2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f5ec2b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
index eacbe2d..9b32e93 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -39,12 +39,13 @@ public class ResultBoundedCompletionService<V> {
private final Executor executor;
private final QueueingFuture<V>[] tasks; // all the tasks
private volatile QueueingFuture<V> completed = null;
+ private volatile boolean cancelled = false;
class QueueingFuture<T> implements RunnableFuture<T> {
private final RetryingCallable<T> future;
private T result = null;
private ExecutionException exeEx = null;
- private volatile boolean cancelled;
+ private volatile boolean cancelled = false;
private final int callTimeout;
private final RpcRetryingCaller<T> retryingCaller;
private boolean resultObtained = false;
@@ -61,18 +62,21 @@ public class ResultBoundedCompletionService<V> {
public void run() {
try {
if (!cancelled) {
- result =
- this.retryingCaller.callWithRetries(future, callTimeout);
+ result = this.retryingCaller.callWithRetries(future, callTimeout);
resultObtained = true;
}
} catch (Throwable t) {
exeEx = new ExecutionException(t);
} finally {
- if (!cancelled && completed == null) {
- completed = (QueueingFuture<V>) QueueingFuture.this;
- synchronized (tasks) {
- tasks.notify();
+ synchronized (tasks) {
+ // If this wasn't canceled then store the result.
+ if (!cancelled && completed == null) {
+ completed = (QueueingFuture<V>) QueueingFuture.this;
}
+
+ // Notify just in case there was someone waiting and this was canceled.
+ // That shouldn't happen but better safe than sorry.
+ tasks.notify();
}
}
}
@@ -145,19 +149,23 @@ public class ResultBoundedCompletionService<V> {
public QueueingFuture<V> take() throws InterruptedException {
synchronized (tasks) {
- while (completed == null) tasks.wait();
+ while (completed == null && !cancelled) tasks.wait();
}
return completed;
}
public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (tasks) {
- if (completed == null) unit.timedWait(tasks, timeout);
+ if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
}
return completed;
}
public void cancelAll() {
+ // Grab the lock on tasks so that cancelled is visible everywhere
+ synchronized (tasks) {
+ cancelled = true;
+ }
for (QueueingFuture<V> future : tasks) {
if (future != null) future.cancel(true);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f5ec2b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 9054b00..5675aa0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -167,12 +168,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
replicaSwitched.set(false);
// submit call for the primary replica.
addCallsForCurrentReplica(cs, rl);
+
try {
// wait for the timeout to see whether the primary responds back
Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
TimeUnit.MICROSECONDS); // Yes, microseconds
if (f != null) {
- Pair<Result[], ScannerCallable> r = f.get();
+ Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
if (r != null && r.getSecond() != null) {
updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
@@ -184,23 +186,31 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
+ } catch (TimeoutException e) {
+ throw new InterruptedIOException(e.getMessage());
}
+
// submit calls for all of the secondaries at once
// TODO: this may be overkill for large region replication
addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+
try {
- Future<Pair<Result[], ScannerCallable>> f = cs.take();
- Pair<Result[], ScannerCallable> r = f.get();
- if (r != null && r.getSecond() != null) {
- updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
+ Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
+ if (f != null) {
+ Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
+ if (r != null && r.getSecond() != null) {
+ updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
+ }
+ return r == null ? null : r.getFirst(); // great we got an answer
}
- return r == null ? null : r.getFirst(); // great we got an answer
} catch (ExecutionException e) {
RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
} catch (CancellationException e) {
throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
+ } catch (TimeoutException e) {
+ throw new InterruptedIOException(e.getMessage());
} finally {
// We get here because we were interrupted or because one or more of the
// calls succeeded or failed. In all cases, we stop all our tasks.