You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/07/19 00:45:34 UTC

git commit: HBASE-11537 Avoid synchronization on instances of ConcurrentMap (Mike Drob)

Repository: hbase
Updated Branches:
  refs/heads/master 209dd6dcf -> 5f4e85d3f


HBASE-11537 Avoid synchronization on instances of ConcurrentMap (Mike Drob)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5f4e85d3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5f4e85d3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5f4e85d3

Branch: refs/heads/master
Commit: 5f4e85d3f98b0456383fa691574198dedcd89bd0
Parents: 209dd6d
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Fri Jul 18 15:40:10 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Jul 18 15:40:10 2014 -0700

----------------------------------------------------------------------
 .../hbase/procedure/ProcedureCoordinator.java   | 67 ++++++++++----------
 .../hadoop/hbase/procedure/ProcedureMember.java | 34 ++++------
 2 files changed, 46 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5f4e85d3/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
index fe7318b..fd9606a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -145,48 +144,48 @@ public class ProcedureCoordinator {
     String procName = proc.getName();
 
     // make sure we aren't already running a procedure of that name
-    synchronized (procedures) {
-      Procedure oldProc = procedures.get(procName);
-      if (oldProc != null) {
-        // procedures are always eventually completed on both successful and failed execution
-        try {
-          if (!oldProc.isCompleted()) {
-            LOG.warn("Procedure " + procName + " currently running.  Rejecting new request");
-            return false;
-          }
-          else {
-            LOG.debug("Procedure " + procName
+    Procedure oldProc = procedures.get(procName);
+    if (oldProc != null) {
+      // procedures are always eventually completed on both successful and failed execution
+      try {
+        if (!oldProc.isCompleted()) {
+          LOG.warn("Procedure " + procName + " currently running.  Rejecting new request");
+          return false;
+        } else {
+          LOG.debug("Procedure " + procName
               + " was in running list but was completed.  Accepting new attempt.");
-            procedures.remove(procName);
+          if (!procedures.remove(procName, oldProc)) {
+            LOG.warn("Procedure " + procName
+                + " has been resubmitted by another thread. Rejecting this request.");
+            return false;
           }
-        } catch (ForeignException e) {
-          LOG.debug("Procedure " + procName
+        }
+      } catch (ForeignException e) {
+        LOG.debug("Procedure " + procName
             + " was in running list but has exception.  Accepting new attempt.");
-          procedures.remove(procName);
+        if (!procedures.remove(procName, oldProc)) {
+          LOG.warn("Procedure " + procName
+              + " has been resubmitted by another thread. Rejecting this request.");
+          return false;
         }
       }
     }
 
     // kick off the procedure's execution in a separate thread
-    Future<Void> f = null;
     try {
-      synchronized (procedures) {
-        this.procedures.put(procName, proc);
-        f = this.pool.submit(proc);
+      if (this.procedures.putIfAbsent(procName, proc) == null) {
+        this.pool.submit(proc);
+        return true;
+      } else {
+        LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
+        return false;
       }
-      return true;
     } catch (RejectedExecutionException e) {
-      LOG.warn("Procedure " + procName + " rejected by execution pool.  Propagating error and " +
-          "cancelling operation.", e);
+      LOG.warn("Procedure " + procName + " rejected by execution pool.  Propagating error.", e);
       // Remove the procedure from the list since is not started
-      this.procedures.remove(procName);
+      this.procedures.remove(procName, proc);
       // the thread pool is full and we can't run the procedure
       proc.receive(new ForeignException(procName, e));
-
-      // cancel procedure proactively
-      if (f != null) {
-        f.cancel(true);
-      }
     }
     return false;
   }
@@ -217,13 +216,11 @@ public class ProcedureCoordinator {
    */
   public void abortProcedure(String procName, ForeignException reason) {
     // if we know about the Procedure, notify it
-    synchronized(procedures) {
-      Procedure proc = procedures.get(procName);
-      if (proc == null) {
-        return;
-      }
-      proc.receive(reason);
+    Procedure proc = procedures.get(procName);
+    if (proc == null) {
+      return;
     }
+    proc.receive(reason);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f4e85d3/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
index 0559e12..89760f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -139,41 +138,36 @@ public class ProcedureMember implements Closeable {
     }
 
     // make sure we aren't already running an subprocedure of that name
-    Subprocedure rsub;
-    synchronized (subprocs) {
-      rsub = subprocs.get(procName);
-    }
+    Subprocedure rsub = subprocs.get(procName);
     if (rsub != null) {
       if (!rsub.isComplete()) {
         LOG.error("Subproc '" + procName + "' is already running. Bailing out");
         return false;
       }
       LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
-      subprocs.remove(procName);
+      if (!subprocs.remove(procName, rsub)) {
+        LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
+        return false;
+      }
     }
 
     LOG.debug("Submitting new Subprocedure:" + procName);
 
     // kick off the subprocedure
-    Future<Void> future = null;
     try {
-      synchronized (subprocs) {
-        subprocs.put(procName, subproc);
+      if (subprocs.putIfAbsent(procName, subproc) == null) {
+        this.pool.submit(subproc);
+        return true;
+      } else {
+        LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
+        return false;
       }
-      future = this.pool.submit(subproc);
-      return true;
     } catch (RejectedExecutionException e) {
-      synchronized (subprocs) {
-        subprocs.remove(procName);
-      }
+      subprocs.remove(procName, subproc);
+
       // the thread pool is full and we can't run the subprocedure
       String msg = "Subprocedure pool is full!";
       subproc.cancel(msg, e.getCause());
-
-      // cancel all subprocedures proactively
-      if (future != null) {
-        future.cancel(true);
-      }
     }
 
     LOG.error("Failed to start subprocedure '" + procName + "'");
@@ -182,7 +176,7 @@ public class ProcedureMember implements Closeable {
 
    /**
     * Notification that procedure coordinator has reached the global barrier
-    * @param procName name of the subprocedure that should start running the the in-barrier phase
+    * @param procName name of the subprocedure that should start running the in-barrier phase
     */
    public void receivedReachedGlobalBarrier(String procName) {
      Subprocedure subproc = subprocs.get(procName);