You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2016/09/26 15:43:31 UTC
[1/2] hbase git commit: HBASE-16587 Procedure v2 - Cleanup suspended
proc execution
Repository: hbase
Updated Branches:
refs/heads/master 5f7e642fe -> 8da0500e7
HBASE-16587 Procedure v2 - Cleanup suspended proc execution
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e01e05cc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e01e05cc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e01e05cc
Branch: refs/heads/master
Commit: e01e05cc0ef5255c549d3c7bb87be38d34f13d94
Parents: 5f7e642
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Mon Sep 26 08:08:44 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Mon Sep 26 08:08:44 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/procedure2/Procedure.java | 59 +++-
.../hbase/procedure2/ProcedureExecutor.java | 61 ++++-
.../procedure2/RemoteProcedureException.java | 21 +-
.../hbase/procedure2/StateMachineProcedure.java | 4 +-
.../procedure2/TestProcedureSuspended.java | 270 +++++++++++++++++++
.../procedure/MasterProcedureScheduler.java | 4 +-
.../master/procedure/ProcedurePrepareLatch.java | 2 +-
.../master/procedure/ProcedureSyncWait.java | 3 +-
.../procedure/TestMasterProcedureEvents.java | 90 +++++++
9 files changed, 489 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index b9145e7..69c36e6 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -164,6 +164,23 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
}
/**
+ * Used to keep the procedure lock even when the procedure is yielding or suspended.
+ * @return true if the procedure should hold on to the lock until completionCleanup()
+ */
+ protected boolean holdLock(final TEnvironment env) {
+ return false;
+ }
+
+ /**
+ * This is used in conjunction with holdLock(). If holdLock() is true
+ * the procedure executor will not call acquireLock() if hasLock() is true.
+ * @return true if the procedure has the lock, false otherwise.
+ */
+ protected boolean hasLock(final TEnvironment env) {
+ return false;
+ }
+
+ /**
* Called when the procedure is loaded for replay.
* The procedure implementor may use this method to perform some quick
* operation before replay.
@@ -174,6 +191,14 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
}
/**
+ * Called when the procedure is ready to be added to the queue after
+ * the loading/replay operation.
+ */
+ protected void afterReplay(final TEnvironment env) {
+ // no-op
+ }
+
+ /**
* Called when the procedure is marked as completed (success or rollback).
* The procedure implementor may use this method to cleanup in-memory states.
* This operation will not be retried on failure.
@@ -339,6 +364,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return state == ProcedureState.RUNNABLE;
}
+ public synchronized boolean isInitializing() {
+ return state == ProcedureState.INITIALIZING;
+ }
+
/**
* @return true if the procedure has failed.
* true may mean failed but not yet rolledback or failed and rolledback.
@@ -479,8 +508,12 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
setFailure(source, new ProcedureAbortedException(msg));
}
- @InterfaceAudience.Private
- protected synchronized boolean setTimeoutFailure() {
+ /**
+ * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
+ * @return true to let the framework handle the timeout as abort,
+ * false in case the procedure handled the timeout itself.
+ */
+ protected synchronized boolean setTimeoutFailure(final TEnvironment env) {
if (state == ProcedureState.WAITING_TIMEOUT) {
long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
setFailure("ProcedureExecutor", new TimeoutIOException(
@@ -549,6 +582,24 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
}
/**
+ * Internal method called by the ProcedureExecutor that starts the
+ * user-level code acquireLock().
+ */
+ @InterfaceAudience.Private
+ protected boolean doAcquireLock(final TEnvironment env) {
+ return acquireLock(env);
+ }
+
+ /**
+ * Internal method called by the ProcedureExecutor that starts the
+ * user-level code releaseLock().
+ */
+ @InterfaceAudience.Private
+ protected void doReleaseLock(final TEnvironment env) {
+ releaseLock(env);
+ }
+
+ /**
* Called on store load to initialize the Procedure internals after
* the creation/deserialization.
*/
@@ -601,6 +652,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return childrenLatch > 0;
}
+ protected synchronized int getChildrenLatch() {
+ return childrenLatch;
+ }
+
/**
* Called by the RootProcedureState on procedure execution.
* Each procedure store its stack-index positions.
http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 5066fb4..c2838ba 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -438,6 +438,7 @@ public class ProcedureExecutor<TEnvironment> {
// some procedure may be started way before this stuff.
for (int i = runnableList.size() - 1; i >= 0; --i) {
Procedure proc = runnableList.get(i);
+ proc.afterReplay(getEnvironment());
if (!proc.hasParent()) {
sendProcedureLoadedNotification(proc.getProcId());
}
@@ -857,9 +858,9 @@ public class ProcedureExecutor<TEnvironment> {
// Execute the procedure
assert proc.getState() == ProcedureState.RUNNABLE : proc;
- if (proc.acquireLock(getEnvironment())) {
+ if (acquireLock(proc)) {
execProcedure(procStack, proc);
- proc.releaseLock(getEnvironment());
+ releaseLock(proc, false);
} else {
runnables.yield(proc);
}
@@ -879,12 +880,34 @@ public class ProcedureExecutor<TEnvironment> {
// Finalize the procedure state
if (proc.getProcId() == rootProcId) {
procedureFinished(proc);
+ } else {
+ execCompletionCleanup(proc);
}
break;
}
} while (procStack.isFailed());
}
+ private boolean acquireLock(final Procedure proc) {
+ final TEnvironment env = getEnvironment();
+ // hasLock() is used in conjunction with holdLock().
+ // This allows us to not rewrite or carry around the hasLock() flag
+ // for every procedure. hasLock() is meaningful only if holdLock() is true.
+ if (proc.holdLock(env) && proc.hasLock(env)) {
+ return true;
+ }
+ return proc.doAcquireLock(env);
+ }
+
+ private void releaseLock(final Procedure proc, final boolean force) {
+ final TEnvironment env = getEnvironment();
+ // given how the framework works, we know that we will always have the lock
+ // when we call releaseLock(), so we can avoid calling proc.hasLock()
+ if (force || !proc.holdLock(env)) {
+ proc.doReleaseLock(env);
+ }
+ }
+
private void timeoutLoop() {
while (isRunning()) {
Procedure proc = waitingTimeout.poll();
@@ -921,15 +944,17 @@ public class ProcedureExecutor<TEnvironment> {
continue;
}
- // The procedure received an "abort-timeout", call abort() and
- // add the procedure back in the queue for rollback.
- if (proc.setTimeoutFailure()) {
+ // The procedure received a timeout. If the procedure itself does not handle it,
+ // call abort() and add the procedure back in the queue for rollback.
+ if (proc.setTimeoutFailure(getEnvironment())) {
long rootProcId = Procedure.getRootProcedureId(procedures, proc);
RootProcedureState procStack = rollbackStack.get(rootProcId);
procStack.abort();
store.update(proc);
runnables.addFront(proc);
continue;
+ } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
+ waitingTimeout.add(proc);
}
}
}
@@ -940,7 +965,7 @@ public class ProcedureExecutor<TEnvironment> {
* finished to user, and the result will be the fatal exception.
*/
private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
- Procedure rootProc = procedures.get(rootProcId);
+ final Procedure rootProc = procedures.get(rootProcId);
RemoteProcedureException exception = rootProc.getException();
if (exception == null) {
exception = procStack.getException();
@@ -948,7 +973,7 @@ public class ProcedureExecutor<TEnvironment> {
store.update(rootProc);
}
- List<Procedure> subprocStack = procStack.getSubproceduresStack();
+ final List<Procedure> subprocStack = procStack.getSubproceduresStack();
assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
int stackTail = subprocStack.size();
@@ -956,7 +981,7 @@ public class ProcedureExecutor<TEnvironment> {
while (stackTail --> 0) {
final Procedure proc = subprocStack.get(stackTail);
- if (!reuseLock && !proc.acquireLock(getEnvironment())) {
+ if (!reuseLock && !acquireLock(proc)) {
// can't take a lock on the procedure, add the root-proc back on the
// queue waiting for the lock availability
return false;
@@ -970,7 +995,7 @@ public class ProcedureExecutor<TEnvironment> {
// we can avoid to lock/unlock each step
reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
if (!reuseLock) {
- proc.releaseLock(getEnvironment());
+ releaseLock(proc, false);
}
// allows to kill the executor before something is stored to the wal.
@@ -985,6 +1010,10 @@ public class ProcedureExecutor<TEnvironment> {
if (proc.isYieldAfterExecutionStep(getEnvironment())) {
return false;
}
+
+ if (proc != rootProc) {
+ execCompletionCleanup(proc);
+ }
}
// Finalize the procedure state
@@ -1302,14 +1331,22 @@ public class ProcedureExecutor<TEnvironment> {
return Procedure.getRootProcedureId(procedures, proc);
}
- private void procedureFinished(final Procedure proc) {
- // call the procedure completion cleanup handler
+ private void execCompletionCleanup(final Procedure proc) {
+ final TEnvironment env = getEnvironment();
+ if (proc.holdLock(env) && proc.hasLock(env)) {
+ releaseLock(proc, true);
+ }
try {
- proc.completionCleanup(getEnvironment());
+ proc.completionCleanup(env);
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
}
+ }
+
+ private void procedureFinished(final Procedure proc) {
+ // call the procedure completion cleanup handler
+ execCompletionCleanup(proc);
// update the executor internal state maps
ProcedureInfo procInfo = Procedure.createProcedureInfo(proc, proc.getNonceKey());
http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
index 456f83d..402ddfc 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
@@ -65,14 +65,23 @@ public class RemoteProcedureException extends ProcedureException {
return source;
}
- public IOException unwrapRemoteException() {
- if (getCause() instanceof RemoteException) {
- return ((RemoteException)getCause()).unwrapRemoteException();
+ public Exception unwrapRemoteException() {
+ final Throwable cause = getCause();
+ if (cause instanceof RemoteException) {
+ return ((RemoteException)cause).unwrapRemoteException();
}
- if (getCause() instanceof IOException) {
- return (IOException)getCause();
+ if (cause instanceof Exception) {
+ return (Exception)cause;
}
- return new IOException(getCause());
+ return new Exception(cause);
+ }
+
+ public IOException unwrapRemoteIOException() {
+ final Exception cause = unwrapRemoteException();
+ if (cause instanceof IOException) {
+ return (IOException)cause;
+ }
+ return new IOException(cause);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index 10467fe..a363c2e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -131,7 +131,9 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
subProcList = new ArrayList<Procedure>(subProcedure.length);
}
for (int i = 0; i < subProcedure.length; ++i) {
- subProcList.add(subProcedure[i]);
+ Procedure proc = subProcedure[i];
+ if (!proc.hasOwner()) proc.setOwner(getOwner());
+ subProcList.add(proc);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
new file mode 100644
index 0000000..eb72939
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Threads;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureSuspended {
+ private static final Log LOG = LogFactory.getLog(TestProcedureSuspended.class);
+
+ private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+ private static final Procedure NULL_PROC = null;
+
+ private ProcedureExecutor<TestProcEnv> procExecutor;
+ private ProcedureStore procStore;
+
+ private HBaseCommonTestingUtility htu;
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+
+ procStore = new NoopProcedureStore();
+ procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
+ procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+ procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ procExecutor.stop();
+ procStore.stop(false);
+ }
+
+ @Test(timeout=10000)
+ public void testSuspendWhileHoldingLocks() {
+ final AtomicBoolean lockA = new AtomicBoolean(false);
+ final AtomicBoolean lockB = new AtomicBoolean(false);
+
+ final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true);
+ final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true);
+ final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true);
+
+ procExecutor.submitProcedure(p1keyA);
+ procExecutor.submitProcedure(p2keyA);
+ procExecutor.submitProcedure(p3keyB);
+
+ // first run: p1 and p3 are able to run, p2 is blocked by p1
+ waitAndAssertTimestamp(p1keyA, 1, 1);
+ waitAndAssertTimestamp(p2keyA, 0, -1);
+ waitAndAssertTimestamp(p3keyB, 1, 2);
+ assertEquals(true, lockA.get());
+ assertEquals(true, lockB.get());
+
+ // release p3
+ p3keyB.setThrowSuspend(false);
+ procExecutor.getRunnableSet().addFront(p3keyB);
+ waitAndAssertTimestamp(p1keyA, 1, 1);
+ waitAndAssertTimestamp(p2keyA, 0, -1);
+ waitAndAssertTimestamp(p3keyB, 2, 3);
+ assertEquals(true, lockA.get());
+
+ // wait until p3 is fully completed
+ ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB);
+ assertEquals(false, lockB.get());
+
+ // rollback p1 and wait until it is fully completed
+ p1keyA.setTriggerRollback(true);
+ procExecutor.getRunnableSet().addFront(p1keyA);
+ ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA);
+
+ // p2 should start and suspend
+ waitAndAssertTimestamp(p1keyA, 4, 60000);
+ waitAndAssertTimestamp(p2keyA, 1, 7);
+ waitAndAssertTimestamp(p3keyB, 2, 3);
+ assertEquals(true, lockA.get());
+
+ // wait until p2 is fully completed
+ p2keyA.setThrowSuspend(false);
+ procExecutor.getRunnableSet().addFront(p2keyA);
+ ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA);
+ waitAndAssertTimestamp(p1keyA, 4, 60000);
+ waitAndAssertTimestamp(p2keyA, 2, 8);
+ waitAndAssertTimestamp(p3keyB, 2, 3);
+ assertEquals(false, lockA.get());
+ assertEquals(false, lockB.get());
+ }
+
+ @Test(timeout=10000)
+ public void testYieldWhileHoldingLocks() {
+ final AtomicBoolean lock = new AtomicBoolean(false);
+
+ final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false);
+ final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false);
+
+ procExecutor.submitProcedure(p1);
+ procExecutor.submitProcedure(p2);
+
+ // try to execute a bunch of yield on p1, p2 should be blocked
+ while (p1.getTimestamps().size() < 100) Threads.sleep(10);
+ assertEquals(0, p2.getTimestamps().size());
+
+ // wait until p1 is completed
+ p1.setThrowYield(false);
+ ProcedureTestingUtility.waitProcedure(procExecutor, p1);
+
+ // try to execute a bunch of yield on p2
+ while (p2.getTimestamps().size() < 100) Threads.sleep(10);
+ assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1,
+ p2.getTimestamps().get(0).longValue());
+
+ // wait until p2 is completed
+ p1.setThrowYield(false);
+ ProcedureTestingUtility.waitProcedure(procExecutor, p1);
+ }
+
+ private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) {
+ final ArrayList<Long> timestamps = proc.getTimestamps();
+ while (timestamps.size() < size) Threads.sleep(10);
+ LOG.info(proc + " -> " + timestamps);
+ assertEquals(size, timestamps.size());
+ if (size > 0) {
+ assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue());
+ }
+ }
+
+ public static class TestLockProcedure extends Procedure<TestProcEnv> {
+ private final ArrayList<Long> timestamps = new ArrayList<Long>();
+ private final String key;
+
+ private boolean triggerRollback = false;
+ private boolean throwSuspend = false;
+ private boolean throwYield = false;
+ private AtomicBoolean lock = null;
+ private boolean hasLock = false;
+
+ public TestLockProcedure(final AtomicBoolean lock, final String key,
+ final boolean throwYield, final boolean throwSuspend) {
+ this.lock = lock;
+ this.key = key;
+ this.throwYield = throwYield;
+ this.throwSuspend = throwSuspend;
+ }
+
+ public void setThrowYield(final boolean throwYield) {
+ this.throwYield = throwYield;
+ }
+
+ public void setThrowSuspend(final boolean throwSuspend) {
+ this.throwSuspend = throwSuspend;
+ }
+
+ public void setTriggerRollback(final boolean triggerRollback) {
+ this.triggerRollback = triggerRollback;
+ }
+
+ @Override
+ protected Procedure[] execute(final TestProcEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException {
+ LOG.info("EXECUTE " + this + " suspend " + (lock != null));
+ timestamps.add(env.nextTimestamp());
+ if (triggerRollback) {
+ setFailure(getClass().getSimpleName(), new Exception("injected failure"));
+ } else if (throwYield) {
+ throw new ProcedureYieldException();
+ } else if (throwSuspend) {
+ throw new ProcedureSuspendedException();
+ }
+ return null;
+ }
+
+ @Override
+ protected void rollback(final TestProcEnv env) {
+ LOG.info("ROLLBACK " + this);
+ timestamps.add(env.nextTimestamp() * 10000);
+ }
+
+ @Override
+ protected boolean acquireLock(final TestProcEnv env) {
+ if ((hasLock = lock.compareAndSet(false, true))) {
+ LOG.info("ACQUIRE LOCK " + this + " " + (hasLock));
+ }
+ return hasLock;
+ }
+
+ @Override
+ protected void releaseLock(final TestProcEnv env) {
+ LOG.info("RELEASE LOCK " + this + " " + hasLock);
+ lock.set(false);
+ hasLock = false;
+ }
+
+ @Override
+ protected boolean holdLock(final TestProcEnv env) {
+ return true;
+ }
+
+ @Override
+ protected boolean hasLock(final TestProcEnv env) {
+ return hasLock;
+ }
+
+ public ArrayList<Long> getTimestamps() {
+ return timestamps;
+ }
+
+ @Override
+ protected void toStringClassDetails(StringBuilder builder) {
+ builder.append(getClass().getName());
+ builder.append("(" + key + ")");
+ }
+
+ @Override
+ protected boolean abort(TestProcEnv env) { return false; }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {
+ }
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException {
+ }
+ }
+
+ private static class TestProcEnv {
+ public final AtomicLong timestamp = new AtomicLong(0);
+
+ public long nextTimestamp() {
+ return timestamp.incrementAndGet();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 3a215d5..548fb00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -297,7 +297,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
boolean tableDeleted;
if (proc.hasException()) {
- IOException procEx = proc.getException().unwrapRemoteException();
+ Exception procEx = proc.getException().unwrapRemoteException();
if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
// create failed because the table already exist
tableDeleted = !(procEx instanceof TableExistsException);
@@ -1628,4 +1628,4 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return Math.max(1, queue.getPriority() * quantum); // TODO
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
index eaeb9ac..eb23960 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
@@ -82,7 +82,7 @@ public abstract class ProcedurePrepareLatch {
protected void countDown(final Procedure proc) {
if (proc.hasException()) {
- exception = proc.getException().unwrapRemoteException();
+ exception = proc.getException().unwrapRemoteIOException();
}
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index cf8fdd4..0e10293 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -83,7 +83,8 @@ public final class ProcedureSyncWait {
if (result.isFailed()) {
// If the procedure fails, we should always have an exception captured. Throw it.
throw RemoteProcedureException.fromProto(
- result.getForeignExceptionMessage().getForeignExchangeMessage()).unwrapRemoteException();
+ result.getForeignExceptionMessage().getForeignExchangeMessage())
+ .unwrapRemoteIOException();
}
return result.getResult();
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/e01e05cc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
index d6b15d9..bf58738 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.hbase.master.procedure;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -29,9 +34,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -173,4 +182,85 @@ public class TestMasterProcedureEvents {
}
return null;
}
+
+ @Test(timeout=30000)
+ public void testTimeoutEventProcedure() throws Exception {
+ HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+ ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+ MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
+
+ TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 5);
+ procExec.submitProcedure(proc);
+
+ ProcedureTestingUtility.waitProcedure(procExec, proc.getProcId());
+ ProcedureTestingUtility.assertIsAbortException(procExec.getResult(proc.getProcId()));
+ }
+
+ public static class TestTimeoutEventProcedure
+ extends Procedure<MasterProcedureEnv> implements TableProcedureInterface {
+ private final ProcedureEvent event = new ProcedureEvent("timeout-event");
+
+ private final AtomicInteger ntimeouts = new AtomicInteger(0);
+ private int maxTimeouts = 1;
+
+ public TestTimeoutEventProcedure() {}
+
+ public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
+ this.maxTimeouts = maxTimeouts;
+ setTimeout(timeoutMsec);
+ setOwner("test");
+ }
+
+ @Override
+ protected Procedure[] execute(final MasterProcedureEnv env)
+ throws ProcedureSuspendedException {
+ LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
+ if (ntimeouts.get() > maxTimeouts) {
+ setAbortFailure("test", "give up after " + ntimeouts.get());
+ return null;
+ }
+
+ env.getProcedureQueue().suspendEvent(event);
+ if (env.getProcedureQueue().waitEvent(event, this)) {
+ setState(ProcedureState.WAITING_TIMEOUT);
+ throw new ProcedureSuspendedException();
+ }
+
+ return null;
+ }
+
+ @Override
+ protected void rollback(final MasterProcedureEnv env) {
+ }
+
+ @Override
+ protected boolean setTimeoutFailure(final MasterProcedureEnv env) {
+ int n = ntimeouts.incrementAndGet();
+ LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
+ setState(ProcedureState.RUNNABLE);
+ env.getProcedureQueue().wakeEvent(event);
+ return false;
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TableName.valueOf("testtb");
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.READ;
+ }
+
+ @Override
+ protected boolean abort(MasterProcedureEnv env) { return false; }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {
+ }
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException {
+ }
+ }
}
[2/2] hbase git commit: HBASE-16695 Procedure v2 - Support for parent
holding locks
Posted by mb...@apache.org.
HBASE-16695 Procedure v2 - Support for parent holding locks
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8da0500e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8da0500e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8da0500e
Branch: refs/heads/master
Commit: 8da0500e7d494f45cded7c3cb3423401a73e21fb
Parents: e01e05c
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Mon Sep 26 08:42:48 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Mon Sep 26 08:42:48 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/procedure2/Procedure.java | 12 +
.../hbase/procedure2/ProcedureRunnableSet.java | 3 +
.../procedure/MasterProcedureScheduler.java | 156 +++++---
.../procedure/TestMasterProcedureScheduler.java | 330 +++++++----------
...TestMasterProcedureSchedulerConcurrency.java | 363 +++++++++++++++++++
5 files changed, 597 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 69c36e6..6403cfd 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -908,4 +908,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return proc;
}
+
+ /**
+ * @param a the first procedure to be compared.
+ * @param b the second procedure to be compared.
+ * @return true if the two procedures have the same parent
+ */
+ public static boolean haveSameParent(final Procedure a, final Procedure b) {
+ if (a.hasParent() && b.hasParent()) {
+ return a.getParentProcId() == b.getParentProcId();
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
index 65df692..64c41ee 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.procedure2;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -69,6 +71,7 @@ public interface ProcedureRunnableSet {
* Returns the number of elements in this collection.
* @return the number of elements in this collection.
*/
+ @VisibleForTesting
int size();
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 548fb00..26ecd94 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.master.procedure;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
/**
* ProcedureRunnableSet for the Master Procedures.
@@ -78,7 +81,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>();
- private int queueSize = 0;
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private NamespaceQueue namespaceMap = null;
@@ -148,14 +150,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (proc.isSuspended()) return;
queue.add(proc, addFront);
- if (!(queue.isSuspended() || queue.hasExclusiveLock())) {
+ if (!(queue.isSuspended() ||
+ (queue.hasExclusiveLock() && !queue.isLockOwner(proc.getProcId())))) {
// the queue is not suspended or removed from the fairq (run-queue)
// because someone has an xlock on it.
// so, if the queue is not-linked we should add it
if (queue.size() == 1 && !AvlIterableList.isLinked(queue)) {
fairq.add(queue);
}
- queueSize++;
} else if (queue.hasParentLock(proc)) {
assert addFront : "expected to add a child in the front";
assert !queue.isSuspended() : "unexpected suspended state for the queue";
@@ -165,7 +167,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (!AvlIterableList.isLinked(queue)) {
fairq.add(queue);
}
- queueSize++;
}
}
@@ -179,13 +180,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
Procedure pollResult = null;
schedLock.lock();
try {
- if (queueSize == 0) {
+ if (!hasRunnables()) {
if (waitNsec < 0) {
schedWaitCond.await();
} else {
schedWaitCond.awaitNanos(waitNsec);
}
- if (queueSize == 0) {
+ if (!hasRunnables()) {
return null;
}
}
@@ -209,6 +210,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return pollResult;
}
+ private boolean hasRunnables() {
+ return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables();
+ }
+
private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
final Queue<T> rq = fairq.poll();
if (rq == null || !rq.isAvailable()) {
@@ -218,13 +223,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
assert !rq.isSuspended() : "rq=" + rq + " is suspended";
final Procedure pollResult = rq.peek();
final boolean xlockReq = rq.requireExclusiveLock(pollResult);
- if (xlockReq && rq.isLocked() && !rq.hasParentLock(pollResult)) {
+ if (xlockReq && rq.isLocked() && !rq.hasLockAccess(pollResult)) {
// someone is already holding the lock (e.g. shared lock). avoid a yield
return null;
}
rq.poll();
- this.queueSize--;
if (rq.isEmpty() || xlockReq) {
removeFromRunQueue(fairq, rq);
} else if (rq.hasParentLock(pollResult)) {
@@ -232,7 +236,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// check if the next procedure is still a child.
// if not, remove the rq from the fairq and go back to the xlock state
Procedure nextProc = rq.peek();
- if (nextProc != null && nextProc.getParentProcId() != pollResult.getParentProcId()) {
+ if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) {
removeFromRunQueue(fairq, rq);
}
}
@@ -255,7 +259,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
tableMap = null;
- assert queueSize == 0 : "expected queue size to be 0, got " + queueSize;
+ assert size() == 0 : "expected queue size to be 0, got " + size();
} finally {
schedLock.unlock();
}
@@ -271,6 +275,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
+ private void wakePollIfNeeded(final int waitingCount) {
+ if (waitingCount > 1) {
+ schedWaitCond.signalAll();
+ } else if (waitingCount > 0) {
+ schedWaitCond.signal();
+ }
+ }
+
@Override
public void signalAll() {
schedLock.lock();
@@ -285,14 +297,30 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
public int size() {
schedLock.lock();
try {
- return queueSize;
+ int count = 0;
+
+ // Server queues
+ final AvlTreeIterator<ServerQueue> serverIter = new AvlTreeIterator<ServerQueue>();
+ for (int i = 0; i < serverBuckets.length; ++i) {
+ serverIter.seekFirst(serverBuckets[i]);
+ while (serverIter.hasNext()) {
+ count += serverIter.next().size();
+ }
+ }
+
+ // Table queues
+ final AvlTreeIterator<TableQueue> tableIter = new AvlTreeIterator<TableQueue>(tableMap);
+ while (tableIter.hasNext()) {
+ count += tableIter.next().size();
+ }
+ return count;
} finally {
schedLock.unlock();
}
}
@Override
- public void completionCleanup(Procedure proc) {
+ public void completionCleanup(final Procedure proc) {
if (proc instanceof TableProcedureInterface) {
TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
boolean tableDeleted;
@@ -310,7 +338,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
}
if (tableDeleted) {
- markTableAsDeleted(iProcTable.getTableName());
+ markTableAsDeleted(iProcTable.getTableName(), proc);
return;
}
} else {
@@ -323,14 +351,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (AvlIterableList.isLinked(queue)) return;
if (!queue.isEmpty()) {
fairq.add(queue);
- queueSize += queue.size();
}
}
private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
if (!AvlIterableList.isLinked(queue)) return;
fairq.remove(queue);
- queueSize -= queue.size();
}
// ============================================================================
@@ -470,13 +496,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
schedLock.lock();
try {
- popEventWaitingObjects(event);
-
- if (queueSize > 1) {
- schedWaitCond.signalAll();
- } else if (queueSize > 0) {
- schedWaitCond.signal();
- }
+ final int waitingCount = popEventWaitingObjects(event);
+ wakePollIfNeeded(waitingCount);
} finally {
schedLock.unlock();
}
@@ -493,6 +514,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
final boolean isTraceEnabled = LOG.isTraceEnabled();
schedLock.lock();
try {
+ int waitingCount = 0;
for (int i = 0; i < count; ++i) {
final ProcedureEvent event = events[i];
synchronized (event) {
@@ -500,36 +522,36 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (isTraceEnabled) {
LOG.trace("Wake event " + event);
}
- popEventWaitingObjects(event);
+ waitingCount += popEventWaitingObjects(event);
}
}
-
- if (queueSize > 1) {
- schedWaitCond.signalAll();
- } else if (queueSize > 0) {
- schedWaitCond.signal();
- }
+ wakePollIfNeeded(waitingCount);
} finally {
schedLock.unlock();
}
}
- private void popEventWaitingObjects(final ProcedureEvent event) {
+ private int popEventWaitingObjects(final ProcedureEvent event) {
+ int count = 0;
while (event.hasWaitingTables()) {
final Queue<TableName> queue = event.popWaitingTable();
queue.setSuspended(false);
addToRunQueue(tableRunQueue, queue);
+ count += queue.size();
}
// TODO: This will change once we have the new AM
while (event.hasWaitingServers()) {
final Queue<ServerName> queue = event.popWaitingServer();
queue.setSuspended(false);
addToRunQueue(serverRunQueue, queue);
+ count += queue.size();
}
while (event.hasWaitingProcedures()) {
wakeProcedure(event.popWaitingProcedure(false));
+ count++;
}
+ return count;
}
private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) {
@@ -823,8 +845,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (hasExclusiveLock()) {
// if we have an exclusive lock already taken
// only child of the lock owner can be executed
- Procedure availProc = peek();
- return availProc != null && hasParentLock(availProc);
+ final Procedure nextProc = peek();
+ return nextProc != null && hasLockAccess(nextProc);
}
// no xlock
@@ -1011,26 +1033,31 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
schedLock.lock();
TableQueue queue = getTableQueue(table);
if (!queue.getNamespaceQueue().trySharedLock()) {
+ schedLock.unlock();
return false;
}
- if (!queue.tryExclusiveLock(procedure.getProcId())) {
+ if (!queue.tryExclusiveLock(procedure)) {
queue.getNamespaceQueue().releaseSharedLock();
schedLock.unlock();
return false;
}
removeFromRunQueue(tableRunQueue, queue);
+ boolean hasParentLock = queue.hasParentLock(procedure);
schedLock.unlock();
- // Zk lock is expensive...
- boolean hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
- if (!hasXLock) {
- schedLock.lock();
- queue.releaseExclusiveLock();
- queue.getNamespaceQueue().releaseSharedLock();
- addToRunQueue(tableRunQueue, queue);
- schedLock.unlock();
+ boolean hasXLock = true;
+ if (!hasParentLock) {
+ // Zk lock is expensive...
+ hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
+ if (!hasXLock) {
+ schedLock.lock();
+ if (!hasParentLock) queue.releaseExclusiveLock();
+ queue.getNamespaceQueue().releaseSharedLock();
+ addToRunQueue(tableRunQueue, queue);
+ schedLock.unlock();
+ }
}
return hasXLock;
}
@@ -1041,15 +1068,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
* @param table the name of the table that has the exclusive lock
*/
public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) {
- schedLock.lock();
- TableQueue queue = getTableQueue(table);
- schedLock.unlock();
+ final TableQueue queue = getTableQueueWithLock(table);
+ final boolean hasParentLock = queue.hasParentLock(procedure);
- // Zk lock is expensive...
- queue.releaseZkExclusiveLock(lockManager);
+ if (!hasParentLock) {
+ // Zk lock is expensive...
+ queue.releaseZkExclusiveLock(lockManager);
+ }
schedLock.lock();
- queue.releaseExclusiveLock();
+ if (!hasParentLock) queue.releaseExclusiveLock();
queue.getNamespaceQueue().releaseSharedLock();
addToRunQueue(tableRunQueue, queue);
schedLock.unlock();
@@ -1116,17 +1144,19 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
* If there are new operations pending (e.g. a new create),
* the remove will not be performed.
* @param table the name of the table that should be marked as deleted
+ * @param procedure the procedure that is removing the table
* @return true if deletion succeeded, false otherwise meaning that there are
* other new operations pending for that table (e.g. a new create).
*/
- protected boolean markTableAsDeleted(final TableName table) {
+ @VisibleForTesting
+ protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) {
final ReentrantLock l = schedLock;
l.lock();
try {
TableQueue queue = getTableQueue(table);
if (queue == null) return true;
- if (queue.isEmpty() && queue.tryExclusiveLock(0)) {
+ if (queue.isEmpty() && queue.tryExclusiveLock(procedure)) {
// remove the table from the run-queue and the map
if (AvlIterableList.isLinked(queue)) {
tableRunQueue.remove(queue);
@@ -1256,11 +1286,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
wakeProcedure(nextProcs[i]);
}
- if (numProcs > 1) {
- schedWaitCond.signalAll();
- } else if (numProcs > 0) {
- schedWaitCond.signal();
- }
+ wakePollIfNeeded(numProcs);
if (!procedure.hasParent()) {
// release the table shared-lock.
@@ -1289,7 +1315,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (!tableQueue.trySharedLock()) return false;
NamespaceQueue nsQueue = getNamespaceQueue(nsName);
- boolean hasLock = nsQueue.tryExclusiveLock(procedure.getProcId());
+ boolean hasLock = nsQueue.tryExclusiveLock(procedure);
if (!hasLock) {
tableQueue.releaseSharedLock();
}
@@ -1333,7 +1359,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
schedLock.lock();
try {
ServerQueue queue = getServerQueue(serverName);
- if (queue.tryExclusiveLock(procedure.getProcId())) {
+ if (queue.tryExclusiveLock(procedure)) {
removeFromRunQueue(serverRunQueue, queue);
return true;
}
@@ -1473,10 +1499,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return proc.hasParent() && isLockOwner(proc.getParentProcId());
}
- public synchronized boolean tryExclusiveLock(final long procIdOwner) {
- assert procIdOwner != Long.MIN_VALUE;
- if (isLocked() && !isLockOwner(procIdOwner)) return false;
- exclusiveLockProcIdOwner = procIdOwner;
+ public synchronized boolean hasLockAccess(final Procedure proc) {
+ return isLockOwner(proc.getProcId()) || hasParentLock(proc);
+ }
+
+ public synchronized boolean tryExclusiveLock(final Procedure proc) {
+ if (isLocked()) return hasLockAccess(proc);
+ exclusiveLockProcIdOwner = proc.getProcId();
return true;
}
@@ -1564,6 +1593,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private Queue<T> currentQueue = null;
private Queue<T> queueHead = null;
private int currentQuantum = 0;
+ private int size = 0;
public FairQueue() {
this(1);
@@ -1573,9 +1603,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
this.quantum = quantum;
}
+ public boolean hasRunnables() {
+ return size > 0;
+ }
+
public void add(Queue<T> queue) {
queueHead = AvlIterableList.append(queueHead, queue);
if (currentQueue == null) setNextQueue(queueHead);
+ size++;
}
public void remove(Queue<T> queue) {
@@ -1584,6 +1619,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (currentQueue == queue) {
setNextQueue(queueHead != null ? nextQueue : null);
}
+ size--;
}
public Queue<T> poll() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
index 7feeec5..58eaf3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -20,10 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,8 +34,8 @@ import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -51,7 +48,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-@Category({MasterTests.class, MediumTests.class})
+@Category({MasterTests.class, SmallTests.class})
public class TestMasterProcedureScheduler {
private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
@@ -70,60 +67,6 @@ public class TestMasterProcedureScheduler {
queue.clear();
}
- @Test
- public void testConcurrentCreateDelete() throws Exception {
- final MasterProcedureScheduler procQueue = queue;
- final TableName table = TableName.valueOf("testtb");
- final AtomicBoolean running = new AtomicBoolean(true);
- final AtomicBoolean failure = new AtomicBoolean(false);
- Thread createThread = new Thread() {
- @Override
- public void run() {
- try {
- TestTableProcedure proc = new TestTableProcedure(1, table,
- TableProcedureInterface.TableOperationType.CREATE);
- while (running.get() && !failure.get()) {
- if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
- procQueue.releaseTableExclusiveLock(proc, table);
- }
- }
- } catch (Throwable e) {
- LOG.error("create failed", e);
- failure.set(true);
- }
- }
- };
-
- Thread deleteThread = new Thread() {
- @Override
- public void run() {
- try {
- TestTableProcedure proc = new TestTableProcedure(2, table,
- TableProcedureInterface.TableOperationType.DELETE);
- while (running.get() && !failure.get()) {
- if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
- procQueue.releaseTableExclusiveLock(proc, table);
- }
- procQueue.markTableAsDeleted(table);
- }
- } catch (Throwable e) {
- LOG.error("delete failed", e);
- failure.set(true);
- }
- }
- };
-
- createThread.start();
- deleteThread.start();
- for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
- Thread.sleep(100);
- }
- running.set(false);
- createThread.join();
- deleteThread.join();
- assertEquals(false, failure.get());
- }
-
/**
* Verify simple create/insert/fetch/delete of the table queue.
*/
@@ -159,9 +102,11 @@ public class TestMasterProcedureScheduler {
assertEquals(0, queue.size());
for (int i = 1; i <= NUM_TABLES; ++i) {
- TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+ final TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+ final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
+ TableProcedureInterface.TableOperationType.DELETE);
// complete the table deletion
- assertTrue(queue.markTableAsDeleted(tableName));
+ assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
}
}
@@ -173,11 +118,14 @@ public class TestMasterProcedureScheduler {
public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
TableName tableName = TableName.valueOf("testtb");
+ final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
+ TableProcedureInterface.TableOperationType.DELETE);
+
queue.addBack(new TestTableProcedure(1, tableName,
TableProcedureInterface.TableOperationType.EDIT));
// table can't be deleted because one item is in the queue
- assertFalse(queue.markTableAsDeleted(tableName));
+ assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
// fetch item and take a lock
Procedure proc = queue.poll();
@@ -186,11 +134,11 @@ public class TestMasterProcedureScheduler {
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
// table can't be deleted because we have the lock
assertEquals(0, queue.size());
- assertFalse(queue.markTableAsDeleted(tableName));
+ assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
// release the xlock
queue.releaseTableExclusiveLock(proc, tableName);
// complete the table deletion
- assertTrue(queue.markTableAsDeleted(tableName));
+ assertTrue(queue.markTableAsDeleted(tableName, proc));
}
/**
@@ -202,13 +150,16 @@ public class TestMasterProcedureScheduler {
final TableName tableName = TableName.valueOf("testtb");
final int nitems = 2;
+ final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
+ TableProcedureInterface.TableOperationType.DELETE);
+
for (int i = 1; i <= nitems; ++i) {
queue.addBack(new TestTableProcedure(i, tableName,
TableProcedureInterface.TableOperationType.READ));
}
// table can't be deleted because one item is in the queue
- assertFalse(queue.markTableAsDeleted(tableName));
+ assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
Procedure[] procs = new Procedure[nitems];
for (int i = 0; i < nitems; ++i) {
@@ -218,12 +169,12 @@ public class TestMasterProcedureScheduler {
// take the rlock
assertTrue(queue.tryAcquireTableSharedLock(proc, tableName));
// table can't be deleted because we have locks and/or items in the queue
- assertFalse(queue.markTableAsDeleted(tableName));
+ assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
}
for (int i = 0; i < nitems; ++i) {
// table can't be deleted because we have locks
- assertFalse(queue.markTableAsDeleted(tableName));
+ assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
// release the rlock
queue.releaseTableSharedLock(procs[i], tableName);
}
@@ -231,7 +182,7 @@ public class TestMasterProcedureScheduler {
// there are no items and no lock in the queue
assertEquals(0, queue.size());
// complete the table deletion
- assertTrue(queue.markTableAsDeleted(tableName));
+ assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
}
/**
@@ -299,7 +250,7 @@ public class TestMasterProcedureScheduler {
// remove table queue
assertEquals(0, queue.size());
- assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
+ assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName, wrProc));
}
@Test
@@ -355,6 +306,32 @@ public class TestMasterProcedureScheduler {
}
@Test
+ public void testVerifyNamespaceXLock() throws Exception {
+ String nsName = "ns1";
+ TableName tableName = TableName.valueOf(nsName, "testtb");
+ queue.addBack(new TestNamespaceProcedure(1, nsName,
+ TableProcedureInterface.TableOperationType.CREATE));
+ queue.addBack(new TestTableProcedure(2, tableName,
+ TableProcedureInterface.TableOperationType.READ));
+
+ // Fetch the ns item and take the xlock
+ Procedure proc = queue.poll();
+ assertEquals(1, proc.getProcId());
+ assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(proc, nsName));
+
+ // the table operation can't be executed because the ns is locked
+ assertEquals(null, queue.poll(0));
+
+ // release the ns lock
+ queue.releaseNamespaceExclusiveLock(proc, nsName);
+
+ proc = queue.poll();
+ assertEquals(2, proc.getProcId());
+ assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
+ queue.releaseTableExclusiveLock(proc, tableName);
+ }
+
+ @Test
public void testSharedZkLock() throws Exception {
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
final String dir = TEST_UTIL.getDataTestDir("TestSharedZkLock").toString();
@@ -625,154 +602,80 @@ public class TestMasterProcedureScheduler {
assertEquals(null, queue.poll(0));
}
- /**
- * Verify that "write" operations for a single table are serialized,
- * but different tables can be executed in parallel.
- */
- @Test(timeout=90000)
- public void testConcurrentWriteOps() throws Exception {
- final TestTableProcSet procSet = new TestTableProcSet(queue);
+ @Test
+ public void testParentXLockAndChildrenSharedLock() throws Exception {
+ final TableName tableName = TableName.valueOf("testParentXLockAndChildrenSharedLock");
+ final HRegionInfo[] regions = new HRegionInfo[] {
+ new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")),
+ new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")),
+ new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")),
+ };
- final int NUM_ITEMS = 10;
- final int NUM_TABLES = 4;
- final AtomicInteger opsCount = new AtomicInteger(0);
- for (int i = 0; i < NUM_TABLES; ++i) {
- TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
- for (int j = 1; j < NUM_ITEMS; ++j) {
- procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
- opsCount.incrementAndGet();
- }
- }
- assertEquals(opsCount.get(), queue.size());
-
- final Thread[] threads = new Thread[NUM_TABLES * 2];
- final HashSet<TableName> concurrentTables = new HashSet<TableName>();
- final ArrayList<String> failures = new ArrayList<String>();
- final AtomicInteger concurrentCount = new AtomicInteger(0);
- for (int i = 0; i < threads.length; ++i) {
- threads[i] = new Thread() {
- @Override
- public void run() {
- while (opsCount.get() > 0) {
- try {
- Procedure proc = procSet.acquire();
- if (proc == null) {
- queue.signalAll();
- if (opsCount.get() > 0) {
- continue;
- }
- break;
- }
-
- TableName tableId = procSet.getTableName(proc);
- synchronized (concurrentTables) {
- assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
- }
- assertTrue(opsCount.decrementAndGet() >= 0);
- try {
- long procId = proc.getProcId();
- int concurrent = concurrentCount.incrementAndGet();
- assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
- concurrent >= 1 && concurrent <= NUM_TABLES);
- LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
- Thread.sleep(2000);
- concurrent = concurrentCount.decrementAndGet();
- LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
- assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
- } finally {
- synchronized (concurrentTables) {
- assertTrue(concurrentTables.remove(tableId));
- }
- procSet.release(proc);
- }
- } catch (Throwable e) {
- LOG.error("Failed " + e.getMessage(), e);
- synchronized (failures) {
- failures.add(e.getMessage());
- }
- } finally {
- queue.signalAll();
- }
- }
- }
- };
- threads[i].start();
- }
- for (int i = 0; i < threads.length; ++i) {
- threads[i].join();
- }
- assertTrue(failures.toString(), failures.isEmpty());
- assertEquals(0, opsCount.get());
- assertEquals(0, queue.size());
+ queue.addBack(new TestTableProcedure(1, tableName,
+ TableProcedureInterface.TableOperationType.CREATE));
- for (int i = 1; i <= NUM_TABLES; ++i) {
- TableName table = TableName.valueOf(String.format("testtb-%04d", i));
- assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
+ // fetch and acquire first xlock proc
+ Procedure parentProc = queue.poll();
+ assertEquals(1, parentProc.getProcId());
+ assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName));
+
+ // add child procedure
+ for (int i = 0; i < regions.length; ++i) {
+ queue.addFront(new TestRegionProcedure(1, 1 + i, tableName,
+ TableProcedureInterface.TableOperationType.ASSIGN, regions[i]));
}
- }
- public static class TestTableProcSet {
- private final MasterProcedureScheduler queue;
+ // add another xlock procedure (no parent)
+ queue.addBack(new TestTableProcedure(100, tableName,
+ TableProcedureInterface.TableOperationType.EDIT));
- public TestTableProcSet(final MasterProcedureScheduler queue) {
- this.queue = queue;
+ // fetch and execute child
+ for (int i = 0; i < regions.length; ++i) {
+ final int regionIdx = regions.length - i - 1;
+ Procedure childProc = queue.poll();
+ LOG.debug("fetch children " + childProc);
+ assertEquals(1 + regionIdx, childProc.getProcId());
+ assertEquals(false, queue.waitRegion(childProc, regions[regionIdx]));
+ queue.wakeRegion(childProc, regions[regionIdx]);
}
- public void addBack(Procedure proc) {
- queue.addBack(proc);
- }
+ // nothing available, until xlock release
+ assertEquals(null, queue.poll(0));
- public void addFront(Procedure proc) {
- queue.addFront(proc);
- }
+ // release xlock
+ queue.releaseTableExclusiveLock(parentProc, tableName);
- public Procedure acquire() {
- Procedure proc = null;
- boolean avail = false;
- while (!avail) {
- proc = queue.poll();
- if (proc == null) break;
- switch (getTableOperationType(proc)) {
- case CREATE:
- case DELETE:
- case EDIT:
- avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc));
- break;
- case READ:
- avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc));
- break;
- default:
- throw new UnsupportedOperationException();
- }
- if (!avail) {
- addFront(proc);
- LOG.debug("yield procId=" + proc);
- }
- }
- return proc;
- }
+ // fetch the other xlock proc
+ Procedure proc = queue.poll();
+ assertEquals(100, proc.getProcId());
+ assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
+ queue.releaseTableExclusiveLock(proc, tableName);
+ }
- public void release(Procedure proc) {
- switch (getTableOperationType(proc)) {
- case CREATE:
- case DELETE:
- case EDIT:
- queue.releaseTableExclusiveLock(proc, getTableName(proc));
- break;
- case READ:
- queue.releaseTableSharedLock(proc, getTableName(proc));
- break;
- }
- }
+ @Test
+ public void testParentXLockAndChildrenXLock() throws Exception {
+ final TableName tableName = TableName.valueOf("testParentXLockAndChildrenXLock");
- public TableName getTableName(Procedure proc) {
- return ((TableProcedureInterface)proc).getTableName();
- }
+ queue.addBack(new TestTableProcedure(1, tableName,
+ TableProcedureInterface.TableOperationType.EDIT));
- public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
- return ((TableProcedureInterface)proc).getTableOperationType();
- }
+ // fetch and acquire first xlock proc
+ Procedure parentProc = queue.poll();
+ assertEquals(1, parentProc.getProcId());
+ assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName));
+
+ // add child procedure
+ queue.addFront(new TestTableProcedure(1, 2, tableName,
+ TableProcedureInterface.TableOperationType.EDIT));
+
+ // fetch the child proc (its parent already holds the xlock)
+ Procedure proc = queue.poll();
+ assertEquals(2, proc.getProcId());
+ assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
+ queue.releaseTableExclusiveLock(proc, tableName);
+
+ // release xlock
+ queue.releaseTableExclusiveLock(parentProc, tableName);
}
public static class TestTableProcedure extends TestProcedure
@@ -813,6 +716,19 @@ public class TestMasterProcedureScheduler {
}
}
+  /**
+   * A {@link TestTableProcedure} that carries its own {@link ProcedureEvent},
+   * created in the constructor and exposed through {@link #getEvent()}.
+   */
+  public static class TestTableProcedureWithEvent extends TestTableProcedure {
+    private final ProcedureEvent event;
+
+    public TestTableProcedureWithEvent(long procId, TableName tableName, TableOperationType opType) {
+      super(procId, tableName, opType);
+      // The event description is built from the table name and the procedure id.
+      final String description = tableName + " procId=" + procId;
+      this.event = new ProcedureEvent(description);
+    }
+
+    /** @return the event created for this procedure */
+    public ProcedureEvent getEvent() {
+      return this.event;
+    }
+  }
+
public static class TestRegionProcedure extends TestTableProcedure {
private final HRegionInfo[] regionInfo;
@@ -839,7 +755,7 @@ public class TestMasterProcedureScheduler {
public void toStringClassDetails(final StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (region=");
- sb.append(getRegionInfo());
+ sb.append(Arrays.toString(getRegionInfo()));
sb.append(")");
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8da0500e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
new file mode 100644
index 0000000..380067d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
+import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedureWithEvent;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({MasterTests.class, MediumTests.class})
+public class TestMasterProcedureSchedulerConcurrency {
+ private static final Log LOG = LogFactory.getLog(TestMasterProcedureSchedulerConcurrency.class);
+
+ private MasterProcedureScheduler queue;
+ private Configuration conf;
+
+  /** Builds a fresh configuration and scheduler before every test. */
+  @Before
+  public void setUp() throws IOException {
+    this.conf = HBaseConfiguration.create();
+    final TableLockManager.NullTableLockManager lockManager =
+        new TableLockManager.NullTableLockManager();
+    this.queue = new MasterProcedureScheduler(this.conf, lockManager);
+  }
+
+  /** Asserts that the test left the scheduler empty, then clears it. */
+  @After
+  public void tearDown() throws IOException {
+    assertEquals("proc-queue expected to be empty", 0, this.queue.size());
+    this.queue.clear();
+  }
+
+  /**
+   * Lets a "create" thread and a "delete" thread repeatedly take and release the
+   * exclusive lock of the same table (the delete thread also calls
+   * markTableAsDeleted()) for up to 100 rounds of 100ms sleep, and fails if
+   * either thread throws.
+   */
+  @Test(timeout=60000)
+  public void testConcurrentCreateDelete() throws Exception {
+    final MasterProcedureScheduler procQueue = queue;
+    final TableName table = TableName.valueOf("testtb");
+    final AtomicBoolean running = new AtomicBoolean(true);
+    final AtomicBoolean failure = new AtomicBoolean(false);
+    Thread createThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          TestTableProcedure proc = new TestTableProcedure(1, table,
+              TableProcedureInterface.TableOperationType.CREATE);
+          while (running.get() && !failure.get()) {
+            if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
+              procQueue.releaseTableExclusiveLock(proc, table);
+            }
+          }
+        } catch (Throwable e) {
+          LOG.error("create failed", e);
+          failure.set(true);
+        }
+      }
+    };
+
+    Thread deleteThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          TestTableProcedure proc = new TestTableProcedure(2, table,
+              TableProcedureInterface.TableOperationType.DELETE);
+          while (running.get() && !failure.get()) {
+            if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
+              procQueue.releaseTableExclusiveLock(proc, table);
+            }
+            procQueue.markTableAsDeleted(table, proc);
+          }
+        } catch (Throwable e) {
+          LOG.error("delete failed", e);
+          failure.set(true);
+        }
+      }
+    };
+
+    createThread.start();
+    deleteThread.start();
+    try {
+      for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
+        Thread.sleep(100);
+      }
+    } finally {
+      // Always stop and reap the workers, even when sleep() is interrupted;
+      // otherwise both threads would keep spinning after the test method returns.
+      running.set(false);
+      createThread.join();
+      deleteThread.join();
+    }
+    assertFalse("create/delete thread failed, see the log for the cause", failure.get());
+  }
+
+  /**
+   * Verify that "write" operations for a single table are serialized,
+   * but different tables can be executed in parallel.
+   *
+   * The test queues EDIT procedures for NUM_TABLES tables and drains them from
+   * NUM_TABLES * 2 threads. Each worker checks that no other worker is on the
+   * same table and that at most NUM_TABLES procedures run at the same time.
+   * At the end every table that was used must be removable from the scheduler.
+   */
+  @Test(timeout=60000)
+  public void testConcurrentWriteOps() throws Exception {
+    final TestTableProcSet procSet = new TestTableProcSet(queue);
+
+    final int NUM_ITEMS = 10;
+    final int NUM_TABLES = 4;
+    final AtomicInteger opsCount = new AtomicInteger(0);
+    for (int i = 0; i < NUM_TABLES; ++i) {
+      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
+      for (int j = 1; j < NUM_ITEMS; ++j) {
+        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
+            TableProcedureInterface.TableOperationType.EDIT));
+        opsCount.incrementAndGet();
+      }
+    }
+    assertEquals(opsCount.get(), queue.size());
+
+    final Thread[] threads = new Thread[NUM_TABLES * 2];
+    // Tables that currently have a procedure running; guarded by its own monitor.
+    final HashSet<TableName> concurrentTables = new HashSet<TableName>();
+    // Messages of everything thrown inside the workers; guarded by its own monitor.
+    final ArrayList<String> failures = new ArrayList<String>();
+    final AtomicInteger concurrentCount = new AtomicInteger(0);
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          while (opsCount.get() > 0) {
+            try {
+              Procedure proc = procSet.acquire();
+              if (proc == null) {
+                // Nothing could be acquired right now: retry while work is left.
+                queue.signalAll();
+                if (opsCount.get() > 0) {
+                  continue;
+                }
+                break;
+              }
+
+              TableName tableId = procSet.getTableName(proc);
+              synchronized (concurrentTables) {
+                assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
+              }
+              assertTrue(opsCount.decrementAndGet() >= 0);
+              try {
+                long procId = proc.getProcId();
+                int concurrent = concurrentCount.incrementAndGet();
+                assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
+                  concurrent >= 1 && concurrent <= NUM_TABLES);
+                LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+                // Hold the lock for a while so that overlapping executions are observable.
+                Thread.sleep(2000);
+                concurrent = concurrentCount.decrementAndGet();
+                LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
+              } finally {
+                synchronized (concurrentTables) {
+                  assertTrue(concurrentTables.remove(tableId));
+                }
+                procSet.release(proc);
+              }
+            } catch (Throwable e) {
+              LOG.error("Failed " + e.getMessage(), e);
+              synchronized (failures) {
+                failures.add(e.getMessage());
+              }
+            } finally {
+              queue.signalAll();
+            }
+          }
+        }
+      };
+      threads[i].start();
+    }
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].join();
+    }
+    assertTrue(failures.toString(), failures.isEmpty());
+    assertEquals(0, opsCount.get());
+    assertEquals(0, queue.size());
+
+    // Check the tables that were actually used above: the index range must match
+    // the one of the loop that queued the procedures (0 .. NUM_TABLES - 1).
+    for (int i = 0; i < NUM_TABLES; ++i) {
+      final TableName table = TableName.valueOf(String.format("testtb-%04d", i));
+      final TestTableProcedure dummyProc = new TestTableProcedure(100, table,
+          TableProcedureInterface.TableOperationType.DELETE);
+      assertTrue("queue should be deleted, table=" + table,
+          queue.markTableAsDeleted(table, dummyProc));
+    }
+  }
+
+ @Test(timeout=60000)
+ public void testConcurrentWaitWake() throws Exception {
+ testConcurrentWaitWake(false); // false: events are woken one at a time with wakeEvent()
+ }
+
+ @Test(timeout=60000)
+ public void testConcurrentWaitWakeBatch() throws Exception {
+ testConcurrentWaitWake(true); // true: events are woken together with wakeEvents()
+ }
+
+ private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { // useWakeBatch: wakeEvents() vs per-event wakeEvent()
+ final TableName tableName = TableName.valueOf("testtb");
+
+ final int NPROCS = 20; // number of procedures seeded into the queue
+ final int NRUNS = 100; // wait/wake count at which the threads stop
+
+ for (long i = 0; i < NPROCS; ++i) {
+ queue.addBack(new TestTableProcedureWithEvent(i, tableName,
+ TableProcedureInterface.TableOperationType.READ));
+ }
+
+ final Thread[] threads = new Thread[4]; // threads[0] wakes events, threads[1..3] wait on them
+ final AtomicInteger waitCount = new AtomicInteger(0);
+ final AtomicInteger wakeCount = new AtomicInteger(0);
+
+ final ConcurrentSkipListSet<TestTableProcedureWithEvent> waitQueue =
+ new ConcurrentSkipListSet<TestTableProcedureWithEvent>(); // procs handed from the waiters to the waker
+ threads[0] = new Thread() {
+ @Override
+ public void run() {
+ while (true) {
+ if (useWakeBatch) {
+ ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; // only this thread polls waitQueue, so size() entries stay available
+ for (int i = 0; i < ev.length; ++i) {
+ ev[i] = waitQueue.pollFirst().getEvent();
+ LOG.debug("WAKE " + ev[i] + " total=" + wakeCount.get());
+ }
+ queue.wakeEvents(ev, ev.length);
+ wakeCount.addAndGet(ev.length);
+ } else {
+ int size = waitQueue.size(); // drain only what was queued when this pass started
+ while (size-- > 0) {
+ ProcedureEvent ev = waitQueue.pollFirst().getEvent();
+ queue.wakeEvent(ev);
+ LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
+ wakeCount.incrementAndGet();
+ }
+ }
+ if (wakeCount.get() >= NRUNS) {
+ break;
+ }
+ Threads.sleepWithoutInterrupt(25); // pause before the next drain pass
+ }
+ }
+ };
+
+ for (int i = 1; i < threads.length; ++i) {
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ while (true) {
+ TestTableProcedureWithEvent proc = (TestTableProcedureWithEvent)queue.poll();
+ if (proc == null) continue; // nothing polled, try again
+
+ waitQueue.add(proc); // hand the proc over to the waker thread
+ queue.suspendEvent(proc.getEvent()); // NOTE(review): proc is already visible to the waker here; confirm an early wake is harmless
+ queue.waitEvent(proc.getEvent(), proc);
+ LOG.debug("WAIT " + proc.getEvent());
+ if (waitCount.incrementAndGet() >= NRUNS) {
+ break;
+ }
+ }
+ }
+ };
+ }
+
+ for (int i = 0; i < threads.length; ++i) {
+ threads[i].start();
+ }
+ for (int i = 0; i < threads.length; ++i) {
+ threads[i].join();
+ }
+
+ queue.clear(); // tearDown() asserts queue.size() == 0; presumably clear() drops the leftover procs
+ }
+
+  /**
+   * Small helper around a {@link MasterProcedureScheduler}: it polls procedures,
+   * takes the table lock that matches the procedure's operation type and
+   * releases that lock again.
+   */
+  public static class TestTableProcSet {
+    private final MasterProcedureScheduler queue;
+
+    public TestTableProcSet(final MasterProcedureScheduler queue) {
+      this.queue = queue;
+    }
+
+    /** Adds the procedure at the back of the wrapped scheduler. */
+    public void addBack(Procedure proc) {
+      this.queue.addBack(proc);
+    }
+
+    /** Adds the procedure at the front of the wrapped scheduler. */
+    public void addFront(Procedure proc) {
+      this.queue.addFront(proc);
+    }
+
+    /**
+     * Polls the scheduler until a procedure obtains its table lock.
+     * CREATE/DELETE/EDIT take the exclusive lock, READ takes the shared lock.
+     * A procedure that cannot get its lock is put back at the front.
+     *
+     * @return the procedure that now holds its table lock, or null as soon as
+     *         poll() returns null
+     * @throws UnsupportedOperationException for any other operation type
+     */
+    public Procedure acquire() {
+      for (;;) {
+        final Procedure candidate = queue.poll();
+        if (candidate == null) {
+          return null;
+        }
+        final boolean locked;
+        switch (getTableOperationType(candidate)) {
+          case READ:
+            locked = queue.tryAcquireTableSharedLock(candidate, getTableName(candidate));
+            break;
+          case CREATE:
+          case DELETE:
+          case EDIT:
+            locked = queue.tryAcquireTableExclusiveLock(candidate, getTableName(candidate));
+            break;
+          default:
+            throw new UnsupportedOperationException();
+        }
+        if (locked) {
+          return candidate;
+        }
+        // The lock is busy: requeue the procedure at the head and poll again.
+        addFront(candidate);
+        LOG.debug("yield procId=" + candidate);
+      }
+    }
+
+    /**
+     * Releases the table lock that acquire() takes for the given procedure:
+     * exclusive for CREATE/DELETE/EDIT, shared for READ. Other operation types
+     * are ignored.
+     */
+    public void release(Procedure proc) {
+      switch (getTableOperationType(proc)) {
+        case READ:
+          queue.releaseTableSharedLock(proc, getTableName(proc));
+          return;
+        case CREATE:
+        case DELETE:
+        case EDIT:
+          queue.releaseTableExclusiveLock(proc, getTableName(proc));
+          return;
+        default:
+          // No lock is released for any other operation type.
+          return;
+      }
+    }
+
+    /** @return the table name reported by the procedure, cast to TableProcedureInterface */
+    public TableName getTableName(Procedure proc) {
+      final TableProcedureInterface tableProc = (TableProcedureInterface) proc;
+      return tableProc.getTableName();
+    }
+
+    /** @return the operation type reported by the procedure, cast to TableProcedureInterface */
+    public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
+      final TableProcedureInterface tableProc = (TableProcedureInterface) proc;
+      return tableProc.getTableOperationType();
+    }
+  }
+}