You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2016/03/09 04:51:09 UTC

[8/8] hbase git commit: HBASE-15422 Procedure v2 - Avoid double yield

HBASE-15422 Procedure v2 - Avoid double yield


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/648cc5e8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/648cc5e8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/648cc5e8

Branch: refs/heads/master
Commit: 648cc5e8233f548b737a6d38683b66148fcf6e7f
Parents: 7e4e8dc
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Tue Mar 8 11:07:33 2016 -0800
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue Mar 8 19:47:15 2016 -0800

----------------------------------------------------------------------
 .../hbase/procedure2/ProcedureExecutor.java     |  10 +-
 .../hbase/procedure2/TestYieldProcedures.java   | 154 ++++++++++++++++---
 2 files changed, 132 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/648cc5e8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 95db256..41497af 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -876,12 +876,6 @@ public class ProcedureExecutor<TEnvironment> {
         }
         break;
       }
-
-      // if the procedure is kind enough to pass the slot to someone else, yield
-      if (proc.isYieldAfterExecutionStep(getEnvironment())) {
-        runnables.yield(proc);
-        break;
-      }
     } while (procStack.isFailed());
   }
 
@@ -1159,7 +1153,9 @@ public class ProcedureExecutor<TEnvironment> {
       }
 
       // if the procedure is kind enough to pass the slot to someone else, yield
-      if (reExecute && procedure.isYieldAfterExecutionStep(getEnvironment())) {
+      if (procedure.getState() == ProcedureState.RUNNABLE &&
+          procedure.isYieldAfterExecutionStep(getEnvironment())) {
+        runnables.yield(procedure);
         return;
       }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/648cc5e8/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
index c4e80e2..6e66f76 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -48,6 +50,7 @@ public class TestYieldProcedures {
   private static final Procedure NULL_PROC = null;
 
   private ProcedureExecutor<TestProcEnv> procExecutor;
+  private TestRunQueue procRunnables;
   private ProcedureStore procStore;
 
   private HBaseCommonTestingUtility htu;
@@ -64,7 +67,9 @@ public class TestYieldProcedures {
 
     logDir = new Path(testDir, "proc-logs");
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
-    procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
+    procRunnables = new TestRunQueue();
+    procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
+        procStore, procRunnables);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
     procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
   }
@@ -87,34 +92,32 @@ public class TestYieldProcedures {
     }
     ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
 
-    // verify yield during execute()
-    long prevTimestamp = 0;
-    for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
-      for (int i = 0; i < procs.length; ++i) {
-        assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
-        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(execStep);
-        LOG.info("i=" + i + " execStep=" + execStep + " timestamp=" + info.getTimestamp());
+    for (int i = 0; i < procs.length; ++i) {
+      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
+
+      // verify execution
+      int index = 0;
+      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
+        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
         assertEquals(false, info.isRollback());
         assertEquals(execStep, info.getStep().ordinal());
-        assertEquals(prevTimestamp + 1, info.getTimestamp());
-        prevTimestamp++;
       }
-    }
 
-    // verify yield during rollback()
-    int count = NUM_STATES;
-    for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
-      for (int i = 0; i < procs.length; ++i) {
-        assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
-        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(count);
-        LOG.info("i=" + i + " execStep=" + execStep + " timestamp=" + info.getTimestamp());
+      // verify rollback
+      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
+        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
         assertEquals(true, info.isRollback());
         assertEquals(execStep, info.getStep().ordinal());
-        assertEquals(prevTimestamp + 1, info.getTimestamp());
-        prevTimestamp++;
       }
-      count++;
     }
+
+    // check runnable queue stats
+    assertEquals(0, procRunnables.size());
+    assertEquals(0, procRunnables.addFrontCalls);
+    assertEquals(18, procRunnables.addBackCalls);
+    assertEquals(15, procRunnables.yieldCalls);
+    assertEquals(19, procRunnables.pollCalls);
+    assertEquals(3, procRunnables.completionCalls);
   }
 
   @Test
@@ -147,6 +150,29 @@ public class TestYieldProcedures {
       assertEquals(true, info.isRollback());
       assertEquals(i, info.getStep().ordinal());
     }
