You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/09 22:59:37 UTC

[32/50] [abbrv] hbase git commit: HBASE-13202 Procedure v2 - core framework

HBASE-13202 Procedure v2 - core framework


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9d763cd7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9d763cd7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9d763cd7

Branch: refs/heads/hbase-12439
Commit: 9d763cd70243834359c96905871f807a5736f144
Parents: 1deadb6
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Apr 9 20:44:56 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Apr 9 20:44:56 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/io/util/StreamUtils.java       |   12 +-
 .../hadoop/hbase/util/ForeignExceptionUtil.java |  109 +
 hbase-procedure/pom.xml                         |  181 +
 .../hbase/procedure2/OnePhaseProcedure.java     |   28 +
 .../hadoop/hbase/procedure2/Procedure.java      |  680 ++
 .../procedure2/ProcedureAbortedException.java   |   42 +
 .../hbase/procedure2/ProcedureException.java    |   45 +
 .../hbase/procedure2/ProcedureExecutor.java     | 1077 +++
 .../procedure2/ProcedureFairRunQueues.java      |  172 +
 .../hbase/procedure2/ProcedureResult.java       |   95 +
 .../hbase/procedure2/ProcedureRunnableSet.java  |   78 +
 .../procedure2/ProcedureSimpleRunQueue.java     |  121 +
 .../procedure2/ProcedureYieldException.java     |   40 +
 .../procedure2/RemoteProcedureException.java    |  116 +
 .../hbase/procedure2/RootProcedureState.java    |  185 +
 .../hbase/procedure2/SequentialProcedure.java   |   81 +
 .../hbase/procedure2/StateMachineProcedure.java |  166 +
 .../hbase/procedure2/TwoPhaseProcedure.java     |   28 +
 .../hbase/procedure2/store/ProcedureStore.java  |  121 +
 .../procedure2/store/ProcedureStoreTracker.java |  540 ++
 .../CorruptedWALProcedureStoreException.java    |   43 +
 .../procedure2/store/wal/ProcedureWALFile.java  |  152 +
 .../store/wal/ProcedureWALFormat.java           |  234 +
 .../store/wal/ProcedureWALFormatReader.java     |  166 +
 .../procedure2/store/wal/WALProcedureStore.java |  721 ++
 .../hadoop/hbase/procedure2/util/ByteSlot.java  |  111 +
 .../hbase/procedure2/util/StringUtils.java      |   80 +
 .../procedure2/util/TimeoutBlockingQueue.java   |  217 +
 .../procedure2/ProcedureTestingUtility.java     |  163 +
 .../procedure2/TestProcedureExecution.java      |  338 +
 .../procedure2/TestProcedureFairRunQueues.java  |  155 +
 .../hbase/procedure2/TestProcedureRecovery.java |  488 ++
 .../procedure2/TestProcedureReplayOrder.java    |  226 +
 .../store/TestProcedureStoreTracker.java        |  168 +
 .../store/wal/TestWALProcedureStore.java        |  267 +
 .../util/TestTimeoutBlockingQueue.java          |  137 +
 hbase-protocol/pom.xml                          |    1 +
 .../protobuf/generated/ProcedureProtos.java     | 7219 ++++++++++++++++++
 .../src/main/protobuf/Procedure.proto           |  114 +
 pom.xml                                         |   21 +-
 40 files changed, 14933 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
