You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/07/17 19:41:43 UTC
[2/3] hbase git commit: HBASE-14106 TestProcedureRecovery is flaky
HBASE-14106 TestProcedureRecovery is flaky
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9135ad1b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9135ad1b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9135ad1b
Branch: refs/heads/branch-1
Commit: 9135ad1bedd776e8b2b50c79e18ceadc30c08fcd
Parents: 440e0bc
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri Jul 17 10:20:33 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Jul 17 10:37:06 2015 -0700
----------------------------------------------------------------------
.../hbase/procedure2/TestProcedureRecovery.java | 71 ++++++++++++++------
1 file changed, 49 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9135ad1b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index 145f7f3..74e4675 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,7 +53,8 @@ public class TestProcedureRecovery {
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
private static final Procedure NULL_PROC = null;
- private static ProcedureExecutor<Void> procExecutor;
+ private static TestProcEnv procEnv;
+ private static ProcedureExecutor<TestProcEnv> procExecutor;
private static ProcedureStore procStore;
private static int procSleepInterval;
@@ -69,15 +71,13 @@ public class TestProcedureRecovery {
assertTrue(testDir.depth() > 1);
logDir = new Path(testDir, "proc-logs");
+ procEnv = new TestProcEnv();
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+ procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
procSleepInterval = 0;
-
- ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
- ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
}
@After
@@ -93,13 +93,14 @@ public class TestProcedureRecovery {
dumpLogDirState();
}
- public static class TestSingleStepProcedure extends SequentialProcedure<Void> {
+ public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
private int step = 0;
public TestSingleStepProcedure() { }
@Override
- protected Procedure[] execute(Void env) {
+ protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
+ env.waitOnLatch();
LOG.debug("execute procedure " + this + " step=" + step);
step++;
setResult(Bytes.toBytes(step));
@@ -107,18 +108,19 @@ public class TestProcedureRecovery {
}
@Override
- protected void rollback(Void env) { }
+ protected void rollback(TestProcEnv env) { }
@Override
- protected boolean abort(Void env) { return true; }
+ protected boolean abort(TestProcEnv env) { return true; }
}
- public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
+ public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
private AtomicBoolean abort = new AtomicBoolean(false);
private int step = 0;
@Override
- protected Procedure[] execute(Void env) {
+ protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
+ env.waitOnLatch();
LOG.debug("execute procedure " + this + " step=" + step);
ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
step++;
@@ -133,14 +135,14 @@ public class TestProcedureRecovery {
}
@Override
- protected void rollback(Void env) {
+ protected void rollback(TestProcEnv env) {
LOG.debug("rollback procedure " + this + " step=" + step);
ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
step++;
}
@Override
- protected boolean abort(Void env) {
+ protected boolean abort(TestProcEnv env) {
abort.set(true);
return true;
}
@@ -160,7 +162,7 @@ public class TestProcedureRecovery {
public TestMultiStepProcedure() { }
@Override
- public Procedure[] execute(Void env) {
+ public Procedure[] execute(TestProcEnv env) throws InterruptedException {
super.execute(env);
return isFailed() ? null : new Procedure[] { new Step1Procedure() };
}
@@ -169,7 +171,7 @@ public class TestProcedureRecovery {
public Step1Procedure() { }
@Override
- protected Procedure[] execute(Void env) {
+ protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
super.execute(env);
return isFailed() ? null : new Procedure[] { new Step2Procedure() };
}
@@ -297,6 +299,8 @@ public class TestProcedureRecovery {
// Restart
restart();
+ waitProcedure(procId);
+
Procedure proc2 = new TestSingleStepProcedure();
// Submit a procedure with the same nonce and expect the same procedure would return.
long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
@@ -313,17 +317,23 @@ public class TestProcedureRecovery {
Procedure proc = new TestMultiStepProcedure();
long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
- // Restart
+ // Restart (use a latch to prevent the step execution until we submitted proc2)
+ CountDownLatch latch = new CountDownLatch(1);
+ procEnv.setWaitLatch(latch);
restart();
- Procedure proc2 = new TestMultiStepProcedure();
// Submit a procedure with the same nonce and expect the same procedure would return.
- long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+ Procedure proc2 = new TestMultiStepProcedure();
+ long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
+ latch.countDown();
+ procEnv.setWaitLatch(null);
+
// The original proc is not completed and the new submission should have the same proc Id.
assertTrue(procId == procId2);
}
+
public static class TestStateMachineProcedure
- extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
+ extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
enum State { STATE_1, STATE_2, STATE_3, DONE }
public TestStateMachineProcedure() {}
@@ -332,7 +342,7 @@ public class TestProcedureRecovery {
private int iResult = 0;
@Override
- protected StateMachineProcedure.Flow executeFromState(Void env, State state) {
+ protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
switch (state) {
case STATE_1:
LOG.info("execute step 1 " + this);
@@ -363,7 +373,7 @@ public class TestProcedureRecovery {
}
@Override
- protected void rollbackState(Void env, final State state) {
+ protected void rollbackState(TestProcEnv env, final State state) {
switch (state) {
case STATE_1:
LOG.info("rollback step 1 " + this);
@@ -395,7 +405,7 @@ public class TestProcedureRecovery {
}
@Override
- protected boolean abort(Void env) {
+ protected boolean abort(TestProcEnv env) {
aborted.set(true);
return true;
}
@@ -521,4 +531,21 @@ public class TestProcedureRecovery {
LOG.warn("Unable to dump " + logDir, e);
}
}
+
+ private static class TestProcEnv {
+ private CountDownLatch latch = null;
+
+ /**
+ * set/unset a latch. every procedure execute() step will wait on the latch if any.
+ */
+ public void setWaitLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public void waitOnLatch() throws InterruptedException {
+ if (latch != null) {
+ latch.await();
+ }
+ }
+ }
}