You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/07/17 19:41:42 UTC

[1/3] hbase git commit: HBASE-14106 TestProcedureRecovery is flaky

Repository: hbase
Updated Branches:
  refs/heads/branch-1 440e0bc6e -> 9135ad1be
  refs/heads/branch-1.2 e65fda7b1 -> 229965646
  refs/heads/master b98598f36 -> 7382f8e04


HBASE-14106 TestProcedureRecovery is flaky


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7382f8e0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7382f8e0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7382f8e0

Branch: refs/heads/master
Commit: 7382f8e0459ca5f25965e46ab1ea902ad7463713
Parents: b98598f
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri Jul 17 10:20:33 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Jul 17 10:20:33 2015 -0700

----------------------------------------------------------------------
 .../hbase/procedure2/TestProcedureRecovery.java | 71 ++++++++++++++------
 1 file changed, 49 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7382f8e0/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index 24a448e..1a4845c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,7 +54,8 @@ public class TestProcedureRecovery {
   private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
   private static final Procedure NULL_PROC = null;
 
-  private static ProcedureExecutor<Void> procExecutor;
+  private static TestProcEnv procEnv;
+  private static ProcedureExecutor<TestProcEnv> procExecutor;
   private static ProcedureStore procStore;
   private static int procSleepInterval;
 
@@ -70,15 +72,13 @@ public class TestProcedureRecovery {
     assertTrue(testDir.depth() > 1);
 
     logDir = new Path(testDir, "proc-logs");
+    procEnv = new TestProcEnv();
     procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
     procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
     procSleepInterval = 0;
-
-    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
-    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
   }
 
   @After
@@ -94,13 +94,14 @@ public class TestProcedureRecovery {
     dumpLogDirState();
   }
 
-  public static class TestSingleStepProcedure extends SequentialProcedure<Void> {
+  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
     private int step = 0;
 
     public TestSingleStepProcedure() { }
 
     @Override
-    protected Procedure[] execute(Void env) {
+    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
+      env.waitOnLatch();
       LOG.debug("execute procedure " + this + " step=" + step);
       step++;
       setResult(Bytes.toBytes(step));
@@ -108,18 +109,19 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected void rollback(Void env) { }
+    protected void rollback(TestProcEnv env) { }
 
     @Override
-    protected boolean abort(Void env) { return true; }
+    protected boolean abort(TestProcEnv env) { return true; }
   }
 
-  public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
+  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
     private AtomicBoolean abort = new AtomicBoolean(false);
     private int step = 0;
 
     @Override
-    protected Procedure[] execute(Void env) {
+    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
+      env.waitOnLatch();
       LOG.debug("execute procedure " + this + " step=" + step);
       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
       step++;
@@ -134,14 +136,14 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected void rollback(Void env) {
+    protected void rollback(TestProcEnv env) {
       LOG.debug("rollback procedure " + this + " step=" + step);
       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
       step++;
     }
 
     @Override
-    protected boolean abort(Void env) {
+    protected boolean abort(TestProcEnv env) {
       abort.set(true);
       return true;
     }
@@ -161,7 +163,7 @@ public class TestProcedureRecovery {
     public TestMultiStepProcedure() { }
 
     @Override
-    public Procedure[] execute(Void env) {
+    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
       super.execute(env);
       return isFailed() ? null : new Procedure[] { new Step1Procedure() };
     }
@@ -170,7 +172,7 @@ public class TestProcedureRecovery {
       public Step1Procedure() { }
 
       @Override
-      protected Procedure[] execute(Void env) {
+      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
         super.execute(env);
         return isFailed() ? null : new Procedure[] { new Step2Procedure() };
       }
@@ -298,6 +300,8 @@ public class TestProcedureRecovery {
 
     // Restart
     restart();
+    waitProcedure(procId);
+
     Procedure proc2 = new TestSingleStepProcedure();
     // Submit a procedure with the same nonce and expect the same procedure would return.
     long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
@@ -314,17 +318,23 @@ public class TestProcedureRecovery {
     Procedure proc = new TestMultiStepProcedure();
     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
 
-    // Restart
+    // Restart (use a latch to prevent the step execution until we submitted proc2)
+    CountDownLatch latch = new CountDownLatch(1);
+    procEnv.setWaitLatch(latch);
     restart();
-    Procedure proc2 = new TestMultiStepProcedure();
     // Submit a procedure with the same nonce and expect the same procedure would return.
-    long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+    Procedure proc2 = new TestMultiStepProcedure();
+    long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
+    latch.countDown();
+    procEnv.setWaitLatch(null);
+
     // The original proc is not completed and the new submission should have the same proc Id.
     assertTrue(procId == procId2);
   }
 
+
   public static class TestStateMachineProcedure
-      extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
+      extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
     enum State { STATE_1, STATE_2, STATE_3, DONE }
 
     public TestStateMachineProcedure() {}
@@ -333,7 +343,7 @@ public class TestProcedureRecovery {
     private int iResult = 0;
 
     @Override
-    protected StateMachineProcedure.Flow executeFromState(Void env, State state) {
+    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
       switch (state) {
         case STATE_1:
           LOG.info("execute step 1 " + this);
@@ -364,7 +374,7 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected void rollbackState(Void env, final State state) {
+    protected void rollbackState(TestProcEnv env, final State state) {
       switch (state) {
         case STATE_1:
           LOG.info("rollback step 1 " + this);
@@ -396,7 +406,7 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected boolean abort(Void env) {
+    protected boolean abort(TestProcEnv env) {
       aborted.set(true);
       return true;
     }
@@ -522,4 +532,21 @@ public class TestProcedureRecovery {
       LOG.warn("Unable to dump " + logDir, e);
     }
   }
+
+  private static class TestProcEnv {
+    private CountDownLatch latch = null;
+
+    /**
+     * set/unset a latch. every procedure execute() step will wait on the latch if any.
+     */
+    public void setWaitLatch(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    public void waitOnLatch() throws InterruptedException {
+      if (latch != null) {
+        latch.await();
+      }
+    }
+  }
 }
\ No newline at end of file


[2/3] hbase git commit: HBASE-14106 TestProcedureRecovery is flaky

Posted by mb...@apache.org.
HBASE-14106 TestProcedureRecovery is flaky


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9135ad1b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9135ad1b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9135ad1b

Branch: refs/heads/branch-1
Commit: 9135ad1bedd776e8b2b50c79e18ceadc30c08fcd
Parents: 440e0bc
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri Jul 17 10:20:33 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Jul 17 10:37:06 2015 -0700

----------------------------------------------------------------------
 .../hbase/procedure2/TestProcedureRecovery.java | 71 ++++++++++++++------
 1 file changed, 49 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9135ad1b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index 145f7f3..74e4675 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,7 +53,8 @@ public class TestProcedureRecovery {
   private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
   private static final Procedure NULL_PROC = null;
 
-  private static ProcedureExecutor<Void> procExecutor;
+  private static TestProcEnv procEnv;
+  private static ProcedureExecutor<TestProcEnv> procExecutor;
   private static ProcedureStore procStore;
   private static int procSleepInterval;
 
@@ -69,15 +71,13 @@ public class TestProcedureRecovery {
     assertTrue(testDir.depth() > 1);
 
     logDir = new Path(testDir, "proc-logs");
+    procEnv = new TestProcEnv();
     procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
     procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
     procSleepInterval = 0;
-
-    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
-    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
   }
 
   @After
@@ -93,13 +93,14 @@ public class TestProcedureRecovery {
     dumpLogDirState();
   }
 
-  public static class TestSingleStepProcedure extends SequentialProcedure<Void> {
+  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
     private int step = 0;
 
     public TestSingleStepProcedure() { }
 
     @Override
-    protected Procedure[] execute(Void env) {
+    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
+      env.waitOnLatch();
       LOG.debug("execute procedure " + this + " step=" + step);
       step++;
       setResult(Bytes.toBytes(step));
@@ -107,18 +108,19 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected void rollback(Void env) { }
+    protected void rollback(TestProcEnv env) { }
 
     @Override
-    protected boolean abort(Void env) { return true; }
+    protected boolean abort(TestProcEnv env) { return true; }
   }
 
-  public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
+  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
     private AtomicBoolean abort = new AtomicBoolean(false);
     private int step = 0;
 
     @Override
-    protected Procedure[] execute(Void env) {
+    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
+      env.waitOnLatch();
       LOG.debug("execute procedure " + this + " step=" + step);
       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
       step++;
@@ -133,14 +135,14 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected void rollback(Void env) {
+    protected void rollback(TestProcEnv env) {
       LOG.debug("rollback procedure " + this + " step=" + step);
       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
       step++;
     }
 
     @Override
-    protected boolean abort(Void env) {
+    protected boolean abort(TestProcEnv env) {
       abort.set(true);
       return true;
     }
@@ -160,7 +162,7 @@ public class TestProcedureRecovery {
     public TestMultiStepProcedure() { }
 
     @Override
-    public Procedure[] execute(Void env) {
+    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
       super.execute(env);
       return isFailed() ? null : new Procedure[] { new Step1Procedure() };
     }
@@ -169,7 +171,7 @@ public class TestProcedureRecovery {
       public Step1Procedure() { }
 
       @Override
-      protected Procedure[] execute(Void env) {
+      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
         super.execute(env);
         return isFailed() ? null : new Procedure[] { new Step2Procedure() };
       }
@@ -297,6 +299,8 @@ public class TestProcedureRecovery {
 
     // Restart
     restart();
+    waitProcedure(procId);
+
     Procedure proc2 = new TestSingleStepProcedure();
     // Submit a procedure with the same nonce and expect the same procedure would return.
     long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
@@ -313,17 +317,23 @@ public class TestProcedureRecovery {
     Procedure proc = new TestMultiStepProcedure();
     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
 
-    // Restart
+    // Restart (use a latch to prevent the step execution until we submitted proc2)
+    CountDownLatch latch = new CountDownLatch(1);
+    procEnv.setWaitLatch(latch);
     restart();
-    Procedure proc2 = new TestMultiStepProcedure();
     // Submit a procedure with the same nonce and expect the same procedure would return.
-    long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+    Procedure proc2 = new TestMultiStepProcedure();
+    long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
+    latch.countDown();
+    procEnv.setWaitLatch(null);
+
     // The original proc is not completed and the new submission should have the same proc Id.
     assertTrue(procId == procId2);
   }
 
+
   public static class TestStateMachineProcedure
-      extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
+      extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
     enum State { STATE_1, STATE_2, STATE_3, DONE }
 
     public TestStateMachineProcedure() {}
@@ -332,7 +342,7 @@ public class TestProcedureRecovery {
     private int iResult = 0;
 
     @Override
-    protected StateMachineProcedure.Flow executeFromState(Void env, State state) {
+    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
       switch (state) {
         case STATE_1:
           LOG.info("execute step 1 " + this);
@@ -363,7 +373,7 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected void rollbackState(Void env, final State state) {
+    protected void rollbackState(TestProcEnv env, final State state) {
       switch (state) {
         case STATE_1:
           LOG.info("rollback step 1 " + this);
@@ -395,7 +405,7 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected boolean abort(Void env) {
+    protected boolean abort(TestProcEnv env) {
       aborted.set(true);
       return true;
     }
@@ -521,4 +531,21 @@ public class TestProcedureRecovery {
       LOG.warn("Unable to dump " + logDir, e);
     }
   }
+
+  private static class TestProcEnv {
+    private CountDownLatch latch = null;
+
+    /**
+     * set/unset a latch. every procedure execute() step will wait on the latch if any.
+     */
+    public void setWaitLatch(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    public void waitOnLatch() throws InterruptedException {
+      if (latch != null) {
+        latch.await();
+      }
+    }
+  }
 }


[3/3] hbase git commit: HBASE-14106 TestProcedureRecovery is flaky

Posted by mb...@apache.org.
HBASE-14106 TestProcedureRecovery is flaky


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/22996564
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/22996564
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/22996564

Branch: refs/heads/branch-1.2
Commit: 229965646ba2510c6b4c175fd8d4e83e4da1b204
Parents: e65fda7
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri Jul 17 10:20:33 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Jul 17 10:38:11 2015 -0700

----------------------------------------------------------------------
 .../hbase/procedure2/TestProcedureRecovery.java | 71 ++++++++++++++------
 1 file changed, 49 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/22996564/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index 145f7f3..74e4675 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,7 +53,8 @@ public class TestProcedureRecovery {
   private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
   private static final Procedure NULL_PROC = null;
 
-  private static ProcedureExecutor<Void> procExecutor;
+  private static TestProcEnv procEnv;
+  private static ProcedureExecutor<TestProcEnv> procExecutor;
   private static ProcedureStore procStore;
   private static int procSleepInterval;
 
@@ -69,15 +71,13 @@ public class TestProcedureRecovery {
     assertTrue(testDir.depth() > 1);
 
     logDir = new Path(testDir, "proc-logs");
+    procEnv = new TestProcEnv();
     procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
     procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
     procSleepInterval = 0;
-
-    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
-    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
   }
 
   @After
@@ -93,13 +93,14 @@ public class TestProcedureRecovery {
     dumpLogDirState();
   }
 
-  public static class TestSingleStepProcedure extends SequentialProcedure<Void> {
+  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
     private int step = 0;
 
     public TestSingleStepProcedure() { }
 
     @Override
-    protected Procedure[] execute(Void env) {
+    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
+      env.waitOnLatch();
       LOG.debug("execute procedure " + this + " step=" + step);
       step++;
       setResult(Bytes.toBytes(step));
@@ -107,18 +108,19 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected void rollback(Void env) { }
+    protected void rollback(TestProcEnv env) { }
 
     @Override
-    protected boolean abort(Void env) { return true; }
+    protected boolean abort(TestProcEnv env) { return true; }
   }
 
-  public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
+  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
     private AtomicBoolean abort = new AtomicBoolean(false);
     private int step = 0;
 
     @Override
-    protected Procedure[] execute(Void env) {
+    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
+      env.waitOnLatch();
       LOG.debug("execute procedure " + this + " step=" + step);
       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
       step++;
@@ -133,14 +135,14 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected void rollback(Void env) {
+    protected void rollback(TestProcEnv env) {
       LOG.debug("rollback procedure " + this + " step=" + step);
       ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
       step++;
     }
 
     @Override
-    protected boolean abort(Void env) {
+    protected boolean abort(TestProcEnv env) {
       abort.set(true);
       return true;
     }
@@ -160,7 +162,7 @@ public class TestProcedureRecovery {
     public TestMultiStepProcedure() { }
 
     @Override
-    public Procedure[] execute(Void env) {
+    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
       super.execute(env);
       return isFailed() ? null : new Procedure[] { new Step1Procedure() };
     }
@@ -169,7 +171,7 @@ public class TestProcedureRecovery {
       public Step1Procedure() { }
 
       @Override
-      protected Procedure[] execute(Void env) {
+      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
         super.execute(env);
         return isFailed() ? null : new Procedure[] { new Step2Procedure() };
       }
@@ -297,6 +299,8 @@ public class TestProcedureRecovery {
 
     // Restart
     restart();
+    waitProcedure(procId);
+
     Procedure proc2 = new TestSingleStepProcedure();
     // Submit a procedure with the same nonce and expect the same procedure would return.
     long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
@@ -313,17 +317,23 @@ public class TestProcedureRecovery {
     Procedure proc = new TestMultiStepProcedure();
     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
 
-    // Restart
+    // Restart (use a latch to prevent the step execution until we submitted proc2)
+    CountDownLatch latch = new CountDownLatch(1);
+    procEnv.setWaitLatch(latch);
     restart();
-    Procedure proc2 = new TestMultiStepProcedure();
     // Submit a procedure with the same nonce and expect the same procedure would return.
-    long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+    Procedure proc2 = new TestMultiStepProcedure();
+    long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
+    latch.countDown();
+    procEnv.setWaitLatch(null);
+
     // The original proc is not completed and the new submission should have the same proc Id.
     assertTrue(procId == procId2);
   }
 
+
   public static class TestStateMachineProcedure
-      extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
+      extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
     enum State { STATE_1, STATE_2, STATE_3, DONE }
 
     public TestStateMachineProcedure() {}
@@ -332,7 +342,7 @@ public class TestProcedureRecovery {
     private int iResult = 0;
 
     @Override
-    protected StateMachineProcedure.Flow executeFromState(Void env, State state) {
+    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
       switch (state) {
         case STATE_1:
           LOG.info("execute step 1 " + this);
@@ -363,7 +373,7 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected void rollbackState(Void env, final State state) {
+    protected void rollbackState(TestProcEnv env, final State state) {
       switch (state) {
         case STATE_1:
           LOG.info("rollback step 1 " + this);
@@ -395,7 +405,7 @@ public class TestProcedureRecovery {
     }
 
     @Override
-    protected boolean abort(Void env) {
+    protected boolean abort(TestProcEnv env) {
       aborted.set(true);
       return true;
     }
@@ -521,4 +531,21 @@ public class TestProcedureRecovery {
       LOG.warn("Unable to dump " + logDir, e);
     }
   }
+
+  private static class TestProcEnv {
+    private CountDownLatch latch = null;
+
+    /**
+     * set/unset a latch. every procedure execute() step will wait on the latch if any.
+     */
+    public void setWaitLatch(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    public void waitOnLatch() throws InterruptedException {
+      if (latch != null) {
+        latch.await();
+      }
+    }
+  }
 }