You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2016/10/20 14:46:06 UTC

hbase git commit: HBASE-16871 Procedure v2 - add waiting procs back to the queue after restart

Repository: hbase
Updated Branches:
  refs/heads/master 59857a41e -> 553373671


HBASE-16871 Procedure v2 - add waiting procs back to the queue after restart


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/55337367
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/55337367
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/55337367

Branch: refs/heads/master
Commit: 553373671b736540a6f53de4f57fc5e0ca7373a7
Parents: 59857a4
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Oct 20 07:17:10 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Oct 20 07:17:10 2016 -0700

----------------------------------------------------------------------
 .../hbase/procedure2/ProcedureExecutor.java     | 19 ++++++-
 .../procedure2/ProcedureTestingUtility.java     | 22 +++++---
 .../hbase/procedure2/TestProcedureEvents.java   | 59 ++++++++++++++++++--
 3 files changed, 85 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/55337367/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index f167f4a..2e2dbdf 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -82,6 +82,7 @@ public class ProcedureExecutor<TEnvironment> {
 
   Testing testing = null;
   public static class Testing {
+    protected boolean killIfSuspended = false;
     protected boolean killBeforeStoreUpdate = false;
     protected boolean toggleKillBeforeStoreUpdate = false;
 
@@ -93,6 +94,10 @@ public class ProcedureExecutor<TEnvironment> {
       }
       return kill;
     }
+
+    protected boolean shouldKillBeforeStoreUpdate(final boolean isSuspended) {
+      return (isSuspended && !killIfSuspended) ? false : shouldKillBeforeStoreUpdate();
+    }
   }
 
   public interface ProcedureExecutorListener {
@@ -343,7 +348,7 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // 2. Initialize the stacks
-    ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
+    final ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
     HashSet<Procedure> waitingSet = null;
     procIter.reset();
     while (procIter.hasNext()) {
@@ -432,7 +437,15 @@ public class ProcedureExecutor<TEnvironment> {
       throw new IOException("found " + corruptedCount + " procedures on replay");
     }
 
-    // 4. Push the scheduler
+    // 4. Push the procedures to the timeout executor
+    if (waitingSet != null && !waitingSet.isEmpty()) {
+      for (Procedure proc: waitingSet) {
+        proc.afterReplay(getEnvironment());
+        timeoutExecutor.add(proc);
+      }
+    }
+
+    // 5. Push the procedure to the scheduler
     if (!runnableList.isEmpty()) {
       // TODO: See ProcedureWALFormatReader#hasFastStartSupport
       // some procedure may be started way before this stuff.
@@ -1192,7 +1205,7 @@ public class ProcedureExecutor<TEnvironment> {
 
       // allows to kill the executor before something is stored to the wal.
       // useful to test the procedure recovery.
-      if (testing != null && !isSuspended && testing.shouldKillBeforeStoreUpdate()) {
+      if (testing != null && testing.shouldKillBeforeStoreUpdate(isSuspended)) {
         LOG.debug("TESTING: Kill before store update: " + procedure);
         stop();
         return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/55337367/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 968ccbf..25d6e2e 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -108,11 +108,21 @@ public class ProcedureTestingUtility {
     return loader;
   }
 
-  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
-      boolean value) {
+  private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
     if (procExecutor.testing == null) {
       procExecutor.testing = new ProcedureExecutor.Testing();
     }
+  }
+
+  public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
+      boolean value) {
+    createExecutorTesting(procExecutor);
+    procExecutor.testing.killIfSuspended = value;
+  }
+
+  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
+      boolean value) {
+    createExecutorTesting(procExecutor);
     procExecutor.testing.killBeforeStoreUpdate = value;
     LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
     assertSingleExecutorForKillTests(procExecutor);
@@ -120,17 +130,13 @@ public class ProcedureTestingUtility {
 
   public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
       boolean value) {
-    if (procExecutor.testing == null) {
-      procExecutor.testing = new ProcedureExecutor.Testing();
-    }
+    createExecutorTesting(procExecutor);
     procExecutor.testing.toggleKillBeforeStoreUpdate = value;
     assertSingleExecutorForKillTests(procExecutor);
   }
 
   public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
-    if (procExecutor.testing == null) {
-      procExecutor.testing = new ProcedureExecutor.Testing();
-    }
+    createExecutorTesting(procExecutor);
     procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
     LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
     assertSingleExecutorForKillTests(procExecutor);

http://git-wip-us.apache.org/repos/asf/hbase/blob/55337367/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
index c431646..b81e0f9 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
@@ -19,14 +19,19 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
-import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -45,17 +50,22 @@ public class TestProcedureEvents {
   private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class);
 
   private TestProcEnv procEnv;
-  private NoopProcedureStore procStore;
+  private ProcedureStore procStore;
   private ProcedureExecutor<TestProcEnv> procExecutor;
 
   private HBaseCommonTestingUtility htu;
+  private FileSystem fs;
+  private Path logDir;
 
   @Before
   public void setUp() throws IOException {
     htu = new HBaseCommonTestingUtility();
+    Path testDir = htu.getDataTestDir();
+    fs = testDir.getFileSystem(htu.getConfiguration());
+    logDir = new Path(testDir, "proc-logs");
 
     procEnv = new TestProcEnv();
-    procStore = new NoopProcedureStore();
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
     procStore.start(1);
     procExecutor.start(1, true);
@@ -66,13 +76,14 @@ public class TestProcedureEvents {
     procExecutor.stop();
     procStore.stop(false);
     procExecutor.join();
+    fs.delete(logDir, true);
   }
 
   @Test(timeout=30000)
   public void testTimeoutEventProcedure() throws Exception {
     final int NTIMEOUTS = 5;
 
-    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, NTIMEOUTS);
+    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(500, NTIMEOUTS);
     procExecutor.submitProcedure(proc);
 
     ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId());
@@ -80,6 +91,26 @@ public class TestProcedureEvents {
     assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount());
   }
 
+  @Test(timeout=30000)
+  public void testTimeoutEventProcedureDoubleExecution() throws Exception {
+    testTimeoutEventProcedureDoubleExecution(false);
+  }
+
+  @Test(timeout=30000)
+  public void testTimeoutEventProcedureDoubleExecutionKillIfSuspended() throws Exception {
+    testTimeoutEventProcedureDoubleExecution(true);
+  }
+
+  private void testTimeoutEventProcedureDoubleExecution(final boolean killIfSuspended)
+      throws Exception {
+    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 3);
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
+    ProcedureTestingUtility.setKillIfSuspended(procExecutor, killIfSuspended);
+    long procId = procExecutor.submitProcedure(proc);
+    ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true);
+    ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
+  }
+
   public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
     private final ProcedureEvent event = new ProcedureEvent("timeout-event");
 
@@ -122,6 +153,26 @@ public class TestProcedureEvents {
       env.getProcedureScheduler().wakeEvent(event);
       return false;
     }
+
+    @Override
+    protected void afterReplay(final TestProcEnv env) {
+      if (getState() == ProcedureState.WAITING_TIMEOUT) {
+        env.getProcedureScheduler().suspendEvent(event);
+        env.getProcedureScheduler().waitEvent(event, this);
+      }
+    }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException {
+      StreamUtils.writeRawVInt32(stream, ntimeouts.get());
+      StreamUtils.writeRawVInt32(stream, maxTimeouts);
+    }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException {
+      ntimeouts.set(StreamUtils.readRawVarint32(stream));
+      maxTimeouts = StreamUtils.readRawVarint32(stream);
+    }
   }
 
   private class TestProcEnv {