You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this plain-text rendering.
Posted to commits@hbase.apache.org by nd...@apache.org on 2016/02/12 07:49:06 UTC

hbase git commit: HBASE-14812 Fix ResultBoundedCompletionService deadlock

Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 360a5782c -> e2f5ec2b0


HBASE-14812 Fix ResultBoundedCompletionService deadlock


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e2f5ec2b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e2f5ec2b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e2f5ec2b

Branch: refs/heads/branch-1.1
Commit: e2f5ec2b0daf0092f8854619170cf6ba3e4338d7
Parents: 360a578
Author: Elliott Clark <ec...@apache.org>
Authored: Fri Nov 13 18:28:12 2015 -0800
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Thu Feb 11 22:45:50 2016 -0800

----------------------------------------------------------------------
 .../client/ResultBoundedCompletionService.java  | 26 +++++++++++++-------
 .../client/ScannerCallableWithReplicas.java     | 22 ++++++++++++-----
 2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f5ec2b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
index eacbe2d..9b32e93 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -39,12 +39,13 @@ public class ResultBoundedCompletionService<V> {
   private final Executor executor;
   private final QueueingFuture<V>[] tasks; // all the tasks
   private volatile QueueingFuture<V> completed = null;
+  private volatile boolean cancelled = false;
   
   class QueueingFuture<T> implements RunnableFuture<T> {
     private final RetryingCallable<T> future;
     private T result = null;
     private ExecutionException exeEx = null;
-    private volatile boolean cancelled;
+    private volatile boolean cancelled = false;
     private final int callTimeout;
     private final RpcRetryingCaller<T> retryingCaller;
     private boolean resultObtained = false;
@@ -61,18 +62,21 @@ public class ResultBoundedCompletionService<V> {
     public void run() {
       try {
         if (!cancelled) {
-          result =
-              this.retryingCaller.callWithRetries(future, callTimeout);
+          result = this.retryingCaller.callWithRetries(future, callTimeout);
           resultObtained = true;
         }
       } catch (Throwable t) {
         exeEx = new ExecutionException(t);
       } finally {
-        if (!cancelled && completed == null) {
-          completed = (QueueingFuture<V>) QueueingFuture.this;
-          synchronized (tasks) {
-            tasks.notify();
+        synchronized (tasks) {
+          // If this wasn't canceled then store the result.
+          if (!cancelled && completed == null) {
+            completed = (QueueingFuture<V>) QueueingFuture.this;
           }
+
+          // Notify just in case there was someone waiting and this was canceled.
+          // That shouldn't happen but better safe than sorry.
+          tasks.notify();
         }
       }
     }
@@ -145,19 +149,23 @@ public class ResultBoundedCompletionService<V> {
 
   public QueueingFuture<V> take() throws InterruptedException {
     synchronized (tasks) {
-      while (completed == null) tasks.wait();
+      while (completed == null && !cancelled) tasks.wait();
     }
     return completed;
   }
 
   public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
     synchronized (tasks) {
-      if (completed == null) unit.timedWait(tasks, timeout);
+      if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
     }
     return completed;
   }
 
   public void cancelAll() {
+    // Grab the lock on tasks so that cancelled is visible everywhere
+    synchronized (tasks) {
+      cancelled = true;
+    }
     for (QueueingFuture<V> future : tasks) {
       if (future != null) future.cancel(true);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2f5ec2b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 9054b00..5675aa0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -167,12 +168,13 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     replicaSwitched.set(false);
     // submit call for the primary replica.
     addCallsForCurrentReplica(cs, rl);
+
     try {
       // wait for the timeout to see whether the primary responds back
       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
           TimeUnit.MICROSECONDS); // Yes, microseconds
       if (f != null) {
-        Pair<Result[], ScannerCallable> r = f.get();
+        Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
         if (r != null && r.getSecond() != null) {
           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
         }
@@ -184,23 +186,31 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
       throw new InterruptedIOException(e.getMessage());
+    } catch (TimeoutException e) {
+      throw new InterruptedIOException(e.getMessage());
     }
+
     // submit call for the all of the secondaries at once
     // TODO: this may be an overkill for large region replication
     addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+
     try {
-      Future<Pair<Result[], ScannerCallable>> f = cs.take();
-      Pair<Result[], ScannerCallable> r = f.get();
-      if (r != null && r.getSecond() != null) {
-        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
+      Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
+      if (f != null) {
+        Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
+        if (r != null && r.getSecond() != null) {
+          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
+        }
+        return r == null ? null : r.getFirst(); // great we got an answer
       }
-      return r == null ? null : r.getFirst(); // great we got an answer
     } catch (ExecutionException e) {
       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
     } catch (CancellationException e) {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
       throw new InterruptedIOException(e.getMessage());
+    } catch (TimeoutException e) {
+      throw new InterruptedIOException(e.getMessage());
     } finally {
       // We get there because we were interrupted or because one or more of the
       // calls succeeded or failed. In all case, we stop all our tasks.