index 314ed2b..0b442a5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -120,7 +120,7 @@ public class StreamUtils {
 
   /**
    * Reads a varInt value stored in an array.
-   * 
+   *
    * @param input
    *          Input array where the varInt is available
    * @param offset
@@ -198,4 +198,14 @@ public class StreamUtils {
     out.write((byte) (0xff & (v >> 8)));
     out.write((byte) (0xff & v));
   }
+
+  public static long readLong(InputStream in) throws IOException {
+    long result = 0;
+    for (int shift = 56; shift >= 0; shift -= 8) {
+      long x = in.read();
+      if (x < 0) throw new IOException("EOF");
+      result |= (x << shift);
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java
new file mode 100644
index 0000000..a0006ed
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
+import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.GenericExceptionMessage;
+import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.StackTraceElementMessage;
+
+/**
+ * Helper to convert Exceptions and StackTraces from/to protobuf.
+ * (see ErrorHandling.proto for the internal of the proto messages)
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ForeignExceptionUtil {
+  private ForeignExceptionUtil() { }
+
+  public static IOException toIOException(final ForeignExceptionMessage eem) {
+    GenericExceptionMessage gem = eem.getGenericException();
+    StackTraceElement[] trace = toStackTrace(gem.getTraceList());
+    RemoteException re = new RemoteException(gem.getClassName(), gem.getMessage());
+    re.setStackTrace(trace);
+    return re.unwrapRemoteException();
+  }
+
+  public static ForeignExceptionMessage toProtoForeignException(String source, Throwable t) {
+    GenericExceptionMessage.Builder gemBuilder = GenericExceptionMessage.newBuilder();
+    gemBuilder.setClassName(t.getClass().getName());
+    if (t.getMessage() != null) {
+      gemBuilder.setMessage(t.getMessage());
+    }
+    // set the stack trace, if there is one
+    List<StackTraceElementMessage> stack = toProtoStackTraceElement(t.getStackTrace());
+    if (stack != null) {
+      gemBuilder.addAllTrace(stack);
+    }
+    GenericExceptionMessage payload = gemBuilder.build();
+    ForeignExceptionMessage.Builder exception = ForeignExceptionMessage.newBuilder();
+    exception.setGenericException(payload).setSource(source);
+    return exception.build();
+  }
+
+  /**
+   * Convert a stack trace to list of {@link StackTraceElement}.
+   * @param trace the stack trace to convert to protobuf message
+   * @return <tt>null</tt> if the passed stack is <tt>null</tt>.
+   */
+  public static List<StackTraceElementMessage> toProtoStackTraceElement(StackTraceElement[] trace) {
+    // if there is no stack trace, ignore it and just return the message
+    if (trace == null) return null;
+    // build the stack trace for the message
+    List<StackTraceElementMessage> pbTrace = new ArrayList<StackTraceElementMessage>(trace.length);
+    for (StackTraceElement elem : trace) {
+      StackTraceElementMessage.Builder stackBuilder = StackTraceElementMessage.newBuilder();
+      stackBuilder.setDeclaringClass(elem.getClassName());
+      if (elem.getFileName() != null) {
+        stackBuilder.setFileName(elem.getFileName());
+      }
+      stackBuilder.setLineNumber(elem.getLineNumber());
+      stackBuilder.setMethodName(elem.getMethodName());
+      pbTrace.add(stackBuilder.build());
+    }
+    return pbTrace;
+  }
+
+  /**
+   * Unwind a serialized array of {@link StackTraceElementMessage}s to a
+   * {@link StackTraceElement}s.
+   * @param traceList list that was serialized
+   * @return the deserialized list or <tt>null</tt> if it couldn't be unwound (e.g. wasn't set on
+   *         the sender).
+   */
+  public static StackTraceElement[] toStackTrace(List<StackTraceElementMessage> traceList) {
+    if (traceList == null || traceList.size() == 0) {
+      return new StackTraceElement[0]; // empty array
+    }
+    StackTraceElement[] trace = new StackTraceElement[traceList.size()];
+    for (int i = 0; i < traceList.size(); i++) {
+      StackTraceElementMessage elem = traceList.get(i);
+      trace[i] = new StackTraceElement(
+          elem.getDeclaringClass(), elem.getMethodName(),
+          elem.hasFileName() ? elem.getFileName() : null,
+          elem.getLineNumber());
+    }
+    return trace;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-procedure/pom.xml b/hbase-procedure/pom.xml
new file mode 100644
index 0000000..9683db2
--- /dev/null
+++ b/hbase-procedure/pom.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>hbase-procedure</artifactId>
+  <name>HBase - Procedure</name>
+  <description>Procedure Framework</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <!-- Make a jar and put the sources in the jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <!--Make it so assembly:single does nothing in here-->
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>${maven.assembly.version}</version>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <!-- Always skip the second part executions, since we only run
+        simple unit tests in this module. -->
+        <executions>
+          <execution>
+            <id>secondPartTestsExecution</id>
+            <phase>test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+    </dependency>
+   <dependency>
+     <groupId>org.apache.hbase</groupId>
+     <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <!-- Profiles for building against different hadoop versions -->
+    <profile>
+      <id>hadoop-1.1</id>
+      <activation>
+        <property>
+            <!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
+            <!--h1--><name>hadoop.profile</name><value>1.1</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hadoop-1.0</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>1.0</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+    <!--
+      profile for building against Hadoop 2.0.0-alpha. Activate using:
+       mvn -Dhadoop.profile=2.0
+    -->
+    <profile>
+      <id>hadoop-2.0</id>
+      <activation>
+        <property>
+            <!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
+            <!--h2--><name>!hadoop.profile</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+    <!--
+      profile for building against Hadoop 3.0.x. Activate using:
+       mvn -Dhadoop.profile=3.0
+    -->
+    <profile>
+      <id>hadoop-3.0</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>3.0</value>
+        </property>
+      </activation>
+      <properties>
+        <hadoop.version>3.0-SNAPSHOT</hadoop.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java
new file mode 100644
index 0000000..1c3be2d
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class OnePhaseProcedure<TEnvironment> extends Procedure<TEnvironment> {
+  // TODO (e.g. used by online snapshots)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
new file mode 100644
index 0000000..338fcad
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -0,0 +1,680 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+/**
+ * Base Procedure class responsible to handle the Procedure Metadata
+ * e.g. state, startTime, lastUpdate, stack-indexes, ...
+ *
+ * execute() is called each time the procedure is executed.
+ * it may be called multiple times in case of failure and restart, so the
+ * code must be idempotent.
+ * the return is a set of sub-procedures or null in case the procedure doesn't
+ * have sub-procedures. Once the sub-procedures are successfully completed
+ * the execute() method is called again, you should think at it as a stack:
+ *  -> step 1
+ *  ---> step 2
+ *  -> step 1
+ *
+ * rollback() is called when the procedure or one of the sub-procedures is failed.
+ * the rollback step is supposed to cleanup the resources created during the
+ * execute() step. in case of failure and restart rollback() may be called
+ * multiple times, so the code must be idempotent.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
+  // unchanged after initialization
+  private String owner = null;
+  private Long parentProcId = null;
+  private Long procId = null;
+  private long startTime;
+
+  // runtime state, updated every operation
+  private ProcedureState state = ProcedureState.INITIALIZING;
+  private Integer timeout = null;
+  private int[] stackIndexes = null;
+  private int childrenLatch = 0;
+  private long lastUpdate;
+
+  private RemoteProcedureException exception = null;
+  private byte[] result = null;
+
+  /**
+   * The main code of the procedure. It must be idempotent since execute()
+   * may be called multiple time in case of machine failure in the middle
+   * of the execution.
+   * @return a set of sub-procedures or null if there is nothing else to execute.
+   */
+  protected abstract Procedure[] execute(TEnvironment env)
+    throws ProcedureYieldException;
+
+  /**
+   * The code to undo what done by the execute() code.
+   * It is called when the procedure or one of the sub-procedure failed or an
+   * abort was requested. It should cleanup all the resources created by
+   * the execute() call. The implementation must be idempotent since rollback()
+   * may be called multiple time in case of machine failure in the middle
+   * of the execution.
+   * @throws IOException temporary failure, the rollback will retry later
+   */
+  protected abstract void rollback(TEnvironment env)
+    throws IOException;
+
+  /**
+   * The abort() call is asynchronous and each procedure must decide how to deal
+   * with that, if they want to be abortable. The simplest implementation
+   * is to have an AtomicBoolean set in the abort() method and then the execute()
+   * will check if the abort flag is set or not.
+   * abort() may be called multiple times from the client, so the implementation
+   * must be idempotent.
+   *
+   * NOTE: abort() is not like Thread.interrupt() it is just a notification
+   * that allows the procedure implementor where to abort to avoid leak and
+   * have a better control on what was executed and what not.
+   */
+  protected abstract boolean abort(TEnvironment env);
+
+  /**
+   * The user-level code of the procedure may have some state to
+   * persist (e.g. input arguments) to be able to resume on failure.
+   * @param stream the stream that will contain the user serialized data
+   */
+  protected abstract void serializeStateData(final OutputStream stream)
+    throws IOException;
+
+  /**
+   * Called on store load to allow the user to decode the previously serialized
+   * state.
+   * @param stream the stream that contains the user serialized data
+   */
+  protected abstract void deserializeStateData(final InputStream stream)
+    throws IOException;
+
+  /**
+   * The user should override this method, and try to take a lock if necessary.
+   * A lock can be anything, and it is up to the implementor.
+   * Example: in our Master we can execute request in parallel for different tables
+   *          create t1 and create t2 can be executed at the same time.
+   *          anything else on t1/t2 is queued waiting that specific table create to happen.
+   *
+   * @return true if the lock was acquired and false otherwise
+   */
+  protected boolean acquireLock(final TEnvironment env) {
+    return true;
+  }
+
+  /**
+   * The user should override this method, and release lock if necessary.
+   */
+  protected void releaseLock(final TEnvironment env) {
+    // no-op
+  }
+
+  /**
+   * Called when the procedure is loaded for replay.
+   * The procedure implementor may use this method to perform some quick
+   * operation before replay.
+   * e.g. failing the procedure if the state on replay may be unknown.
+   */
+  protected void beforeReplay(final TEnvironment env) {
+    // no-op
+  }
+
+  /**
+   * Called when the procedure is marked as completed (success or rollback).
+   * The procedure implementor may use this method to cleanup in-memory states.
+   * This operation will not be retried on failure.
+   */
+  protected void completionCleanup(final TEnvironment env) {
+    // no-op
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    toStringClassDetails(sb);
+
+    if (procId != null) {
+      sb.append(" id=");
+      sb.append(getProcId());
+    }
+
+    if (hasParent()) {
+      sb.append(" parent=");
+      sb.append(getParentProcId());
+    }
+
+    if (hasOwner()) {
+      sb.append(" owner=");
+      sb.append(getOwner());
+    }
+
+    sb.append(" state=");
+    sb.append(getState());
+    return sb.toString();
+  }
+
+  /**
+   * Extend the toString() information with the procedure details
+   * e.g. className and parameters
+   * @param builder the string builder to use to append the proc specific information
+   */
+  protected void toStringClassDetails(StringBuilder builder) {
+    builder.append(getClass().getName());
+  }
+
+  /**
+   * @return the serialized result if any, otherwise null
+   */
+  public byte[] getResult() {
+    return result;
+  }
+
+  /**
+   * The procedure may leave a "result" on completion.
+   * @param result the serialized result that will be passed to the client
+   */
+  protected void setResult(final byte[] result) {
+    this.result = result;
+  }
+
+  public long getProcId() {
+    return procId;
+  }
+
+  public boolean hasParent() {
+    return parentProcId != null;
+  }
+
+  public boolean hasException() {
+    return exception != null;
+  }
+
+  public boolean hasTimeout() {
+    return timeout != null;
+  }
+
+  public long getParentProcId() {
+    return parentProcId;
+  }
+
+  /**
+   * @return true if the procedure has failed.
+   *         true may mean failed but not yet rolledback or failed and rolledback.
+   */
+  public synchronized boolean isFailed() {
+    return exception != null || state == ProcedureState.ROLLEDBACK;
+  }
+
+  /**
+   * @return true if the procedure is finished successfully.
+   */
+  public synchronized boolean isSuccess() {
+    return state == ProcedureState.FINISHED && exception == null;
+  }
+
+  /**
+   * @return true if the procedure is finished. The Procedure may be completed
+   *         successfuly or failed and rolledback.
+   */
+  public synchronized boolean isFinished() {
+    switch (state) {
+      case ROLLEDBACK:
+        return true;
+      case FINISHED:
+        return exception == null;
+      default:
+        break;
+    }
+    return false;
+  }
+
+  /**
+   * @return true if the procedure is waiting for a child to finish or for an external event.
+   */
+  public synchronized boolean isWaiting() {
+    switch (state) {
+      case WAITING:
+      case WAITING_TIMEOUT:
+        return true;
+      default:
+        break;
+    }
+    return false;
+  }
+
+  public synchronized RemoteProcedureException getException() {
+    return exception;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public synchronized long getLastUpdate() {
+    return lastUpdate;
+  }
+
+  public synchronized long elapsedTime() {
+    return lastUpdate - startTime;
+  }
+
+  /**
+   * @param timeout timeout in msec
+   */
+  protected void setTimeout(final int timeout) {
+    this.timeout = timeout;
+  }
+
+  /**
+   * @return the timeout in msec
+   */
+  public int getTimeout() {
+    return timeout;
+  }
+
+  /**
+   * @return the remaining time before the timeout
+   */
+  public long getTimeRemaining() {
+    return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
+  }
+
+  protected void setOwner(final String owner) {
+    this.owner = StringUtils.isEmpty(owner) ? null : owner;
+  }
+
+  public String getOwner() {
+    return owner;
+  }
+
+  public boolean hasOwner() {
+    return owner != null;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  protected synchronized void setState(final ProcedureState state) {
+    this.state = state;
+    updateTimestamp();
+  }
+
+  @InterfaceAudience.Private
+  protected synchronized ProcedureState getState() {
+    return state;
+  }
+
+  protected void setFailure(final String source, final Throwable cause) {
+    setFailure(new RemoteProcedureException(source, cause));
+  }
+
+  protected synchronized void setFailure(final RemoteProcedureException exception) {
+    this.exception = exception;
+    if (!isFinished()) {
+      setState(ProcedureState.FINISHED);
+    }
+  }
+
+  protected void setAbortFailure(final String source, final String msg) {
+    setFailure(source, new ProcedureAbortedException(msg));
+  }
+
+  @InterfaceAudience.Private
+  protected synchronized boolean setTimeoutFailure() {
+    if (state == ProcedureState.WAITING_TIMEOUT) {
+      long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
+      setFailure("ProcedureExecutor", new TimeoutException(
+        "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
+   */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  protected void setProcId(final long procId) {
+    this.procId = procId;
+    this.startTime = EnvironmentEdgeManager.currentTime();
+    setState(ProcedureState.RUNNABLE);
+  }
+
+  /**
+   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
+   */
+  @InterfaceAudience.Private
+  protected void setParentProcId(final long parentProcId) {
+    this.parentProcId = parentProcId;
+  }
+
+  /**
+   * Internal method called by the ProcedureExecutor that starts the
+   * user-level code execute().
+   */
+  @InterfaceAudience.Private
+  protected Procedure[] doExecute(final TEnvironment env)
+      throws ProcedureYieldException {
+    try {
+      updateTimestamp();
+      return execute(env);
+    } finally {
+      updateTimestamp();
+    }
+  }
+
+  /**
+   * Internal method called by the ProcedureExecutor that starts the
+   * user-level code rollback().
+   */
+  @InterfaceAudience.Private
+  protected void doRollback(final TEnvironment env) throws IOException {
+    try {
+      updateTimestamp();
+      rollback(env);
+    } finally {
+      updateTimestamp();
+    }
+  }
+
+  /**
+   * Called on store load to initialize the Procedure internals after
+   * the creation/deserialization.
+   */
+  @InterfaceAudience.Private
+  protected void setStartTime(final long startTime) {
+    this.startTime = startTime;
+  }
+
+  /**
+   * Called on store load to initialize the Procedure internals after
+   * the creation/deserialization.
+   */
+  private synchronized void setLastUpdate(final long lastUpdate) {
+    this.lastUpdate = lastUpdate;
+  }
+
+  protected synchronized void updateTimestamp() {
+    this.lastUpdate = EnvironmentEdgeManager.currentTime();
+  }
+
+  /**
+   * Called by the ProcedureExecutor on procedure-load to restore the latch state
+   */
+  @InterfaceAudience.Private
+  protected synchronized void setChildrenLatch(final int numChildren) {
+    this.childrenLatch = numChildren;
+  }
+
+  /**
+   * Called by the ProcedureExecutor on procedure-load to restore the latch state
+   */
+  @InterfaceAudience.Private
+  protected synchronized void incChildrenLatch() {
+    // TODO: can this be inferred from the stack? I think so...
+    this.childrenLatch++;
+  }
+
+  /**
+   * Called by the ProcedureExecutor to notify that one of the sub-procedures
+   * has completed.
+   */
+  @InterfaceAudience.Private
+  protected synchronized boolean childrenCountDown() {
+    assert childrenLatch > 0;
+    return --childrenLatch == 0;
+  }
+
+  /**
+   * Called by the RootProcedureState on procedure execution.
+   * Each procedure store its stack-index positions.
+   */
+  @InterfaceAudience.Private
+  protected synchronized void addStackIndex(final int index) {
+    if (stackIndexes == null) {
+      stackIndexes = new int[] { index };
+    } else {
+      int count = stackIndexes.length;
+      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
+      stackIndexes[count] = index;
+    }
+  }
+
+  @InterfaceAudience.Private
+  protected synchronized boolean removeStackIndex() {
+    if (stackIndexes.length > 1) {
+      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
+      return false;
+    } else {
+      stackIndexes = null;
+      return true;
+    }
+  }
+
+  /**
+   * Called on store load to initialize the Procedure internals after
+   * the creation/deserialization.
+   */
+  @InterfaceAudience.Private
+  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
+    this.stackIndexes = new int[stackIndexes.size()];
+    for (int i = 0; i < this.stackIndexes.length; ++i) {
+      this.stackIndexes[i] = stackIndexes.get(i);
+    }
+  }
+
+  @InterfaceAudience.Private
+  protected synchronized boolean wasExecuted() {
+    return stackIndexes != null;
+  }
+
+  @InterfaceAudience.Private
+  protected synchronized int[] getStackIndexes() {
+    return stackIndexes;
+  }
+
+  @Override
+  public int compareTo(final Procedure other) {
+    long diff = getProcId() - other.getProcId();
+    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+  }
+
+  /*
+   * Helper to lookup the root Procedure ID given a specified procedure.
+   */
+  @InterfaceAudience.Private
+  protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
+    while (proc.hasParent()) {
+      proc = procedures.get(proc.getParentProcId());
+      if (proc == null) return null;
+    }
+    return proc.getProcId();
+  }
+
+  protected static Procedure newInstance(final String className) throws IOException {
+    try {
+      Class<?> clazz = Class.forName(className);
+      if (!Modifier.isPublic(clazz.getModifiers())) {
+        throw new Exception("the " + clazz + " class is not public");
+      }
+
+      Constructor<?> ctor = clazz.getConstructor();
+      assert ctor != null : "no constructor found";
+      if (!Modifier.isPublic(ctor.getModifiers())) {
+        throw new Exception("the " + clazz + " constructor is not public");
+      }
+      return (Procedure)ctor.newInstance();
+    } catch (Exception e) {
+      throw new IOException("The procedure class " + className +
+          " must be accessible and have an empty constructor", e);
+    }
+  }
+
+  protected static void validateClass(final Procedure proc) throws IOException {
+    try {
+      Class<?> clazz = proc.getClass();
+      if (!Modifier.isPublic(clazz.getModifiers())) {
+        throw new Exception("the " + clazz + " class is not public");
+      }
+
+      Constructor<?> ctor = clazz.getConstructor();
+      assert ctor != null;
+      if (!Modifier.isPublic(ctor.getModifiers())) {
+        throw new Exception("the " + clazz + " constructor is not public");
+      }
+    } catch (Exception e) {
+      throw new IOException("The procedure class " + proc.getClass().getName() +
+          " must be accessible and have an empty constructor", e);
+    }
+  }
+
+  /**
+   * Helper to convert the procedure to protobuf.
+   * Used by ProcedureStore implementations.
+   */
+  @InterfaceAudience.Private
+  public static ProcedureProtos.Procedure convert(final Procedure proc)
+      throws IOException {
+    Preconditions.checkArgument(proc != null);
+    validateClass(proc);
+
+    ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
+      .setClassName(proc.getClass().getName())
+      .setProcId(proc.getProcId())
+      .setState(proc.getState())
+      .setStartTime(proc.getStartTime())
+      .setLastUpdate(proc.getLastUpdate());
+
+    if (proc.hasParent()) {
+      builder.setParentId(proc.getParentProcId());
+    }
+
+    if (proc.hasTimeout()) {
+      builder.setTimeout(proc.getTimeout());
+    }
+
+    if (proc.hasOwner()) {
+      builder.setOwner(proc.getOwner());
+    }
+
+    int[] stackIds = proc.getStackIndexes();
+    if (stackIds != null) {
+      for (int i = 0; i < stackIds.length; ++i) {
+        builder.addStackId(stackIds[i]);
+      }
+    }
+
+    if (proc.hasException()) {
+      RemoteProcedureException exception = proc.getException();
+      builder.setException(
+        RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
+    }
+
+    byte[] result = proc.getResult();
+    if (result != null) {
+      builder.setResult(ByteStringer.wrap(result));
+    }
+
+    ByteString.Output stateStream = ByteString.newOutput();
+    proc.serializeStateData(stateStream);
+    if (stateStream.size() > 0) {
+      builder.setStateData(stateStream.toByteString());
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Helper to convert the protobuf procedure.
+   * Used by ProcedureStore implementations.
+   *
+   * TODO: OPTIMIZATION: some of the field never change during the execution
+   *                     (e.g. className, procId, parentId, ...).
+   *                     We can split in 'data' and 'state', and the store
+   *                     may take advantage of it by storing the data only on insert().
+   */
+  @InterfaceAudience.Private
+  public static Procedure convert(final ProcedureProtos.Procedure proto)
+      throws IOException {
+    // Procedure from class name
+    Procedure proc = Procedure.newInstance(proto.getClassName());
+
+    // set fields
+    proc.setProcId(proto.getProcId());
+    proc.setState(proto.getState());
+    proc.setStartTime(proto.getStartTime());
+    proc.setLastUpdate(proto.getLastUpdate());
+
+    if (proto.hasParentId()) {
+      proc.setParentProcId(proto.getParentId());
+    }
+
+    if (proto.hasOwner()) {
+      proc.setOwner(proto.getOwner());
+    }
+
+    if (proto.hasTimeout()) {
+      proc.setTimeout(proto.getTimeout());
+    }
+
+    if (proto.getStackIdCount() > 0) {
+      proc.setStackIndexes(proto.getStackIdList());
+    }
+
+    if (proto.hasException()) {
+      assert proc.getState() == ProcedureState.FINISHED ||
+             proc.getState() == ProcedureState.ROLLEDBACK :
+             "The procedure must be failed (waiting to rollback) or rolledback";
+      proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
+    }
+
+    if (proto.hasResult()) {
+      proc.setResult(proto.getResult().toByteArray());
+    }
+
+    // we want to call deserialize even when the stream is empty, mainly for testing.
+    proc.deserializeStateData(proto.getStateData().newInput());
+
+    return proc;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java
new file mode 100644
index 0000000..2e409cf
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when a procedure is aborted
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ProcedureAbortedException extends ProcedureException {
+  /** default constructor */
+  public ProcedureAbortedException() {
+    super();
+  }
+
+  /**
+   * Constructor
+   * @param s message
+   */
+  public ProcedureAbortedException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java
new file mode 100644
index 0000000..9f922b1
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ProcedureException extends IOException {
+  /** default constructor */
+  public ProcedureException() {
+    super();
+  }
+
+  /**
+   * Constructor
+   * @param s message
+   */
+  public ProcedureException(String s) {
+    super(s);
+  }
+
+  public ProcedureException(Throwable t) {
+    super(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
new file mode 100644
index 0000000..8588315
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -0,0 +1,1077 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
+import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Thread Pool that executes the submitted procedures.
+ * The executor has a ProcedureStore associated.
+ * Each operation is logged and on restart the pending procedures are resumed.
+ *
+ * Unless the Procedure code throws an error (e.g. invalid user input)
+ * the procedure will complete (at some point in time), On restart the pending
+ * procedures are resumed and the once failed will be rolledback.
+ *
+ * The user can add procedures to the executor via submitProcedure(proc)
+ * check for the finished state via isFinished(procId)
+ * and get the result via getResult(procId)
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureExecutor<TEnvironment> {
+  private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
+
+  Testing testing = null;
+  public static class Testing {
+    protected boolean killBeforeStoreUpdate = false;
+    protected boolean toggleKillBeforeStoreUpdate = false;
+
+    protected boolean shouldKillBeforeStoreUpdate() {
+      final boolean kill = this.killBeforeStoreUpdate;
+      if (this.toggleKillBeforeStoreUpdate) {
+        this.killBeforeStoreUpdate = !kill;
+        LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate);
+      }
+      return kill;
+    }
+  }
+
+  public interface ProcedureExecutorListener {
+    void procedureLoaded(long procId);
+    void procedureAdded(long procId);
+    void procedureFinished(long procId);
+  }
+
+  /**
+   * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure
+   */
+  private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
+    @Override
+    public long getTimeout(Procedure proc) {
+      return proc.getTimeRemaining();
+    }
+
+    @Override
+    public TimeUnit getTimeUnit(Procedure proc) {
+      return TimeUnit.MILLISECONDS;
+    }
+  }
+
+  /**
+   * Internal cleaner that removes the completed procedure results after a TTL.
+   * NOTE: This is a special case handled in timeoutLoop().
+   *
+   * Since the client code looks more or less like:
+   *   procId = master.doOperation()
+   *   while (master.getProcResult(procId) == ProcInProgress);
+   * The master should not throw away the proc result as soon as the procedure is done
+   * but should wait a result request from the client (see executor.removeResult(procId))
+   * The client will call something like master.isProcDone() or master.getProcResult()
+   * which will return the result/state to the client, and it will mark the completed
+   * proc as ready to delete. note that the client may not receive the response from
+   * the master (e.g. master failover) so, if we delay a bit the real deletion of
+   * the proc result the client will be able to get the result the next try.
+   */
+  private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
+    private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
+
+    private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
+    private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
+
+    private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
+    private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
+
+    private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
+    private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
+
+    private final Map<Long, ProcedureResult> completed;
+    private final ProcedureStore store;
+    private final Configuration conf;
+
+    public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
+        final Map<Long, ProcedureResult> completedMap) {
+      // set the timeout interval that triggers the periodic-procedure
+      setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
+      this.completed = completedMap;
+      this.store = store;
+      this.conf = conf;
+    }
+
+    public void periodicExecute(final TEnvironment env) {
+      if (completed.isEmpty()) {
+        LOG.debug("no completed procedures to cleanup");
+        return;
+      }
+
+      final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
+      final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
+
+      long now = EnvironmentEdgeManager.currentTime();
+      Iterator<Map.Entry<Long, ProcedureResult>> it = completed.entrySet().iterator();
+      while (it.hasNext() && store.isRunning()) {
+        Map.Entry<Long, ProcedureResult> entry = it.next();
+        ProcedureResult result = entry.getValue();
+
+        // TODO: Select TTL based on Procedure type
+        if ((result.hasClientAckTime() && (now - result.getClientAckTime()) >= evictAckTtl) ||
+            (now - result.getLastUpdate()) >= evictTtl) {
+          LOG.debug("Evict completed procedure " + entry.getKey());
+          store.delete(entry.getKey());
+          it.remove();
+        }
+      }
+    }
+
+    @Override
+    protected Procedure[] execute(final TEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void rollback(final TEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected boolean abort(final TEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void serializeStateData(final OutputStream stream) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deserializeStateData(final InputStream stream) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Map the the procId returned by submitProcedure(), the Root-ProcID, to the ProcedureResult.
+   * Once a Root-Procedure completes (success or failure), the result will be added to this map.
+   * The user of ProcedureExecutor should call getResult(procId) to get the result.
+   */
+  private final ConcurrentHashMap<Long, ProcedureResult> completed =
+    new ConcurrentHashMap<Long, ProcedureResult>();
+
+  /**
+   * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
+   * The RootProcedureState contains the execution stack of the Root-Procedure,
+   * It is added to the map by submitProcedure() and removed on procedure completion.
+   */
+  private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
+    new ConcurrentHashMap<Long, RootProcedureState>();
+
+  /**
+   * Helper map to lookup the live procedures by ID.
+   * This map contains every procedure. root-procedures and subprocedures.
+   */
+  private final ConcurrentHashMap<Long, Procedure> procedures =
+    new ConcurrentHashMap<Long, Procedure>();
+
+  /**
+   * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
+   * or periodic procedures.
+   */
+  private final TimeoutBlockingQueue<Procedure> waitingTimeout =
+    new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
+
+  /**
+   * Queue that contains runnable procedures.
+   */
+  private final ProcedureRunnableSet runnables;
+
+  // TODO
+  private final ReentrantLock submitLock = new ReentrantLock();
+  private final AtomicLong lastProcId = new AtomicLong(-1);
+
+  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
+    new CopyOnWriteArrayList<ProcedureExecutorListener>();
+
+  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private final TEnvironment environment;
+  private final ProcedureStore store;
+  private final Configuration conf;
+
+  private Thread[] threads;
+
+  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
+      final ProcedureStore store) {
+    this(conf, environment, store, new ProcedureSimpleRunQueue());
+  }
+
+  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
+      final ProcedureStore store, final ProcedureRunnableSet runqueue) {
+    this.environment = environment;
+    this.runnables = runqueue;
+    this.store = store;
+    this.conf = conf;
+  }
+
+  private List<Map.Entry<Long, RootProcedureState>> load() throws IOException {
+    Preconditions.checkArgument(completed.isEmpty());
+    Preconditions.checkArgument(rollbackStack.isEmpty());
+    Preconditions.checkArgument(procedures.isEmpty());
+    Preconditions.checkArgument(waitingTimeout.isEmpty());
+    Preconditions.checkArgument(runnables.size() == 0);
+
+    // 1. Load the procedures
+    Iterator<Procedure> loader = store.load();
+    if (loader == null) {
+      lastProcId.set(0);
+      return null;
+    }
+
+    long logMaxProcId = 0;
+    int runnablesCount = 0;
+    while (loader.hasNext()) {
+      Procedure proc = loader.next();
+      proc.beforeReplay(getEnvironment());
+      procedures.put(proc.getProcId(), proc);
+      logMaxProcId = Math.max(logMaxProcId, proc.getProcId());
+      LOG.debug("Loading procedure state=" + proc.getState() +
+                " isFailed=" + proc.hasException() + ": " + proc);
+      if (!proc.hasParent() && !proc.isFinished()) {
+        rollbackStack.put(proc.getProcId(), new RootProcedureState());
+      }
+      if (proc.getState() == ProcedureState.RUNNABLE) {
+        runnablesCount++;
+      }
+    }
+    assert lastProcId.get() < 0;
+    lastProcId.set(logMaxProcId);
+
+    // 2. Initialize the stacks
+    TreeSet<Procedure> runnableSet = null;
+    HashSet<Procedure> waitingSet = null;
+    for (final Procedure proc: procedures.values()) {
+      Long rootProcId = getRootProcedureId(proc);
+      if (rootProcId == null) {
+        // The 'proc' was ready to run but the root procedure was rolledback?
+        runnables.addBack(proc);
+        continue;
+      }
+
+      if (!proc.hasParent() && proc.isFinished()) {
+        LOG.debug("The procedure is completed state=" + proc.getState() +
+                  " isFailed=" + proc.hasException() + ": " + proc);
+        assert !rollbackStack.containsKey(proc.getProcId());
+        completed.put(proc.getProcId(), newResultFromProcedure(proc));
+        continue;
+      }
+
+      if (proc.hasParent() && !proc.isFinished()) {
+        Procedure parent = procedures.get(proc.getParentProcId());
+        // corrupted procedures are handled later at step 3
+        if (parent != null) {
+          parent.incChildrenLatch();
+        }
+      }
+
+      RootProcedureState procStack = rollbackStack.get(rootProcId);
+      procStack.loadStack(proc);
+
+      switch (proc.getState()) {
+        case RUNNABLE:
+          if (runnableSet == null) {
+            runnableSet = new TreeSet<Procedure>();
+          }
+          runnableSet.add(proc);
+          break;
+        case WAITING_TIMEOUT:
+          if (waitingSet == null) {
+            waitingSet = new HashSet<Procedure>();
+          }
+          waitingSet.add(proc);
+          break;
+        case FINISHED:
+          if (proc.hasException()) {
+            // add the proc to the runnables to perform the rollback
+            runnables.addBack(proc);
+            break;
+          }
+        case ROLLEDBACK:
+        case INITIALIZING:
+          String msg = "Unexpected " + proc.getState() + " state for " + proc;
+          LOG.error(msg);
+          throw new UnsupportedOperationException(msg);
+        default:
+          break;
+      }
+    }
+
+    // 3. Validate the stacks
+    List<Map.Entry<Long, RootProcedureState>> corrupted = null;
+    Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
+    while (itStack.hasNext()) {
+      Map.Entry<Long, RootProcedureState> entry = itStack.next();
+      RootProcedureState procStack = entry.getValue();
+      if (procStack.isValid()) continue;
+
+      for (Procedure proc: procStack.getSubprocedures()) {
+        procedures.remove(proc.getProcId());
+        if (runnableSet != null) runnableSet.remove(proc);
+        if (waitingSet != null) waitingSet.remove(proc);
+      }
+      itStack.remove();
+      if (corrupted == null) {
+        corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>();
+      }
+      corrupted.add(entry);
+    }
+
+    // 4. Push the runnables
+    if (runnableSet != null) {
+      // TODO: See ProcedureWALFormatReader.readInitEntry() some procedure
+      // may be started way before this stuff.
+      for (Procedure proc: runnableSet) {
+        if (!proc.hasParent()) {
+          sendProcedureLoadedNotification(proc.getProcId());
+        }
+        runnables.addBack(proc);
+      }
+    }
+    return corrupted;
+  }
+
+  public void start(int numThreads) throws IOException {
+    if (running.getAndSet(true)) {
+      LOG.warn("Already running");
+      return;
+    }
+
+    // We have numThreads executor + one timer thread used for timing out
+    // procedures and triggering periodic procedures.
+    threads = new Thread[numThreads + 1];
+    LOG.info("Starting procedure executor threads=" + threads.length);
+
+    // Initialize procedures executor
+    for (int i = 0; i < numThreads; ++i) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          execLoop();
+        }
+      };
+    }
+
+    // Initialize procedures timeout handler (this is the +1 thread)
+    threads[numThreads] = new Thread() {
+      @Override
+      public void run() {
+        timeoutLoop();
+      }
+    };
+
+    // Acquire the store lease.
+    store.recoverLease();
+
+    // TODO: Split in two steps.
+    // TODO: Handle corrupted procedure returned (probably just a WARN)
+    // The first one will make sure that we have the latest id,
+    // so we can start the threads and accept new procedures.
+    // The second step will do the actual load of old procedures.
+    load();
+
+    // Start the executors. Here we must have the lastProcId set.
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].start();
+    }
+
+    // Add completed cleaner
+    waitingTimeout.add(new CompletedProcedureCleaner(conf, store, completed));
+  }
+
+  public void stop() {
+    if (!running.getAndSet(false)) {
+      return;
+    }
+
+    LOG.info("Stopping the procedure executor");
+    runnables.signalAll();
+    waitingTimeout.signalAll();
+  }
+
+  public void join() {
+    boolean interrupted = false;
+
+    for (int i = 0; i < threads.length; ++i) {
+      try {
+        threads[i].join();
+      } catch (InterruptedException ex) {
+        interrupted = true;
+      }
+    }
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+
+    completed.clear();
+    rollbackStack.clear();
+    procedures.clear();
+    waitingTimeout.clear();
+    runnables.clear();
+    lastProcId.set(-1);
+  }
+
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  /**
+   * @return the number of execution threads.
+   */
+  public int getNumThreads() {
+    return threads == null ? 0 : (threads.length - 1);
+  }
+
+  public int getActiveExecutorCount() {
+    return activeExecutorCount.get();
+  }
+
+  public TEnvironment getEnvironment() {
+    return this.environment;
+  }
+
+  public ProcedureStore getStore() {
+    return this.store;
+  }
+
+  public void registerListener(ProcedureExecutorListener listener) {
+    this.listeners.add(listener);
+  }
+
+  public boolean unregisterListener(ProcedureExecutorListener listener) {
+    return this.listeners.remove(listener);
+  }
+
+  /**
+   * Add a new root-procedure to the executor.
+   * @param proc the new procedure to execute.
+   * @return the procedure id, that can be used to monitor the operation
+   */
+  public long submitProcedure(final Procedure proc) {
+    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
+    Preconditions.checkArgument(isRunning());
+    Preconditions.checkArgument(lastProcId.get() >= 0);
+    Preconditions.checkArgument(!proc.hasParent());
+
+    // Initialize the Procedure ID
+    proc.setProcId(nextProcId());
+
+    // Commit the transaction
+    store.insert(proc, null);
+    LOG.debug("procedure " + proc + " added to the store");
+
+    // Create the rollback stack for the procedure
+    RootProcedureState stack = new RootProcedureState();
+    rollbackStack.put(proc.getProcId(), stack);
+
+    // Submit the new subprocedures
+    assert !procedures.containsKey(proc.getProcId());
+    procedures.put(proc.getProcId(), proc);
+    sendProcedureAddedNotification(proc.getProcId());
+    runnables.addBack(proc);
+    return proc.getProcId();
+  }
+
+  public ProcedureResult getResult(final long procId) {
+    return completed.get(procId);
+  }
+
+  /**
+   * Return true if the procedure is finished.
+   * The state may be "completed successfully" or "failed and rolledback".
+   * Use getResult() to check the state or get the result data.
+   * @param procId the ID of the procedure to check
+   * @return true if the procedure execution is finished, otherwise false.
+   */
+  public boolean isFinished(final long procId) {
+    return completed.containsKey(procId);
+  }
+
+  /**
+   * Return true if the procedure is started.
+   * @param procId the ID of the procedure to check
+   * @return true if the procedure execution is started, otherwise false.
+   */
+  public boolean isStarted(final long procId) {
+    Procedure proc = procedures.get(procId);
+    if (proc == null) {
+      return completed.get(procId) != null;
+    }
+    return proc.wasExecuted();
+  }
+
+  /**
+   * Mark the specified completed procedure, as ready to remove.
+   * @param procId the ID of the procedure to remove
+   */
+  public void removeResult(final long procId) {
+    ProcedureResult result = completed.get(procId);
+    if (result == null) {
+      assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
+      LOG.debug("Procedure procId=" + procId + " already removed by the cleaner");
+      return;
+    }
+
+    // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
+    result.setClientAckTime(EnvironmentEdgeManager.currentTime());
+  }
+
+  /**
+   * Send an abort notification the specified procedure.
+   * Depending on the procedure implementation the abort can be considered or ignored.
+   * @param procId the procedure to abort
+   * @return true if the procedure exist and has received the abort, otherwise false.
+   */
+  public boolean abort(final long procId) {
+    Procedure proc = procedures.get(procId);
+    if (proc != null) {
+      return proc.abort(getEnvironment());
+    }
+    return false;
+  }
+
+  public Map<Long, ProcedureResult> getResults() {
+    return Collections.unmodifiableMap(completed);
+  }
+
+  public Procedure getProcedure(final long procId) {
+    return procedures.get(procId);
+  }
+
+  protected ProcedureRunnableSet getRunnableSet() {
+    return runnables;
+  }
+
+  /**
+   * Execution loop (N threads)
+   * while the executor is in a running state,
+   * fetch a procedure from the runnables queue and start the execution.
+   */
+  private void execLoop() {
+    while (isRunning()) {
+      Long procId = runnables.poll();
+      Procedure proc = procId != null ? procedures.get(procId) : null;
+      if (proc == null) continue;
+
+      try {
+        activeExecutorCount.incrementAndGet();
+        execLoop(proc);
+      } finally {
+        activeExecutorCount.decrementAndGet();
+      }
+    }
+  }
+
+  private void execLoop(Procedure proc) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("trying to start the execution of " + proc);
+    }
+
+    Long rootProcId = getRootProcedureId(proc);
+    if (rootProcId == null) {
+      // The 'proc' was ready to run but the root procedure was rolledback
+      executeRollback(proc);
+      return;
+    }
+
+    RootProcedureState procStack = rollbackStack.get(rootProcId);
+    if (procStack == null) return;
+
+    do {
+      // Try to acquire the execution
+      if (!procStack.acquire(proc)) {
+        if (procStack.setRollback()) {
+          // we have the 'rollback-lock' we can start rollingback
+          if (!executeRollback(rootProcId, procStack)) {
+            procStack.unsetRollback();
+            runnables.yield(proc);
+          }
+        } else {
+          // if we can't rollback means that some child is still running.
+          // the rollback will be executed after all the children are done.
+          // If the procedure was never executed, remove and mark it as rolledback.
+          if (!proc.wasExecuted()) {
+            if (!executeRollback(proc)) {
+              runnables.yield(proc);
+            }
+          }
+        }
+        break;
+      }
+
+      // Execute the procedure
+      assert proc.getState() == ProcedureState.RUNNABLE;
+      if (proc.acquireLock(getEnvironment())) {
+        execProcedure(procStack, proc);
+        proc.releaseLock(getEnvironment());
+      } else {
+        runnables.yield(proc);
+      }
+      procStack.release(proc);
+
+      // allows to kill the executor before something is stored to the wal.
+      // useful to test the procedure recovery.
+      if (testing != null && !isRunning()) {
+        break;
+      }
+
+      if (proc.getProcId() == rootProcId && proc.isSuccess()) {
+        // Finalize the procedure state
+        LOG.info("Procedure completed in " +
+            StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
+        procedureFinished(proc);
+        break;
+      }
+    } while (procStack.isFailed());
+  }
+
+  private void timeoutLoop() {
+    while (isRunning()) {
+      Procedure proc = waitingTimeout.poll();
+      if (proc == null) continue;
+
+      if (proc.getTimeRemaining() > 100) {
+        // got an early wake, maybe a stop?
+        // re-enqueue the task in case was not a stop or just a signal
+        waitingTimeout.add(proc);
+        continue;
+      }
+
+      // ----------------------------------------------------------------------------
+      // TODO-MAYBE: Should we provide a notification to the store with the
+      // full set of procedures pending and completed to write a compacted
+      // version of the log (in case is a log)?
+      // In theory no, procedures are have a short life, so at some point the store
+      // will have the tracker saying everything is in the last log.
+      // ----------------------------------------------------------------------------
+
+      // The CompletedProcedureCleaner is a special case, and it acts as a chore.
+      // instead of bringing the Chore class in, we reuse this timeout thread for
+      // this special case.
+      if (proc instanceof CompletedProcedureCleaner) {
+        try {
+          ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
+        } catch (Throwable e) {
+          LOG.error("ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
+        }
+        proc.setStartTime(EnvironmentEdgeManager.currentTime());
+        waitingTimeout.add(proc);
+        continue;
+      }
+
+      // The procedure received an "abort-timeout", call abort() and
+      // add the procedure back in the queue for rollback.
+      if (proc.setTimeoutFailure()) {
+        long rootProcId = Procedure.getRootProcedureId(procedures, proc);
+        RootProcedureState procStack = rollbackStack.get(rootProcId);
+        procStack.abort();
+        store.update(proc);
+        runnables.addFront(proc);
+        continue;
+      }
+    }
+  }
+
+  /**
+   * Execute the rollback of the full procedure stack.
+   * Once the procedure is rolledback, the root-procedure will be visible as
+   * finished to user, and the result will be the fatal exception.
+   */
+  private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
+    Procedure rootProc = procedures.get(rootProcId);
+    RemoteProcedureException exception = rootProc.getException();
+    if (exception == null) {
+      exception = procStack.getException();
+      rootProc.setFailure(exception);
+      store.update(rootProc);
+    }
+
+    List<Procedure> subprocStack = procStack.getSubprocedures();
+    assert subprocStack != null : "called rollback with no steps executed rootProc=" + rootProc;
+
+    int stackTail = subprocStack.size();
+    boolean reuseLock = false;
+    while (stackTail --> 0) {
+      final Procedure proc = subprocStack.get(stackTail);
+
+      if (!reuseLock && !proc.acquireLock(getEnvironment())) {
+        // can't take a lock on the procedure, add the root-proc back on the
+        // queue waiting for the lock availability
+        return false;
+      }
+
+      boolean abortRollback = !executeRollback(proc);
+      abortRollback |= !isRunning() || !store.isRunning();
+
+      // If the next procedure is the same to this one
+      // (e.g. StateMachineProcedure reuse the same instance)
+      // we can avoid to lock/unlock each step
+      reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
+      if (!reuseLock) {
+        proc.releaseLock(getEnvironment());
+      }
+
+      // allows to kill the executor before something is stored to the wal.
+      // useful to test the procedure recovery.
+      if (abortRollback) {
+        return false;
+      }
+
+      subprocStack.remove(stackTail);
+    }
+
+    // Finalize the procedure state
+    LOG.info("Rolledback procedure " + rootProc +
+             " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
+             " exception=" + exception.getMessage());
+    procedureFinished(rootProc);
+    return true;
+  }
+
+  /**
+   * Execute the rollback of the procedure step.
+   * It updates the store with the new state (stack index)
+   * or will remove completly the procedure in case it is a child.
+   */
+  private boolean executeRollback(final Procedure proc) {
+    try {
+      proc.doRollback(getEnvironment());
+    } catch (IOException e) {
+      LOG.debug("rollback attempt failed for " + proc, e);
+      return false;
+    } catch (Throwable e) {
+      // Catch NullPointerExceptions or similar errors...
+      LOG.fatal("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
+    }
+
+    // allows to kill the executor before something is stored to the wal.
+    // useful to test the procedure recovery.
+    if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
+      LOG.debug("TESTING: Kill before store update");
+      stop();
+      return false;
+    }
+
+    if (proc.removeStackIndex()) {
+      proc.setState(ProcedureState.ROLLEDBACK);
+      if (proc.hasParent()) {
+        store.delete(proc.getProcId());
+        procedures.remove(proc.getProcId());
+      } else {
+        store.update(proc);
+      }
+    } else {
+      store.update(proc);
+    }
+    return true;
+  }
+
+  /**
+   * Executes the specified procedure
+   *  - calls the doExecute() of the procedure
+   *  - if the procedure execution didn't fail (e.g. invalid user input)
+   *     - ...and returned subprocedures
+   *        - the subprocedures are initialized.
+   *        - the subprocedures are added to the store
+   *        - the subprocedures are added to the runnable queue
+   *        - the procedure is now in a WAITING state, waiting for the subprocedures to complete
+   *     - ...if there are no subprocedure
+   *        - the procedure completed successfully
+   *        - if there is a parent (WAITING)
+   *            - the parent state will be set to RUNNABLE
+   *  - in case of failure
+   *    - the store is updated with the new state
+   *    - the executor (caller of this method) will start the rollback of the procedure
+   */
+  private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
+    Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
+
+    // Execute the procedure
+    boolean reExecute = false;
+    Procedure[] subprocs = null;
+    do {
+      reExecute = false;
+      try {
+        subprocs = procedure.doExecute(getEnvironment());
+        if (subprocs != null && subprocs.length == 0) {
+          subprocs = null;
+        }
+      } catch (ProcedureYieldException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("yield procedure: " + procedure);
+        }
+        runnables.yield(procedure);
+        return;
+      } catch (Throwable e) {
+        // Catch NullPointerExceptions or similar errors...
+        String msg = "CODE-BUG: uncatched runtime exception for procedure: " + procedure;
+        LOG.error(msg, e);
+        procedure.setFailure(new RemoteProcedureException(msg, e));
+      }
+
+      if (!procedure.isFailed()) {
+        if (subprocs != null) {
+          if (subprocs.length == 1 && subprocs[0] == procedure) {
+            // quick-shortcut for a state machine like procedure
+            subprocs = null;
+            reExecute = true;
+          } else {
+            // yield the current procedure, and make the subprocedure runnable
+            for (int i = 0; i < subprocs.length; ++i) {
+              Procedure subproc = subprocs[i];
+              if (subproc == null) {
+                String msg = "subproc[" + i + "] is null, aborting the procedure";
+                procedure.setFailure(new RemoteProcedureException(msg,
+                  new IllegalArgumentException(msg)));
+                subprocs = null;
+                break;
+              }
+
+              assert subproc.getState() == ProcedureState.INITIALIZING;
+              subproc.setParentProcId(procedure.getProcId());
+              subproc.setProcId(nextProcId());
+            }
+
+            if (!procedure.isFailed()) {
+              procedure.setChildrenLatch(subprocs.length);
+              switch (procedure.getState()) {
+                case RUNNABLE:
+                  procedure.setState(ProcedureState.WAITING);
+                  break;
+                case WAITING_TIMEOUT:
+                  waitingTimeout.add(procedure);
+                  break;
+                default:
+                  break;
+              }
+            }
+          }
+        } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
+          waitingTimeout.add(procedure);
+        } else {
+          // No subtask, so we are done
+          procedure.setState(ProcedureState.FINISHED);
+        }
+      }
+
+      // Add the procedure to the stack
+      procStack.addRollbackStep(procedure);
+
+      // allows to kill the executor before something is stored to the wal.
+      // useful to test the procedure recovery.
+      if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
+        LOG.debug("TESTING: Kill before store update");
+        stop();
+        return;
+      }
+
+      // Commit the transaction
+      if (subprocs != null && !procedure.isFailed()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("store add " + procedure + " children " + Arrays.toString(subprocs));
+        }
+        store.insert(procedure, subprocs);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("store update " + procedure);
+        }
+        store.update(procedure);
+      }
+
+      // if the store is not running we are aborting
+      if (!store.isRunning()) {
+        return;
+      }
+
+      assert (reExecute && subprocs == null) || !reExecute;
+    } while (reExecute);
+
+    // Submit the new subprocedures
+    if (subprocs != null && !procedure.isFailed()) {
+      for (int i = 0; i < subprocs.length; ++i) {
+        Procedure subproc = subprocs[i];
+        assert !procedures.containsKey(subproc.getProcId());
+        procedures.put(subproc.getProcId(), subproc);
+        runnables.addFront(subproc);
+      }
+    }
+
+    if (procedure.isFinished() && procedure.hasParent()) {
+      Procedure parent = procedures.get(procedure.getParentProcId());
+      if (parent == null) {
+        assert procStack.isRollingback();
+        return;
+      }
+
+      // If this procedure is the last child awake the parent procedure
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(parent + " child is done: " + procedure);
+      }
+      if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
+        parent.setState(ProcedureState.RUNNABLE);
+        store.update(parent);
+        runnables.addFront(parent);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(parent + " all the children finished their work, resume.");
+        }
+        return;
+      }
+    }
+  }
+
+  private void sendProcedureLoadedNotification(final long procId) {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureExecutorListener listener: this.listeners) {
+        try {
+          listener.procedureLoaded(procId);
+        } catch (Throwable e) {
+          LOG.error("the listener " + listener + " had an error: " + e.getMessage(), e);
+        }
+      }
+    }
+  }
+
+  private void sendProcedureAddedNotification(final long procId) {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureExecutorListener listener: this.listeners) {
+        try {
+          listener.procedureAdded(procId);
+        } catch (Throwable e) {
+          LOG.error("the listener " + listener + " had an error: " + e.getMessage(), e);
+        }
+      }
+    }
+  }
+
+  private void sendProcedureFinishedNotification(final long procId) {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureExecutorListener listener: this.listeners) {
+        try {
+          listener.procedureFinished(procId);
+        } catch (Throwable e) {
+          LOG.error("the listener " + listener + " had an error: " + e.getMessage(), e);
+        }
+      }
+    }
+  }
+
+  private long nextProcId() {
+    long procId = lastProcId.incrementAndGet();
+    if (procId < 0) {
+      while (!lastProcId.compareAndSet(procId, 0)) {
+        procId = lastProcId.get();
+        if (procId >= 0)
+          break;
+      }
+      while (procedures.containsKey(procId)) {
+        procId = lastProcId.incrementAndGet();
+      }
+    }
+    return procId;
+  }
+
+  private Long getRootProcedureId(Procedure proc) {
+    return Procedure.getRootProcedureId(procedures, proc);
+  }
+
+  private void procedureFinished(final Procedure proc) {
+    // call the procedure completion cleanup handler
+    try {
+      proc.completionCleanup(getEnvironment());
+    } catch (Throwable e) {
+      // Catch NullPointerExceptions or similar errors...
+      LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
+    }
+
+    // update the executor internal state maps
+    completed.put(proc.getProcId(), newResultFromProcedure(proc));
+    rollbackStack.remove(proc.getProcId());
+    procedures.remove(proc.getProcId());
+
+    // call the runnableSet completion cleanup handler
+    try {
+      runnables.completionCleanup(proc);
+    } catch (Throwable e) {
+      // Catch NullPointerExceptions or similar errors...
+      LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
+    }
+
+    // Notify the listeners
+    sendProcedureFinishedNotification(proc.getProcId());
+  }
+
+  public Pair<ProcedureResult, Procedure> getResultOrProcedure(final long procId) {
+    ProcedureResult result = completed.get(procId);
+    Procedure proc = null;
+    if (result == null) {
+      proc = procedures.get(procId);
+      if (proc == null) {
+        result = completed.get(procId);
+      }
+    }
+    return new Pair(result, proc);
+  }
+
+  private static ProcedureResult newResultFromProcedure(final Procedure proc) {
+    if (proc.isFailed()) {
+      return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getException());
+    }
+    return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getResult());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
new file mode 100644
index 0000000..d8252b1
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.Map;
+
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * This class is a container of queues that allows to select a queue
+ * in a round robin fashion, considering priority of the queue.
+ *
+ * the quantum is just how many poll() will return the same object.
+ * e.g. if quantum is 1 and you have A and B as object you'll get: A B A B
+ * e.g. if quantum is 2 and you have A and B as object you'll get: A A B B A A B B
+ * then the object priority is just a priority * quantum
+ *
+ * Example:
+ *  - three queues (A, B, C) with priorities (1, 1, 2)
+ *  - The first poll() will return A
+ *  - The second poll() will return B
+ *  - The third and forth poll() will return C
+ *  - and so on again and again.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> {
+  private ConcurrentSkipListMap<TKey, TQueue> objMap =
+    new ConcurrentSkipListMap<TKey, TQueue>();
+
+  private final ReentrantLock lock = new ReentrantLock();
+  private final int quantum;
+
+  private Map.Entry<TKey, TQueue> current = null;
+  private int currentQuantum = 0;
+
+  public interface FairObject {
+    boolean isAvailable();
+    int getPriority();
+  }
+
+  /**
+   * @param quantum how many poll() will return the same object.
+   */
+  public ProcedureFairRunQueues(final int quantum) {
+    this.quantum = quantum;
+  }
+
+  public TQueue get(final TKey key) {
+    return objMap.get(key);
+  }
+
+  public TQueue add(final TKey key, final TQueue queue) {
+    TQueue oldq = objMap.putIfAbsent(key, queue);
+    return oldq != null ? oldq : queue;
+  }
+
+  public TQueue remove(final TKey key) {
+    TQueue queue = objMap.remove(key);
+    if (queue != null) {
+      lock.lock();
+      try {
+        if (current != null && queue == current.getValue()) {
+          currentQuantum = 0;
+          current = null;
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+    return queue;
+  }
+
+  public void clear() {
+    lock.lock();
+    try {
+      current = null;
+      objMap.clear();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @return the next available item if present
+   */
+  public TQueue poll() {
+    lock.lock();
+    try {
+      TQueue queue;
+      if (currentQuantum == 0) {
+        if (nextObject() == null) {
+          // nothing here
+          return null;
+        }
+
+        queue = current.getValue();
+        currentQuantum = calculateQuantum(queue) - 1;
+      } else {
+        currentQuantum--;
+        queue = current.getValue();
+      }
+
+      if (!queue.isAvailable()) {
+        Map.Entry<TKey, TQueue> last = current;
+        // Try the next one
+        do {
+          if (nextObject() == null)
+            return null;
+        } while (current.getValue() != last.getValue() && !current.getValue().isAvailable());
+
+        queue = current.getValue();
+        currentQuantum = calculateQuantum(queue) - 1;
+      }
+
+      return queue;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append('{');
+    for (Map.Entry<TKey, TQueue> entry: objMap.entrySet()) {
+      builder.append(entry.getKey());
+      builder.append(':');
+      builder.append(entry.getValue());
+    }
+    builder.append('}');
+    return builder.toString();
+  }
+
+  private Map.Entry<TKey, TQueue> nextObject() {
+    Map.Entry<TKey, TQueue> next = null;
+
+    // If we have already a key, try the next one
+    if (current != null) {
+      next = objMap.higherEntry(current.getKey());
+    }
+
+    // if there is no higher key, go back to the first
+    current = (next != null) ? next : objMap.firstEntry();
+    return current;
+  }
+
+  private int calculateQuantum(final TQueue fairObject) {
+    // TODO
+    return Math.max(1, fairObject.getPriority() * quantum);
+  }
+}
\ No newline at end of file