You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2014/07/19 00:45:34 UTC
git commit: HBASE-11537 Avoid synchronization on instances of ConcurrentMap (Mike Drob)
Repository: hbase
Updated Branches:
refs/heads/master 209dd6dcf -> 5f4e85d3f
HBASE-11537 Avoid synchronization on instances of ConcurrentMap (Mike Drob)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5f4e85d3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5f4e85d3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5f4e85d3
Branch: refs/heads/master
Commit: 5f4e85d3f98b0456383fa691574198dedcd89bd0
Parents: 209dd6d
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Fri Jul 18 15:40:10 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Jul 18 15:40:10 2014 -0700
----------------------------------------------------------------------
.../hbase/procedure/ProcedureCoordinator.java | 67 ++++++++++----------
.../hadoop/hbase/procedure/ProcedureMember.java | 34 ++++------
2 files changed, 46 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f4e85d3/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
index fe7318b..fd9606a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -145,48 +144,48 @@ public class ProcedureCoordinator {
String procName = proc.getName();
// make sure we aren't already running a procedure of that name
- synchronized (procedures) {
- Procedure oldProc = procedures.get(procName);
- if (oldProc != null) {
- // procedures are always eventually completed on both successful and failed execution
- try {
- if (!oldProc.isCompleted()) {
- LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
- return false;
- }
- else {
- LOG.debug("Procedure " + procName
+ Procedure oldProc = procedures.get(procName);
+ if (oldProc != null) {
+ // procedures are always eventually completed on both successful and failed execution
+ try {
+ if (!oldProc.isCompleted()) {
+ LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
+ return false;
+ } else {
+ LOG.debug("Procedure " + procName
+ " was in running list but was completed. Accepting new attempt.");
- procedures.remove(procName);
+ if (!procedures.remove(procName, oldProc)) {
+ LOG.warn("Procedure " + procName
+ + " has been resubmitted by another thread. Rejecting this request.");
+ return false;
}
- } catch (ForeignException e) {
- LOG.debug("Procedure " + procName
+ }
+ } catch (ForeignException e) {
+ LOG.debug("Procedure " + procName
+ " was in running list but has exception. Accepting new attempt.");
- procedures.remove(procName);
+ if (!procedures.remove(procName, oldProc)) {
+ LOG.warn("Procedure " + procName
+ + " has been resubmitted by another thread. Rejecting this request.");
+ return false;
}
}
}
// kick off the procedure's execution in a separate thread
- Future<Void> f = null;
try {
- synchronized (procedures) {
- this.procedures.put(procName, proc);
- f = this.pool.submit(proc);
+ if (this.procedures.putIfAbsent(procName, proc) == null) {
+ this.pool.submit(proc);
+ return true;
+ } else {
+ LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
+ return false;
}
- return true;
} catch (RejectedExecutionException e) {
- LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error and " +
- "cancelling operation.", e);
+ LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error.", e);
// Remove the procedure from the list since is not started
- this.procedures.remove(procName);
+ this.procedures.remove(procName, proc);
// the thread pool is full and we can't run the procedure
proc.receive(new ForeignException(procName, e));
-
- // cancel procedure proactively
- if (f != null) {
- f.cancel(true);
- }
}
return false;
}
@@ -217,13 +216,11 @@ public class ProcedureCoordinator {
*/
public void abortProcedure(String procName, ForeignException reason) {
// if we know about the Procedure, notify it
- synchronized(procedures) {
- Procedure proc = procedures.get(procName);
- if (proc == null) {
- return;
- }
- proc.receive(reason);
+ Procedure proc = procedures.get(procName);
+ if (proc == null) {
+ return;
}
+ proc.receive(reason);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/5f4e85d3/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
index 0559e12..89760f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -139,41 +138,36 @@ public class ProcedureMember implements Closeable {
}
// make sure we aren't already running an subprocedure of that name
- Subprocedure rsub;
- synchronized (subprocs) {
- rsub = subprocs.get(procName);
- }
+ Subprocedure rsub = subprocs.get(procName);
if (rsub != null) {
if (!rsub.isComplete()) {
LOG.error("Subproc '" + procName + "' is already running. Bailing out");
return false;
}
LOG.warn("A completed old subproc " + procName + " is still present, removing");
- subprocs.remove(procName);
+ if (!subprocs.remove(procName, rsub)) {
+ LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
+ return false;
+ }
}
LOG.debug("Submitting new Subprocedure:" + procName);
// kick off the subprocedure
- Future<Void> future = null;
try {
- synchronized (subprocs) {
- subprocs.put(procName, subproc);
+ if (subprocs.putIfAbsent(procName, subproc) == null) {
+ this.pool.submit(subproc);
+ return true;
+ } else {
+ LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
+ return false;
}
- future = this.pool.submit(subproc);
- return true;
} catch (RejectedExecutionException e) {
- synchronized (subprocs) {
- subprocs.remove(procName);
- }
+ subprocs.remove(procName, subproc);
+
// the thread pool is full and we can't run the subprocedure
String msg = "Subprocedure pool is full!";
subproc.cancel(msg, e.getCause());
-
- // cancel all subprocedures proactively
- if (future != null) {
- future.cancel(true);
- }
}
LOG.error("Failed to start subprocedure '" + procName + "'");
@@ -182,7 +176,7 @@ public class ProcedureMember implements Closeable {
/**
* Notification that procedure coordinator has reached the global barrier
- * @param procName name of the subprocedure that should start running the the in-barrier phase
+ * @param procName name of the subprocedure that should start running the in-barrier phase
*/
public void receivedReachedGlobalBarrier(String procName) {
Subprocedure subproc = subprocs.get(procName);