+
+    // check runnable queue stats
+    assertEquals(0, procRunnables.size());
+    assertEquals(0, procRunnables.addFrontCalls);
+    assertEquals(12, procRunnables.addBackCalls);
+    assertEquals(11, procRunnables.yieldCalls);
+    assertEquals(13, procRunnables.pollCalls);
+    assertEquals(1, procRunnables.completionCalls);
+  }
+
+  @Test
+  public void testYieldException() {
+    TestYieldProcedure proc = new TestYieldProcedure();
+    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
+    assertEquals(6, proc.step); // execute() throws on its first 5 calls and returns on the 6th; step is bumped on every call
+
+    // check runnable queue stats
+    assertEquals(0, procRunnables.size());
+    assertEquals(0, procRunnables.addFrontCalls);
+    assertEquals(6, procRunnables.addBackCalls); // NOTE(review): presumably 1 submit + 5 re-queues after yields - confirm against the run queue
+    assertEquals(5, procRunnables.yieldCalls); // one yield per yielding step, i.e. no double yield - NOTE(review): confirm against the executor
+    assertEquals(7, procRunnables.pollCalls);
+    assertEquals(1, procRunnables.completionCalls); // a single procedure was submitted
   }
 
   private static class TestProcEnv {
@@ -199,8 +225,9 @@ public class TestYieldProcedures {
     @Override
     protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
         throws InterruptedException {
-      LOG.info("execute step " + state);
-      executionInfo.add(new ExecutionInfo(env.nextTimestamp(), state, false));
+      final long ts = env.nextTimestamp();
+      LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
+      executionInfo.add(new ExecutionInfo(ts, state, false));
       Thread.sleep(150);
 
       if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
@@ -229,8 +256,9 @@ public class TestYieldProcedures {
     @Override
     protected void rollbackState(TestProcEnv env, final State state)
         throws InterruptedException {
-      LOG.debug("rollback state " + state);
-      executionInfo.add(new ExecutionInfo(env.nextTimestamp(), state, true));
+      final long ts = env.nextTimestamp();
+      LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
+      executionInfo.add(new ExecutionInfo(ts, state, true));
       Thread.sleep(150);
 
       if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
@@ -276,4 +304,80 @@ public class TestYieldProcedures {
       return true;
     }
   }
+
+  public static class TestYieldProcedure extends Procedure<TestProcEnv> { // test procedure that yields via exception on its first five execute() calls
+    private int step = 0; // number of execute() calls made so far; read directly by testYieldException
+
+    public TestYieldProcedure() {
+    }
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
+      LOG.info("execute step " + step);
+      if (step++ < 5) { // post-increment: throws while step is 0..4, so five yields in total
+        throw new ProcedureYieldException();
+      }
+      return null; // sixth call: no child procedures are returned
+    }
+
+    @Override
+    protected void rollback(TestProcEnv env) { // intentionally empty: nothing to undo
+    }
+
+    @Override
+    protected boolean abort(TestProcEnv env) {
+      return false;
+    }
+
+    @Override
+    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
+      return true; // NOTE(review): together with the yield exception above this looks like the "double yield" case named in the commit title - confirm
+    }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException { // writes nothing: step is not persisted
+    }
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException { // reads nothing, mirroring serializeStateData
+    }
+  }
+
+  /** Run queue that counts the calls it receives so tests can assert on queue traffic. */
+  private static class TestRunQueue extends ProcedureSimpleRunQueue {
+    private int completionCalls; // completionCleanup() invocations
+    private int addFrontCalls;   // addFront() invocations
+    private int addBackCalls;    // addBack() invocations
+    private int yieldCalls;      // yield() invocations
+    private int pollCalls;       // poll() invocations
+
+    @Override
+    public void addFront(final Procedure proc) {
+      addFrontCalls++;
+      super.addFront(proc);
+    }
+
+    @Override
+    public void addBack(final Procedure proc) {
+      addBackCalls++;
+      super.addBack(proc);
+    }
+
+    @Override
+    public void yield(final Procedure proc) {
+      yieldCalls++;
+      super.yield(proc);
+    }
+
+    @Override
+    public Procedure poll() {
+      pollCalls++;
+      return super.poll();
+    }
+
+    @Override
+    public void completionCleanup(Procedure proc) {
+      completionCalls++; // counted only; the call is not forwarded to the parent
+    }
+  }
 }