You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/09 22:59:37 UTC
[32/50] [abbrv] hbase git commit: HBASE-13202 Procedure v2 - core framework
HBASE-13202 Procedure v2 - core framework
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9d763cd7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9d763cd7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9d763cd7
Branch: refs/heads/hbase-12439
Commit: 9d763cd70243834359c96905871f807a5736f144
Parents: 1deadb6
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Apr 9 20:44:56 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Apr 9 20:44:56 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/io/util/StreamUtils.java | 12 +-
.../hadoop/hbase/util/ForeignExceptionUtil.java | 109 +
hbase-procedure/pom.xml | 181 +
.../hbase/procedure2/OnePhaseProcedure.java | 28 +
.../hadoop/hbase/procedure2/Procedure.java | 680 ++
.../procedure2/ProcedureAbortedException.java | 42 +
.../hbase/procedure2/ProcedureException.java | 45 +
.../hbase/procedure2/ProcedureExecutor.java | 1077 +++
.../procedure2/ProcedureFairRunQueues.java | 172 +
.../hbase/procedure2/ProcedureResult.java | 95 +
.../hbase/procedure2/ProcedureRunnableSet.java | 78 +
.../procedure2/ProcedureSimpleRunQueue.java | 121 +
.../procedure2/ProcedureYieldException.java | 40 +
.../procedure2/RemoteProcedureException.java | 116 +
.../hbase/procedure2/RootProcedureState.java | 185 +
.../hbase/procedure2/SequentialProcedure.java | 81 +
.../hbase/procedure2/StateMachineProcedure.java | 166 +
.../hbase/procedure2/TwoPhaseProcedure.java | 28 +
.../hbase/procedure2/store/ProcedureStore.java | 121 +
.../procedure2/store/ProcedureStoreTracker.java | 540 ++
.../CorruptedWALProcedureStoreException.java | 43 +
.../procedure2/store/wal/ProcedureWALFile.java | 152 +
.../store/wal/ProcedureWALFormat.java | 234 +
.../store/wal/ProcedureWALFormatReader.java | 166 +
.../procedure2/store/wal/WALProcedureStore.java | 721 ++
.../hadoop/hbase/procedure2/util/ByteSlot.java | 111 +
.../hbase/procedure2/util/StringUtils.java | 80 +
.../procedure2/util/TimeoutBlockingQueue.java | 217 +
.../procedure2/ProcedureTestingUtility.java | 163 +
.../procedure2/TestProcedureExecution.java | 338 +
.../procedure2/TestProcedureFairRunQueues.java | 155 +
.../hbase/procedure2/TestProcedureRecovery.java | 488 ++
.../procedure2/TestProcedureReplayOrder.java | 226 +
.../store/TestProcedureStoreTracker.java | 168 +
.../store/wal/TestWALProcedureStore.java | 267 +
.../util/TestTimeoutBlockingQueue.java | 137 +
hbase-protocol/pom.xml | 1 +
.../protobuf/generated/ProcedureProtos.java | 7219 ++++++++++++++++++
.../src/main/protobuf/Procedure.proto | 114 +
pom.xml | 21 +-
40 files changed, 14933 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
index 314ed2b..0b442a5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -120,7 +120,7 @@ public class StreamUtils {
/**
* Reads a varInt value stored in an array.
- *
+ *
* @param input
* Input array where the varInt is available
* @param offset
@@ -198,4 +198,14 @@ public class StreamUtils {
out.write((byte) (0xff & (v >> 8)));
out.write((byte) (0xff & v));
}
+
+  /**
+   * Reads a long stored as 8 bytes in big-endian order (most significant byte first).
+   *
+   * @param in the stream to read exactly 8 bytes from
+   * @return the decoded long value
+   * @throws java.io.EOFException if the stream ends before 8 bytes were read
+   *         (a subclass of IOException, so callers catching IOException are unaffected)
+   * @throws IOException if the underlying stream fails
+   */
+  public static long readLong(InputStream in) throws IOException {
+    long result = 0;
+    for (int shift = 56; shift >= 0; shift -= 8) {
+      long x = in.read();
+      if (x < 0) {
+        // read() returns -1 at end of stream: the value is truncated.
+        throw new java.io.EOFException("EOF");
+      }
+      result |= (x << shift);
+    }
+    return result;
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java
new file mode 100644
index 0000000..a0006ed
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
+import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.GenericExceptionMessage;
+import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.StackTraceElementMessage;
+
+/**
+ * Helper to convert Exceptions and StackTraces from/to protobuf.
+ * (see ErrorHandling.proto for the internals of the proto messages)
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ForeignExceptionUtil {
+  private ForeignExceptionUtil() { }
+
+  /**
+   * Rebuilds an IOException from its protobuf form.
+   * The class name, message and stack trace are placed in a {@link RemoteException},
+   * and the result of {@link RemoteException#unwrapRemoteException()} is returned.
+   * @param eem the serialized exception
+   * @return the exception produced by unwrapping the RemoteException
+   */
+  public static IOException toIOException(final ForeignExceptionMessage eem) {
+    GenericExceptionMessage gem = eem.getGenericException();
+    StackTraceElement[] trace = toStackTrace(gem.getTraceList());
+    RemoteException re = new RemoteException(gem.getClassName(), gem.getMessage());
+    re.setStackTrace(trace);
+    return re.unwrapRemoteException();
+  }
+
+  /**
+   * Serializes a throwable into a {@link ForeignExceptionMessage}.
+   * @param source identifier of where the exception originated, stored as the message source
+   * @param t the throwable to serialize; its class name, message (when non-null) and
+   *        stack trace are recorded
+   * @return the protobuf representation of {@code t}
+   */
+  public static ForeignExceptionMessage toProtoForeignException(String source, Throwable t) {
+    GenericExceptionMessage.Builder gemBuilder = GenericExceptionMessage.newBuilder();
+    gemBuilder.setClassName(t.getClass().getName());
+    String message = t.getMessage();
+    if (message != null) {
+      // optional field: only set when the throwable actually has a message
+      gemBuilder.setMessage(message);
+    }
+    // set the stack trace, if there is one
+    List<StackTraceElementMessage> stack = toProtoStackTraceElement(t.getStackTrace());
+    if (stack != null) {
+      gemBuilder.addAllTrace(stack);
+    }
+    GenericExceptionMessage payload = gemBuilder.build();
+    ForeignExceptionMessage.Builder exception = ForeignExceptionMessage.newBuilder();
+    exception.setGenericException(payload).setSource(source);
+    return exception.build();
+  }
+
+  /**
+   * Convert a stack trace to a list of {@link StackTraceElementMessage}s.
+   * @param trace the stack trace to convert to protobuf messages
+   * @return the converted list, or <tt>null</tt> if the passed stack is <tt>null</tt>.
+   */
+  public static List<StackTraceElementMessage> toProtoStackTraceElement(StackTraceElement[] trace) {
+    // if there is no stack trace, ignore it and just return the message
+    if (trace == null) return null;
+    // build the stack trace for the message
+    List<StackTraceElementMessage> pbTrace = new ArrayList<StackTraceElementMessage>(trace.length);
+    for (StackTraceElement elem : trace) {
+      StackTraceElementMessage.Builder stackBuilder = StackTraceElementMessage.newBuilder();
+      stackBuilder.setDeclaringClass(elem.getClassName());
+      // the file name is optional in StackTraceElement, so only set it when present
+      if (elem.getFileName() != null) {
+        stackBuilder.setFileName(elem.getFileName());
+      }
+      stackBuilder.setLineNumber(elem.getLineNumber());
+      stackBuilder.setMethodName(elem.getMethodName());
+      pbTrace.add(stackBuilder.build());
+    }
+    return pbTrace;
+  }
+
+  /**
+   * Unwind a serialized list of {@link StackTraceElementMessage}s to an array of
+   * {@link StackTraceElement}s.
+   * @param traceList list that was serialized
+   * @return the deserialized stack trace; an empty array (never <tt>null</tt>) when the
+   *         list is <tt>null</tt> or empty (e.g. the trace wasn't set on the sender).
+   */
+  public static StackTraceElement[] toStackTrace(List<StackTraceElementMessage> traceList) {
+    if (traceList == null || traceList.isEmpty()) {
+      return new StackTraceElement[0]; // empty array
+    }
+    StackTraceElement[] trace = new StackTraceElement[traceList.size()];
+    for (int i = 0; i < traceList.size(); i++) {
+      StackTraceElementMessage elem = traceList.get(i);
+      trace[i] = new StackTraceElement(
+          elem.getDeclaringClass(), elem.getMethodName(),
+          elem.hasFileName() ? elem.getFileName() : null,
+          elem.getLineNumber());
+    }
+    return trace;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-procedure/pom.xml b/hbase-procedure/pom.xml
new file mode 100644
index 0000000..9683db2
--- /dev/null
+++ b/hbase-procedure/pom.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>hbase</artifactId>
+ <groupId>org.apache.hbase</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hbase-procedure</artifactId>
+ <name>HBase - Procedure</name>
+ <description>Procedure Framework</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <!-- Make a jar and put the sources in the jar -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <!--Make it so assembly:single does nothing in here-->
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven.assembly.version}</version>
+ <configuration>
+ <skipAssembly>true</skipAssembly>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <!-- Always skip the second part executions, since we only run
+ simple unit tests in this module. -->
+ <executions>
+ <execution>
+ <id>secondPartTestsExecution</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-annotations</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-protocol</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <!-- Profiles for building against different hadoop versions -->
+ <profile>
+ <id>hadoop-1.1</id>
+ <activation>
+ <property>
+ <!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
+ <!--h1--><name>hadoop.profile</name><value>1.1</value>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-1.0</id>
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>1.0</value>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ <!--
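+ profile for building against Hadoop 2.0.0-alpha. This is the default profile: it is
+ active whenever the hadoop.profile property is NOT set (plain mvn). Note that passing
+ -Dhadoop.profile=2.0 defines the property and therefore deactivates this profile.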
+ -->
+ <profile>
+ <id>hadoop-2.0</id>
+ <activation>
+ <property>
+ <!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
+ <!--h2--><name>!hadoop.profile</name>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ <!--
+ profile for building against Hadoop 3.0.x. Activate using:
+ mvn -Dhadoop.profile=3.0
+ -->
+ <profile>
+ <id>hadoop-3.0</id>
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>3.0</value>
+ </property>
+ </activation>
+ <properties>
+ <hadoop.version>3.0-SNAPSHOT</hadoop.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+</project>
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java
new file mode 100644
index 0000000..1c3be2d
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Placeholder base class for one-phase procedures.
+ * It adds nothing on top of {@link Procedure} yet; per the TODO, it is intended
+ * for use cases such as online snapshots.
+ *
+ * @param <TEnvironment> the environment type handed to the procedure callbacks
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class OnePhaseProcedure<TEnvironment> extends Procedure<TEnvironment> {
+  // TODO: one-phase specific behavior (e.g. used by online snapshots)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
new file mode 100644
index 0000000..338fcad
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -0,0 +1,680 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+/**
+ * Base Procedure class responsible for handling the Procedure metadata,
+ * e.g. state, startTime, lastUpdate, stack-indexes, ...
+ *
+ * execute() is called each time the procedure is executed.
+ * It may be called multiple times in case of failure and restart, so the
+ * code must be idempotent.
+ * The return value is a set of sub-procedures, or null if the procedure doesn't
+ * have sub-procedures. Once the sub-procedures have successfully completed,
+ * the execute() method is called again; you should think of it as a stack:
+ * -> step 1
+ * ---> step 2
+ * -> step 1
+ *
+ * rollback() is called when the procedure or one of its sub-procedures fails.
+ * The rollback step is supposed to clean up the resources created during the
+ * execute() step. In case of failure and restart, rollback() may be called
+ * multiple times, so the code must be idempotent.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
+ // unchanged after initialization
+ // owner: optional; setOwner() stores "empty" owners as null
+ private String owner = null;
+ // parentProcId: null for a root procedure (see hasParent())
+ private Long parentProcId = null;
+ // procId: null until setProcId() assigns it; getProcId() unboxes it
+ private Long procId = null;
+ // startTime: set by setProcId(), or restored via setStartTime() on store load
+ private long startTime;
+
+ // runtime state, updated every operation
+ // state: lifecycle state, initially INITIALIZING; setState() also refreshes lastUpdate
+ private ProcedureState state = ProcedureState.INITIALIZING;
+ // timeout: in msec; null means no timeout is set (see hasTimeout())
+ private Integer timeout = null;
+ // stackIndexes: this procedure's stack positions; null when none (see wasExecuted())
+ private int[] stackIndexes = null;
+ // childrenLatch: number of sub-procedures not yet completed (see childrenCountDown())
+ private int childrenLatch = 0;
+ // lastUpdate: timestamp refreshed by updateTimestamp()
+ private long lastUpdate;
+
+ // exception: the recorded failure, set by setFailure(); null while not failed
+ private RemoteProcedureException exception = null;
+ // result: optional serialized result, set by setResult()
+ private byte[] result = null;
+
+ /**
+ * The main code of the procedure. It must be idempotent since execute()
+ * may be called multiple times in case of machine failure in the middle
+ * of the execution.
+ * @param env the environment the procedure runs in
+ * @return a set of sub-procedures or null if there is nothing else to execute.
+ * @throws ProcedureYieldException NOTE(review): presumably signals that the procedure
+ * wants to yield and be executed again later; confirm against ProcedureExecutor
+ */
+ protected abstract Procedure[] execute(TEnvironment env)
+ throws ProcedureYieldException;
+
+ /**
+ * The code to undo what was done by the execute() code.
+ * It is called when the procedure or one of its sub-procedures failed, or when an
+ * abort was requested. It should clean up all the resources created by
+ * the execute() call. The implementation must be idempotent since rollback()
+ * may be called multiple times in case of machine failure in the middle
+ * of the execution.
+ * @param env the environment the procedure runs in
+ * @throws IOException temporary failure, the rollback will retry later
+ */
+ protected abstract void rollback(TEnvironment env)
+ throws IOException;
+
+ /**
+ * The abort() call is asynchronous and each procedure must decide how to deal
+ * with it, if it wants to be abortable. The simplest implementation
+ * is to set an AtomicBoolean in the abort() method and have execute()
+ * check whether the abort flag is set.
+ * abort() may be called multiple times from the client, so the implementation
+ * must be idempotent.
+ *
+ * NOTE: abort() is not like Thread.interrupt(); it is just a notification
+ * that lets the procedure implementor decide where to abort, to avoid leaks and
+ * to have better control over what was executed and what was not.
+ *
+ * @param env the environment the procedure runs in
+ * @return NOTE(review): meaning is not defined here; presumably whether the abort
+ * request was accepted. Confirm against the callers.
+ */
+ protected abstract boolean abort(TEnvironment env);
+
+ /**
+ * The user-level code of the procedure may have some state to
+ * persist (e.g. input arguments) to be able to resume on failure.
+ * @param stream the stream that will contain the user serialized data
+ * @throws IOException if the state cannot be written to the stream
+ */
+ protected abstract void serializeStateData(final OutputStream stream)
+ throws IOException;
+
+ /**
+ * Called on store load to allow the user to decode the previously serialized
+ * state (the counterpart of serializeStateData()).
+ * @param stream the stream that contains the user serialized data
+ * @throws IOException if the state cannot be read from the stream
+ */
+ protected abstract void deserializeStateData(final InputStream stream)
+ throws IOException;
+
+  /**
+   * Hook to take whatever lock this procedure needs; override if necessary.
+   * What a "lock" means is entirely up to the implementor.
+   * Example: a Master could run requests for different tables in parallel, so
+   * "create t1" and "create t2" execute at the same time, while anything else
+   * on t1/t2 waits in a queue until that table's create has happened.
+   *
+   * @param env the procedure environment
+   * @return true if the lock was acquired and false otherwise;
+   *         this default takes no lock and always returns true
+   */
+  protected boolean acquireLock(final TEnvironment env) {
+    // no locking by default: report success unconditionally
+    return true;
+  }
+
+  /**
+   * Hook to release the lock taken in acquireLock(); override if necessary.
+   * The default implementation does nothing.
+   */
+  protected void releaseLock(final TEnvironment env) {
+    // intentionally empty
+  }
+
+  /**
+   * Hook invoked when the procedure is loaded for replay, letting the implementor
+   * run a quick operation first, e.g. failing the procedure when its state on replay
+   * may be unknown. The default implementation does nothing.
+   */
+  protected void beforeReplay(final TEnvironment env) {
+    // intentionally empty
+  }
+
+  /**
+   * Hook invoked once the procedure is marked as completed (success or rollback),
+   * typically to clean up in-memory state. It is not retried on failure.
+   * The default implementation does nothing.
+   */
+  protected void completionCleanup(final TEnvironment env) {
+    // intentionally empty
+  }
+
+  /**
+   * Describes the procedure as the class details followed by id, parent and owner
+   * (each only when present) and the current state.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder desc = new StringBuilder();
+    toStringClassDetails(desc);
+    if (procId != null) {
+      desc.append(" id=").append(getProcId());
+    }
+    if (hasParent()) {
+      desc.append(" parent=").append(getParentProcId());
+    }
+    if (hasOwner()) {
+      desc.append(" owner=").append(getOwner());
+    }
+    return desc.append(" state=").append(getState()).toString();
+  }
+
+  /**
+   * Appends the procedure-specific part of toString(), e.g. class name and parameters.
+   * Subclasses may override it to add their own details; the default appends the
+   * fully-qualified class name.
+   * @param builder the string builder to append the proc specific information to
+   */
+  protected void toStringClassDetails(StringBuilder builder) {
+    final String className = getClass().getName();
+    builder.append(className);
+  }
+
+  /**
+   * @return the serialized result left by the procedure, or null if none was set
+   */
+  public byte[] getResult() {
+    return result;
+  }
+
+  /**
+   * The procedure may leave a "result" on completion.
+   * @param result the serialized result that will be passed to the client
+   */
+  protected void setResult(final byte[] result) {
+    this.result = result;
+  }
+
+  /**
+   * @return the id assigned by setProcId(). The id is stored boxed, so calling
+   *         this before an id has been assigned throws NullPointerException.
+   */
+  public long getProcId() {
+    return procId;
+  }
+
+  /**
+   * @return true if a parent procedure id has been set
+   */
+  public boolean hasParent() {
+    return parentProcId != null;
+  }
+
+  /**
+   * @return true if a failure has been recorded.
+   *         Synchronized like getException() and isFailed(), because the exception
+   *         is written under the same monitor by setFailure().
+   */
+  public synchronized boolean hasException() {
+    return exception != null;
+  }
+
+  /**
+   * @return true if a timeout has been set via setTimeout()
+   */
+  public boolean hasTimeout() {
+    return timeout != null;
+  }
+
+  /**
+   * @return the parent procedure id. Only valid when hasParent() is true;
+   *         otherwise unboxing the missing id throws NullPointerException.
+   */
+  public long getParentProcId() {
+    return parentProcId;
+  }
+
+  /**
+   * @return true if the procedure has failed: either a failure was recorded
+   *         (possibly not rolled back yet) or it has already been rolled back.
+   */
+  public synchronized boolean isFailed() {
+    return state == ProcedureState.ROLLEDBACK || exception != null;
+  }
+
+  /**
+   * @return true if the procedure reached FINISHED without any recorded failure.
+   */
+  public synchronized boolean isSuccess() {
+    return exception == null && state == ProcedureState.FINISHED;
+  }
+
+  /**
+   * @return true if the procedure is finished: either completed successfully
+   *         (FINISHED with no failure) or failed and rolled back (ROLLEDBACK).
+   */
+  public synchronized boolean isFinished() {
+    switch (state) {
+      case ROLLEDBACK:
+        return true;
+      case FINISHED:
+        // FINISHED with a recorded failure does not count as finished yet
+        return exception == null;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * @return true if the procedure is waiting for a child to finish or for an external event
+   *         (state WAITING or WAITING_TIMEOUT).
+   */
+  public synchronized boolean isWaiting() {
+    switch (state) {
+      case WAITING:
+      case WAITING_TIMEOUT:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * @return the recorded failure, or null if the procedure has not failed
+   */
+  public synchronized RemoteProcedureException getException() {
+    return exception;
+  }
+
+  /**
+   * @return the start timestamp, taken from EnvironmentEdgeManager.currentTime() when the
+   *         id was assigned, or restored through setStartTime() on store load
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * @return the timestamp of the most recent updateTimestamp() call
+   */
+  public synchronized long getLastUpdate() {
+    return lastUpdate;
+  }
+
+  /**
+   * @return the time between the start time and the last update
+   */
+  public synchronized long elapsedTime() {
+    final long begin = startTime;
+    return lastUpdate - begin;
+  }
+
+  /**
+   * Sets the procedure timeout.
+   * @param timeout timeout in msec
+   */
+  protected void setTimeout(final int timeout) {
+    this.timeout = Integer.valueOf(timeout);
+  }
+
+  /**
+   * @return the timeout in msec. Check hasTimeout() first: when no timeout has
+   *         been set, this unboxes null and throws NullPointerException.
+   */
+  public int getTimeout() {
+    return timeout.intValue();
+  }
+
+  /**
+   * @return the msec left before the timeout expires, measured from the start time
+   *         and never negative. Requires a timeout to be set (see hasTimeout()).
+   */
+  public long getTimeRemaining() {
+    // unbox first, exactly like the original expression evaluated it
+    final long timeoutMs = timeout;
+    final long elapsed = EnvironmentEdgeManager.currentTime() - startTime;
+    return Math.max(0, timeoutMs - elapsed);
+  }
+
+  /**
+   * Sets the owner of the procedure. Values for which StringUtils.isEmpty() is true
+   * (presumably null or "") are stored as null, i.e. "no owner".
+   * @param owner the owner name
+   */
+  protected void setOwner(final String owner) {
+    if (StringUtils.isEmpty(owner)) {
+      this.owner = null;
+    } else {
+      this.owner = owner;
+    }
+  }
+
+  /**
+   * @return the owner, or null if none was set
+   */
+  public String getOwner() {
+    return owner;
+  }
+
+  /**
+   * @return true if an owner is set
+   */
+  public boolean hasOwner() {
+    return getOwner() != null;
+  }
+
+  /**
+   * Moves the procedure to the given state and refreshes the last-update timestamp.
+   * @param state the new state
+   */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  protected synchronized void setState(final ProcedureState state) {
+    this.state = state;
+    // every state transition counts as an update
+    updateTimestamp();
+  }
+
+  /**
+   * @return the current state of the procedure
+   */
+  @InterfaceAudience.Private
+  protected synchronized ProcedureState getState() {
+    return this.state;
+  }
+
+ protected void setFailure(final String source, final Throwable cause) {
+ setFailure(new RemoteProcedureException(source, cause));
+ }
+
+ protected synchronized void setFailure(final RemoteProcedureException exception) {
+ this.exception = exception;
+ if (!isFinished()) {
+ setState(ProcedureState.FINISHED);
+ }
+ }
+
+ protected void setAbortFailure(final String source, final String msg) {
+ setFailure(source, new ProcedureAbortedException(msg));
+ }
+
+ @InterfaceAudience.Private
+ protected synchronized boolean setTimeoutFailure() {
+ if (state == ProcedureState.WAITING_TIMEOUT) {
+ long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
+ setFailure("ProcedureExecutor", new TimeoutException(
+ "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
+ return true;
+ }
+ return false;
+ }
+
+  /**
+   * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
+   * Also stamps the start time and moves the procedure to RUNNABLE.
+   * @param procId the id to assign
+   */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  protected void setProcId(final long procId) {
+    this.procId = Long.valueOf(procId);
+    this.startTime = EnvironmentEdgeManager.currentTime();
+    setState(ProcedureState.RUNNABLE);
+  }
+
+  /**
+   * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
+   * @param parentProcId the id of the parent procedure
+   */
+  @InterfaceAudience.Private
+  protected void setParentProcId(final long parentProcId) {
+    this.parentProcId = Long.valueOf(parentProcId);
+  }
+
+ /**
+ * Internal method called by the ProcedureExecutor that starts the
+ * user-level code execute().
+ * The last-update timestamp is refreshed before execute() runs and again
+ * afterwards (in the finally block, so also when execute() throws).
+ * @param env the environment the procedure runs in
+ * @return whatever execute() returns (sub-procedures or null)
+ */
+ @InterfaceAudience.Private
+ protected Procedure[] doExecute(final TEnvironment env)
+ throws ProcedureYieldException {
+ try {
+ updateTimestamp();
+ return execute(env);
+ } finally {
+ updateTimestamp();
+ }
+ }
+
+ /**
+ * Internal method called by the ProcedureExecutor that starts the
+ * user-level code rollback().
+ * The last-update timestamp is refreshed before rollback() runs and again
+ * afterwards (in the finally block, so also when rollback() throws).
+ * @param env the environment the procedure runs in
+ * @throws IOException propagated from rollback()
+ */
+ @InterfaceAudience.Private
+ protected void doRollback(final TEnvironment env) throws IOException {
+ try {
+ updateTimestamp();
+ rollback(env);
+ } finally {
+ updateTimestamp();
+ }
+ }
+
+ /**
+ * Called on store load to initialize the Procedure internals after
+ * the creation/deserialization.
+ */
+ @InterfaceAudience.Private
+ protected void setStartTime(final long startTime) {
+ this.startTime = startTime;
+ }
+
+ /**
+ * Called on store load to initialize the Procedure internals after
+ * the creation/deserialization.
+ */
+ private synchronized void setLastUpdate(final long lastUpdate) {
+ this.lastUpdate = lastUpdate;
+ }
+
+ protected synchronized void updateTimestamp() {
+ this.lastUpdate = EnvironmentEdgeManager.currentTime();
+ }
+
+  /**
+   * Called by the ProcedureExecutor on procedure-load to restore the latch state.
+   * @param numChildren number of sub-procedures that have not completed yet
+   */
+  @InterfaceAudience.Private
+  protected synchronized void setChildrenLatch(final int numChildren) {
+    this.childrenLatch = numChildren;
+  }
+
+  /**
+   * Increments the pending-children count by one. Per the original comment, it is
+   * called by the ProcedureExecutor on procedure-load to restore the latch state.
+   */
+  @InterfaceAudience.Private
+  protected synchronized void incChildrenLatch() {
+    // TODO: can this be inferred from the stack? I think so...
+    this.childrenLatch += 1;
+  }
+
+  /**
+   * Called by the ProcedureExecutor to notify that one of the sub-procedures
+   * has completed.
+   * @return true if that was the last pending sub-procedure
+   */
+  @InterfaceAudience.Private
+  protected synchronized boolean childrenCountDown() {
+    assert childrenLatch > 0;
+    childrenLatch -= 1;
+    return childrenLatch == 0;
+  }
+
+ /**
+ * Called by the RootProcedureState on procedure execution.
+ * Each procedure store its stack-index positions.
+ */
+ @InterfaceAudience.Private
+ protected synchronized void addStackIndex(final int index) {
+ if (stackIndexes == null) {
+ stackIndexes = new int[] { index };
+ } else {
+ int count = stackIndexes.length;
+ stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
+ stackIndexes[count] = index;
+ }
+ }
+
+ @InterfaceAudience.Private
+ protected synchronized boolean removeStackIndex() {
+ if (stackIndexes.length > 1) {
+ stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
+ return false;
+ } else {
+ stackIndexes = null;
+ return true;
+ }
+ }
+
+ /**
+ * Called on store load to initialize the Procedure internals after
+ * the creation/deserialization.
+ */
+ @InterfaceAudience.Private
+ protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
+ this.stackIndexes = new int[stackIndexes.size()];
+ for (int i = 0; i < this.stackIndexes.length; ++i) {
+ this.stackIndexes[i] = stackIndexes.get(i);
+ }
+ }
+
+ @InterfaceAudience.Private
+ protected synchronized boolean wasExecuted() {
+ return stackIndexes != null;
+ }
+
+ @InterfaceAudience.Private
+ protected synchronized int[] getStackIndexes() {
+ return stackIndexes;
+ }
+
+  /**
+   * Orders procedures by id.
+   * Uses Long.compare rather than subtracting the ids: the subtraction can overflow
+   * for ids of opposite sign and large magnitude, which would invert the ordering.
+   */
+  @Override
+  public int compareTo(final Procedure other) {
+    return Long.compare(getProcId(), other.getProcId());
+  }
+
+ /*
+ * Helper to lookup the root Procedure ID given a specified procedure.
+ */
+ @InterfaceAudience.Private
+ protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
+ while (proc.hasParent()) {
+ proc = procedures.get(proc.getParentProcId());
+ if (proc == null) return null;
+ }
+ return proc.getProcId();
+ }
+
+ protected static Procedure newInstance(final String className) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(className);
+ if (!Modifier.isPublic(clazz.getModifiers())) {
+ throw new Exception("the " + clazz + " class is not public");
+ }
+
+ Constructor<?> ctor = clazz.getConstructor();
+ assert ctor != null : "no constructor found";
+ if (!Modifier.isPublic(ctor.getModifiers())) {
+ throw new Exception("the " + clazz + " constructor is not public");
+ }
+ return (Procedure)ctor.newInstance();
+ } catch (Exception e) {
+ throw new IOException("The procedure class " + className +
+ " must be accessible and have an empty constructor", e);
+ }
+ }
+
+ protected static void validateClass(final Procedure proc) throws IOException {
+ try {
+ Class<?> clazz = proc.getClass();
+ if (!Modifier.isPublic(clazz.getModifiers())) {
+ throw new Exception("the " + clazz + " class is not public");
+ }
+
+ Constructor<?> ctor = clazz.getConstructor();
+ assert ctor != null;
+ if (!Modifier.isPublic(ctor.getModifiers())) {
+ throw new Exception("the " + clazz + " constructor is not public");
+ }
+ } catch (Exception e) {
+ throw new IOException("The procedure class " + proc.getClass().getName() +
+ " must be accessible and have an empty constructor", e);
+ }
+ }
+
+ /**
+ * Helper to convert the procedure to protobuf.
+ * Used by ProcedureStore implementations.
+ * @param proc the procedure to serialize; must be non-null and pass validateClass()
+ * @return a message with the procedure metadata, stack ids, exception, result and the
+ *   state data written by serializeStateData() (omitted when nothing was written)
+ * @throws IOException if the class validation or the state serialization fails
+ */
+ @InterfaceAudience.Private
+ public static ProcedureProtos.Procedure convert(final Procedure proc)
+ throws IOException {
+   Preconditions.checkArgument(proc != null);
+   validateClass(proc);
+
+   // fields always present
+   final ProcedureProtos.Procedure.Builder protoBuilder = ProcedureProtos.Procedure.newBuilder();
+   protoBuilder.setClassName(proc.getClass().getName());
+   protoBuilder.setProcId(proc.getProcId());
+   protoBuilder.setState(proc.getState());
+   protoBuilder.setStartTime(proc.getStartTime());
+   protoBuilder.setLastUpdate(proc.getLastUpdate());
+
+   // optional fields
+   if (proc.hasParent()) {
+     protoBuilder.setParentId(proc.getParentProcId());
+   }
+   if (proc.hasTimeout()) {
+     protoBuilder.setTimeout(proc.getTimeout());
+   }
+   if (proc.hasOwner()) {
+     protoBuilder.setOwner(proc.getOwner());
+   }
+
+   final int[] stackIds = proc.getStackIndexes();
+   if (stackIds != null) {
+     for (final int stackId : stackIds) {
+       protoBuilder.addStackId(stackId);
+     }
+   }
+
+   if (proc.hasException()) {
+     final RemoteProcedureException failure = proc.getException();
+     protoBuilder.setException(
+       RemoteProcedureException.toProto(failure.getSource(), failure.getCause()));
+   }
+
+   final byte[] resultData = proc.getResult();
+   if (resultData != null) {
+     protoBuilder.setResult(ByteStringer.wrap(resultData));
+   }
+
+   // procedure-specific state: only stored when the procedure wrote something
+   final ByteString.Output stateOut = ByteString.newOutput();
+   proc.serializeStateData(stateOut);
+   if (stateOut.size() > 0) {
+     protoBuilder.setStateData(stateOut.toByteString());
+   }
+
+   return protoBuilder.build();
+ }
+
+ /**
+ * Helper to convert the protobuf procedure.
+ * Used by ProcedureStore implementations.
+ * @param proto the stored message
+ * @return a new Procedure of the recorded class, with all the stored fields restored
+ * @throws IOException if the class cannot be instantiated or the state cannot be read
+ *
+ * TODO: OPTIMIZATION: some of the field never change during the execution
+ * (e.g. className, procId, parentId, ...).
+ * We can split in 'data' and 'state', and the store
+ * may take advantage of it by storing the data only on insert().
+ */
+ @InterfaceAudience.Private
+ public static Procedure convert(final ProcedureProtos.Procedure proto)
+ throws IOException {
+   // instantiate the concrete class recorded in the message
+   final Procedure proc = Procedure.newInstance(proto.getClassName());
+
+   // fields always present
+   proc.setProcId(proto.getProcId());
+   proc.setState(proto.getState());
+   proc.setStartTime(proto.getStartTime());
+   proc.setLastUpdate(proto.getLastUpdate());
+
+   // optional fields
+   if (proto.hasParentId()) {
+     proc.setParentProcId(proto.getParentId());
+   }
+   if (proto.hasOwner()) {
+     proc.setOwner(proto.getOwner());
+   }
+   if (proto.hasTimeout()) {
+     proc.setTimeout(proto.getTimeout());
+   }
+   if (proto.getStackIdCount() > 0) {
+     proc.setStackIndexes(proto.getStackIdList());
+   }
+
+   // an exception is only expected on a FINISHED (rollback pending) or ROLLEDBACK procedure
+   if (proto.hasException()) {
+     assert proc.getState() == ProcedureState.FINISHED ||
+       proc.getState() == ProcedureState.ROLLEDBACK :
+       "The procedure must be failed (waiting to rollback) or rolledback";
+     proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
+   }
+
+   if (proto.hasResult()) {
+     proc.setResult(proto.getResult().toByteArray());
+   }
+
+   // deserializeStateData() is called even for an empty stream, mainly for testing.
+   proc.deserializeStateData(proto.getStateData().newInput());
+   return proc;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java
new file mode 100644
index 0000000..2e409cf
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Exception signaling that a procedure has been aborted.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ProcedureAbortedException extends ProcedureException {
+ /** Create the exception with no detail message. */
+ public ProcedureAbortedException() {
+   // implicit super()
+ }
+
+ /**
+ * Create the exception with a detail message.
+ * @param s message
+ */
+ public ProcedureAbortedException(String s) {
+   super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java
new file mode 100644
index 0000000..9f922b1
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Generic exception of the procedure framework (e.g. ProcedureAbortedException extends it).
+ * It is an IOException, so it can be thrown wherever IOException is already declared.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ProcedureException extends IOException {
+ /** default constructor */
+ public ProcedureException() {
+   super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public ProcedureException(String s) {
+   super(s);
+ }
+
+ /**
+ * Constructor
+ * @param t the cause of this exception
+ */
+ public ProcedureException(Throwable t) {
+   super(t);
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ * @param t the cause of this exception
+ */
+ public ProcedureException(String s, Throwable t) {
+   super(s, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
new file mode 100644
index 0000000..8588315
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -0,0 +1,1077 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
+import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Thread Pool that executes the submitted procedures.
+ * The executor has a ProcedureStore associated.
+ * Each operation is logged and on restart the pending procedures are resumed.
+ *
+ * Unless the Procedure code throws an error (e.g. invalid user input)
+ * the procedure will complete (at some point in time). On restart the pending
+ * procedures are resumed and the ones that failed will be rolled back.
+ *
+ * The user can add procedures to the executor via submitProcedure(proc),
+ * check for the finished state via isFinished(procId),
+ * and get the result via getResult(procId).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureExecutor<TEnvironment> {
+ private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
+
+ // test hooks; null unless explicitly set
+ Testing testing = null;
+
+ /**
+ * Test-only knobs. The flag names suggest they simulate killing the executor before a
+ * store update; their call sites are outside this part of the file (TODO confirm).
+ */
+ public static class Testing {
+   protected boolean killBeforeStoreUpdate = false;
+   protected boolean toggleKillBeforeStoreUpdate = false;
+
+   /**
+    * @return the current killBeforeStoreUpdate value; when toggleKillBeforeStoreUpdate
+    *   is set, the flag is flipped for the next call and the new value is logged
+    */
+   protected boolean shouldKillBeforeStoreUpdate() {
+     final boolean shouldKill = killBeforeStoreUpdate;
+     if (toggleKillBeforeStoreUpdate) {
+       killBeforeStoreUpdate = !shouldKill;
+       LOG.warn("Toggle Kill before store update to: " + killBeforeStoreUpdate);
+     }
+     return shouldKill;
+   }
+ }
+
+ /**
+ * Callbacks for procedure lifecycle events; each receives the procId concerned.
+ */
+ public interface ProcedureExecutorListener {
+   void procedureLoaded(long procId);
+   void procedureAdded(long procId);
+   void procedureFinished(long procId);
+ }
+
+ /**
+ * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure:
+ * the procedure's remaining time, always expressed in milliseconds.
+ */
+ private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> {
+   @Override
+   public TimeUnit getTimeUnit(final Procedure proc) {
+     return TimeUnit.MILLISECONDS;
+   }
+
+   @Override
+   public long getTimeout(final Procedure proc) {
+     return proc.getTimeRemaining();
+   }
+ }
+
+ /**
+ * Internal cleaner that removes the completed procedure results after a TTL.
+ * NOTE: This is a special case handled in timeoutLoop().
+ *
+ * Since the client code looks more or less like:
+ * procId = master.doOperation()
+ * while (master.getProcResult(procId) == ProcInProgress);
+ * The master should not throw away the proc result as soon as the procedure is done
+ * but should wait a result request from the client (see executor.removeResult(procId))
+ * The client will call something like master.isProcDone() or master.getProcResult()
+ * which will return the result/state to the client, and it will mark the completed
+ * proc as ready to delete. note that the client may not receive the response from
+ * the master (e.g. master failover) so, if we delay a bit the real deletion of
+ * the proc result the client will be able to get the result the next try.
+ */
+ private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> {
+ private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class);
+
+ private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval";
+ private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec
+
+ private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
+ private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
+
+ private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
+ private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
+
+ private final Map<Long, ProcedureResult> completed;
+ private final ProcedureStore store;
+ private final Configuration conf;
+
+ public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
+ final Map<Long, ProcedureResult> completedMap) {
+ // set the timeout interval that triggers the periodic-procedure
+ setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
+ this.completed = completedMap;
+ this.store = store;
+ this.conf = conf;
+ }
+
+ public void periodicExecute(final TEnvironment env) {
+ if (completed.isEmpty()) {
+ LOG.debug("no completed procedures to cleanup");
+ return;
+ }
+
+ final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
+ final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
+
+ long now = EnvironmentEdgeManager.currentTime();
+ Iterator<Map.Entry<Long, ProcedureResult>> it = completed.entrySet().iterator();
+ while (it.hasNext() && store.isRunning()) {
+ Map.Entry<Long, ProcedureResult> entry = it.next();
+ ProcedureResult result = entry.getValue();
+
+ // TODO: Select TTL based on Procedure type
+ if ((result.hasClientAckTime() && (now - result.getClientAckTime()) >= evictAckTtl) ||
+ (now - result.getLastUpdate()) >= evictTtl) {
+ LOG.debug("Evict completed procedure " + entry.getKey());
+ store.delete(entry.getKey());
+ it.remove();
+ }
+ }
+ }
+
+ @Override
+ protected Procedure[] execute(final TEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void rollback(final TEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean abort(final TEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void serializeStateData(final OutputStream stream) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deserializeStateData(final InputStream stream) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Results of the completed root procedures, keyed by the procId returned by
+ * submitProcedure() (the root procId). A root procedure is added here once it
+ * completes, successfully or not; callers read it via getResult(procId).
+ */
+ private final ConcurrentHashMap<Long, ProcedureResult> completed =
+   new ConcurrentHashMap<Long, ProcedureResult>();
+
+ /**
+ * Execution stack (RootProcedureState) of each root procedure, keyed by the root procId.
+ * Entries are added by submitProcedure() and removed on procedure completion.
+ */
+ private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack =
+   new ConcurrentHashMap<Long, RootProcedureState>();
+
+ /**
+ * Every live procedure, root procedures and sub-procedures alike, keyed by procId.
+ */
+ private final ConcurrentHashMap<Long, Procedure> procedures =
+   new ConcurrentHashMap<Long, Procedure>();
+
+ /**
+ * Procedures in the WAITING_TIMEOUT state, plus the periodic procedures
+ * (e.g. the CompletedProcedureCleaner). Remaining times come from ProcedureTimeoutRetriever.
+ */
+ private final TimeoutBlockingQueue<Procedure> waitingTimeout =
+   new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever());
+
+ /** Procedures ready to be executed. */
+ private final ProcedureRunnableSet runnables;
+
+ // TODO
+ private final ReentrantLock submitLock = new ReentrantLock();
+ // -1 until load() sets it (to the max stored procId, or 0); join() resets it to -1
+ private final AtomicLong lastProcId = new AtomicLong(-1);
+
+ private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
+   new CopyOnWriteArrayList<ProcedureExecutorListener>();
+
+ // number of executor threads currently inside execLoop(proc)
+ private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final TEnvironment environment;
+ private final ProcedureStore store;
+ private final Configuration conf;
+
+ // numThreads executor threads plus the timeout thread (last slot), allocated by start()
+ private Thread[] threads;
+
+ /**
+ * Create an executor on top of the given store, using a ProcedureSimpleRunQueue
+ * as the set of runnable procedures.
+ */
+ public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
+     final ProcedureStore store) {
+   this(conf, environment, store, new ProcedureSimpleRunQueue());
+ }
+
+ /**
+ * Create an executor on top of the given store and runnable set.
+ * @param conf configuration (read e.g. by the completed-procedure cleaner)
+ * @param environment object passed to the procedures' callbacks
+ * @param store where procedures are persisted and loaded from
+ * @param runqueue the set of runnable procedures polled by the executor threads
+ */
+ public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
+     final ProcedureStore store, final ProcedureRunnableSet runqueue) {
+   this.conf = conf;
+   this.store = store;
+   this.runnables = runqueue;
+   this.environment = environment;
+ }
+
+ /**
+ * Load the procedures from the store and rebuild the in-memory executor state:
+ * completed results, rollback stacks, children latches, the runnables queue and the
+ * timeout queue. Must be called on an empty executor (checked by the preconditions).
+ * @return the root-procedure stacks found invalid and dropped, or null if there are none
+ *   (also null when the store returned no loader)
+ * @throws IOException if loading fails
+ */
+ private List<Map.Entry<Long, RootProcedureState>> load() throws IOException {
+   Preconditions.checkArgument(completed.isEmpty());
+   Preconditions.checkArgument(rollbackStack.isEmpty());
+   Preconditions.checkArgument(procedures.isEmpty());
+   Preconditions.checkArgument(waitingTimeout.isEmpty());
+   Preconditions.checkArgument(runnables.size() == 0);
+
+   // 1. Load the procedures
+   Iterator<Procedure> loader = store.load();
+   if (loader == null) {
+     lastProcId.set(0);
+     return null;
+   }
+
+   long logMaxProcId = 0;
+   while (loader.hasNext()) {
+     Procedure proc = loader.next();
+     proc.beforeReplay(getEnvironment());
+     procedures.put(proc.getProcId(), proc);
+     logMaxProcId = Math.max(logMaxProcId, proc.getProcId());
+     LOG.debug("Loading procedure state=" + proc.getState() +
+       " isFailed=" + proc.hasException() + ": " + proc);
+     if (!proc.hasParent() && !proc.isFinished()) {
+       rollbackStack.put(proc.getProcId(), new RootProcedureState());
+     }
+   }
+   assert lastProcId.get() < 0;
+   lastProcId.set(logMaxProcId);
+
+   // 2. Initialize the stacks
+   TreeSet<Procedure> runnableSet = null;
+   HashSet<Procedure> waitingSet = null;
+   for (final Procedure proc: procedures.values()) {
+     Long rootProcId = getRootProcedureId(proc);
+     if (rootProcId == null) {
+       // The 'proc' was ready to run but the root procedure was rolledback?
+       runnables.addBack(proc);
+       continue;
+     }
+
+     if (!proc.hasParent() && proc.isFinished()) {
+       LOG.debug("The procedure is completed state=" + proc.getState() +
+         " isFailed=" + proc.hasException() + ": " + proc);
+       assert !rollbackStack.containsKey(proc.getProcId());
+       completed.put(proc.getProcId(), newResultFromProcedure(proc));
+       continue;
+     }
+
+     if (proc.hasParent() && !proc.isFinished()) {
+       Procedure parent = procedures.get(proc.getParentProcId());
+       // corrupted procedures are handled later at step 3
+       if (parent != null) {
+         parent.incChildrenLatch();
+       }
+     }
+
+     RootProcedureState procStack = rollbackStack.get(rootProcId);
+     procStack.loadStack(proc);
+
+     switch (proc.getState()) {
+       case RUNNABLE:
+         if (runnableSet == null) {
+           runnableSet = new TreeSet<Procedure>();
+         }
+         runnableSet.add(proc);
+         break;
+       case WAITING_TIMEOUT:
+         if (waitingSet == null) {
+           waitingSet = new HashSet<Procedure>();
+         }
+         waitingSet.add(proc);
+         break;
+       case FINISHED:
+         if (proc.hasException()) {
+           // add the proc to the runnables to perform the rollback
+           runnables.addBack(proc);
+           break;
+         }
+         // intentional fall-through: a FINISHED procedure without exception is unexpected
+       case ROLLEDBACK:
+       case INITIALIZING:
+         String msg = "Unexpected " + proc.getState() + " state for " + proc;
+         LOG.error(msg);
+         throw new UnsupportedOperationException(msg);
+       default:
+         break;
+     }
+   }
+
+   // 3. Validate the stacks
+   List<Map.Entry<Long, RootProcedureState>> corrupted = null;
+   Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
+   while (itStack.hasNext()) {
+     Map.Entry<Long, RootProcedureState> entry = itStack.next();
+     RootProcedureState procStack = entry.getValue();
+     if (procStack.isValid()) continue;
+
+     for (Procedure proc: procStack.getSubprocedures()) {
+       procedures.remove(proc.getProcId());
+       if (runnableSet != null) runnableSet.remove(proc);
+       if (waitingSet != null) waitingSet.remove(proc);
+     }
+     itStack.remove();
+     if (corrupted == null) {
+       corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>();
+     }
+     corrupted.add(entry);
+   }
+
+   // 4. Push the runnables
+   if (runnableSet != null) {
+     // TODO: See ProcedureWALFormatReader.readInitEntry() some procedure
+     // may be started way before this stuff.
+     for (Procedure proc: runnableSet) {
+       if (!proc.hasParent()) {
+         sendProcedureLoadedNotification(proc.getProcId());
+       }
+       runnables.addBack(proc);
+     }
+   }
+
+   // 5. Push the procedures waiting on a timeout back to the timeout queue.
+   // Without this they would never be timed out (nor resumed) after a restart.
+   if (waitingSet != null) {
+     for (Procedure proc: waitingSet) {
+       waitingTimeout.add(proc);
+     }
+   }
+   return corrupted;
+ }
+
+ public void start(int numThreads) throws IOException {
+ if (running.getAndSet(true)) {
+ LOG.warn("Already running");
+ return;
+ }
+
+ // We have numThreads executor + one timer thread used for timing out
+ // procedures and triggering periodic procedures.
+ threads = new Thread[numThreads + 1];
+ LOG.info("Starting procedure executor threads=" + threads.length);
+
+ // Initialize procedures executor
+ for (int i = 0; i < numThreads; ++i) {
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ execLoop();
+ }
+ };
+ }
+
+ // Initialize procedures timeout handler (this is the +1 thread)
+ threads[numThreads] = new Thread() {
+ @Override
+ public void run() {
+ timeoutLoop();
+ }
+ };
+
+ // Acquire the store lease.
+ store.recoverLease();
+
+ // TODO: Split in two steps.
+ // TODO: Handle corrupted procedure returned (probably just a WARN)
+ // The first one will make sure that we have the latest id,
+ // so we can start the threads and accept new procedures.
+ // The second step will do the actual load of old procedures.
+ load();
+
+ // Start the executors. Here we must have the lastProcId set.
+ for (int i = 0; i < threads.length; ++i) {
+ threads[i].start();
+ }
+
+ // Add completed cleaner
+ waitingTimeout.add(new CompletedProcedureCleaner(conf, store, completed));
+ }
+
+ /**
+ * Request the executor to stop: clear the running flag and signal both the runnable
+ * and the timeout queues (presumably waking the threads blocked on them).
+ * Use join() to wait for the threads. Does nothing if the executor is not running.
+ */
+ public void stop() {
+   final boolean wasRunning = running.getAndSet(false);
+   if (wasRunning) {
+     LOG.info("Stopping the procedure executor");
+     runnables.signalAll();
+     waitingTimeout.signalAll();
+   }
+ }
+
+ /**
+ * Wait for the executor threads to terminate, then clear the in-memory state
+ * (results, stacks, procedures, queues) and reset lastProcId to -1.
+ * If interrupted while waiting, it moves on to the remaining threads and restores the
+ * interrupt flag before returning. Safe to call when start() was never called.
+ */
+ public void join() {
+   boolean interrupted = false;
+
+   // threads is only allocated by start()
+   if (threads != null) {
+     for (int i = 0; i < threads.length; ++i) {
+       try {
+         threads[i].join();
+       } catch (InterruptedException ex) {
+         interrupted = true;
+       }
+     }
+   }
+
+   if (interrupted) {
+     Thread.currentThread().interrupt();
+   }
+
+   completed.clear();
+   rollbackStack.clear();
+   procedures.clear();
+   waitingTimeout.clear();
+   runnables.clear();
+   lastProcId.set(-1);
+ }
+
+ /** @return the running flag, set by start() and cleared by stop() */
+ public boolean isRunning() {
+   return running.get();
+ }
+
+ /**
+ * @return the number of execution threads (the timeout thread is not counted),
+ *   or 0 if start() was never called.
+ */
+ public int getNumThreads() {
+   if (threads == null) {
+     return 0;
+   }
+   // the last slot holds the timeout thread
+   return threads.length - 1;
+ }
+
+ /** @return the number of executor threads currently executing a procedure */
+ public int getActiveExecutorCount() {
+   return activeExecutorCount.get();
+ }
+
+ /** @return the environment handed to the procedures' callbacks */
+ public TEnvironment getEnvironment() {
+   return environment;
+ }
+
+ /** @return the store backing this executor */
+ public ProcedureStore getStore() {
+   return store;
+ }
+
+ /** Add a listener for procedure loaded/added/finished events. */
+ public void registerListener(ProcedureExecutorListener listener) {
+   listeners.add(listener);
+ }
+
+ /** @return true if the listener was registered and has been removed */
+ public boolean unregisterListener(ProcedureExecutorListener listener) {
+   return listeners.remove(listener);
+ }
+
+ /**
+ * Add a new root-procedure to the executor.
+ * The procedure gets a fresh procId and is inserted in the store. It then gets its own
+ * rollback stack, is registered as live (sendProcedureAddedNotification() is called)
+ * and is queued as runnable.
+ * @param proc the new procedure to execute; must be INITIALIZING and have no parent,
+ *   and the executor must be running with lastProcId loaded
+ * @return the procedure id, that can be used to monitor the operation
+ */
+ public long submitProcedure(final Procedure proc) {
+   Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
+   Preconditions.checkArgument(isRunning());
+   Preconditions.checkArgument(lastProcId.get() >= 0);
+   Preconditions.checkArgument(!proc.hasParent());
+
+   // assign the id first: the store record and every map below are keyed by it
+   proc.setProcId(nextProcId());
+   final long procId = proc.getProcId();
+
+   // persist before the procedure becomes visible to the executor threads
+   store.insert(proc, null);
+   LOG.debug("procedure " + proc + " added to the store");
+
+   // each root procedure owns its own rollback stack
+   rollbackStack.put(procId, new RootProcedureState());
+
+   // register the root procedure and queue it for execution
+   assert !procedures.containsKey(procId);
+   procedures.put(procId, proc);
+   sendProcedureAddedNotification(procId);
+   runnables.addBack(proc);
+   return procId;
+ }
+
+ /**
+ * @param procId the root procId returned by submitProcedure()
+ * @return the result of the completed procedure, or null if it is not completed
+ *   (or its result was already evicted)
+ */
+ public ProcedureResult getResult(final long procId) {
+   return completed.get(procId);
+ }
+
+ /**
+ * Tell whether the procedure is finished, either "completed successfully" or
+ * "failed and rolledback". Use getResult() to check the state or get the result data.
+ * @param procId the ID of the procedure to check
+ * @return true if the procedure execution is finished, otherwise false.
+ */
+ public boolean isFinished(final long procId) {
+   return completed.containsKey(procId);
+ }
+
+ /**
+ * Tell whether the procedure has started its execution.
+ * @param procId the ID of the procedure to check
+ * @return true if the procedure execution is started (a live procedure that was
+ *   executed, or a completed one), otherwise false.
+ */
+ public boolean isStarted(final long procId) {
+   final Procedure proc = procedures.get(procId);
+   if (proc != null) {
+     return proc.wasExecuted();
+   }
+   // ConcurrentHashMap holds no null values: containsKey is equivalent to get() != null
+   return completed.containsKey(procId);
+ }
+
+ /**
+ * Mark the specified completed procedure as ready to remove, by recording the
+ * client-ack time; the actual removal happens later in the cleaner.
+ * @param procId the ID of the procedure to remove
+ */
+ public void removeResult(final long procId) {
+   final ProcedureResult result = completed.get(procId);
+   if (result != null) {
+     // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
+     result.setClientAckTime(EnvironmentEdgeManager.currentTime());
+     return;
+   }
+   assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
+   LOG.debug("Procedure procId=" + procId + " already removed by the cleaner");
+ }
+
+ /**
+ * Send an abort notification to the specified procedure.
+ * Depending on the procedure implementation the abort can be considered or ignored.
+ * @param procId the procedure to abort
+ * @return true if the procedure exists and has received the abort, otherwise false.
+ */
+ public boolean abort(final long procId) {
+   final Procedure proc = procedures.get(procId);
+   return proc != null && proc.abort(getEnvironment());
+ }
+
+ /** @return a read-only view of the completed procedure results, keyed by root procId */
+ public Map<Long, ProcedureResult> getResults() {
+   return Collections.unmodifiableMap(completed);
+ }
+
+ /** @return the live procedure with the given id, or null if there is none */
+ public Procedure getProcedure(final long procId) {
+   return procedures.get(procId);
+ }
+
+ /** @return the set of runnable procedures used by this executor */
+ protected ProcedureRunnableSet getRunnableSet() {
+   return runnables;
+ }
+
+ /**
+ * Execution loop (N threads)
+ * while the executor is in a running state,
+ * fetch a procedure from the runnables queue and start the execution.
+ */
+ private void execLoop() {
+ while (isRunning()) {
+ Long procId = runnables.poll();
+ Procedure proc = procId != null ? procedures.get(procId) : null;
+ if (proc == null) continue;
+
+ try {
+ activeExecutorCount.incrementAndGet();
+ execLoop(proc);
+ } finally {
+ activeExecutorCount.decrementAndGet();
+ }
+ }
+ }
+
+ /**
+ * Execute one procedure taken from the runnables queue.
+ * If its root procedure id cannot be resolved, the procedure alone is rolled back.
+ * Otherwise the root procedure's stack is looked up and the procedure tries to acquire
+ * it. On failure, it rolls back the whole root stack when it can take the rollback-lock;
+ * if not, it rolls back just this procedure when it was never executed. It yields the
+ * procedure when the rollback cannot be done now.
+ * On success it executes the procedure under the procedure lock (yielding if the lock
+ * is not available) and releases the stack.
+ * The do/while repeats while procStack.isFailed(); presumably acquire() then refuses the
+ * failed stack so the rollback path runs (TODO confirm in RootProcedureState).
+ */
+ private void execLoop(Procedure proc) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("trying to start the execution of " + proc);
+ }
+
+ Long rootProcId = getRootProcedureId(proc);
+ if (rootProcId == null) {
+ // The 'proc' was ready to run but the root procedure was rolledback
+ executeRollback(proc);
+ return;
+ }
+
+ // NOTE(review): no stack for the root id -> the procedure is silently dropped here;
+ // presumably the root procedure was already completed/removed.
+ RootProcedureState procStack = rollbackStack.get(rootProcId);
+ if (procStack == null) return;
+
+ do {
+ // Try to acquire the execution
+ if (!procStack.acquire(proc)) {
+ if (procStack.setRollback()) {
+ // we have the 'rollback-lock' we can start rollingback
+ if (!executeRollback(rootProcId, procStack)) {
+ // rollback not possible right now (e.g. lock not available): retry later
+ procStack.unsetRollback();
+ runnables.yield(proc);
+ }
+ } else {
+ // if we can't rollback means that some child is still running.
+ // the rollback will be executed after all the children are done.
+ // If the procedure was never executed, remove and mark it as rolledback.
+ if (!proc.wasExecuted()) {
+ if (!executeRollback(proc)) {
+ runnables.yield(proc);
+ }
+ }
+ }
+ break;
+ }
+
+ // Execute the procedure
+ assert proc.getState() == ProcedureState.RUNNABLE;
+ // NOTE(review): releaseLock() is not in a finally block; if execProcedure() throws,
+ // the procedure lock stays held and the exception escapes this executor thread.
+ if (proc.acquireLock(getEnvironment())) {
+ execProcedure(procStack, proc);
+ proc.releaseLock(getEnvironment());
+ } else {
+ runnables.yield(proc);
+ }
+ procStack.release(proc);
+
+ // allows to kill the executor before something is stored to the wal.
+ // useful to test the procedure recovery.
+ if (testing != null && !isRunning()) {
+ break;
+ }
+
+ // rootProcId is non-null here, so the long == Long comparison unboxes safely
+ if (proc.getProcId() == rootProcId && proc.isSuccess()) {
+ // Finalize the procedure state
+ LOG.info("Procedure completed in " +
+ StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
+ procedureFinished(proc);
+ break;
+ }
+ } while (procStack.isFailed());
+ }
+
+ /**
+ * Timeout loop (1 thread): while the executor is running, take the procedures whose
+ * timeout expired from the waitingTimeout queue.
+ * Procedures woken up early (more than 100ms left) are re-queued. The
+ * CompletedProcedureCleaner is run and re-queued. Any other procedure that accepts the
+ * timeout failure aborts its root stack, is persisted and is pushed to the front of
+ * the runnables queue so it can be rolled back.
+ */
+ private void timeoutLoop() {
+   while (isRunning()) {
+     Procedure proc = waitingTimeout.poll();
+     if (proc == null) continue;
+
+     if (proc.getTimeRemaining() > 100) {
+       // got an early wake, maybe a stop?
+       // re-enqueue the task in case was not a stop or just a signal
+       waitingTimeout.add(proc);
+       continue;
+     }
+
+     // ----------------------------------------------------------------------------
+     // TODO-MAYBE: Should we provide a notification to the store with the
+     // full set of procedures pending and completed to write a compacted
+     // version of the log (in case is a log)?
+     // In theory no, procedures have a short life, so at some point the store
+     // will have the tracker saying everything is in the last log.
+     // ----------------------------------------------------------------------------
+
+     // The CompletedProcedureCleaner is a special case, and it acts as a chore.
+     // instead of bringing the Chore class in, we reuse this timeout thread for
+     // this special case.
+     if (proc instanceof CompletedProcedureCleaner) {
+       try {
+         ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment());
+       } catch (Throwable e) {
+         LOG.error("ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
+       }
+       proc.setStartTime(EnvironmentEdgeManager.currentTime());
+       waitingTimeout.add(proc);
+       continue;
+     }
+
+     // The procedure received an "abort-timeout", call abort() and
+     // add the procedure back in the queue for rollback.
+     if (proc.setTimeoutFailure()) {
+       // The root procedure (or its stack) may be gone already. Unboxing a null id or
+       // dereferencing a null stack would kill the only timeout thread, so tolerate it;
+       // execLoop() handles a missing root procedure/stack on its own.
+       Long rootProcId = Procedure.getRootProcedureId(procedures, proc);
+       RootProcedureState procStack = (rootProcId != null) ? rollbackStack.get(rootProcId) : null;
+       if (procStack != null) {
+         procStack.abort();
+       } else {
+         LOG.warn("No root procedure stack found for the timed-out procedure " + proc);
+       }
+       store.update(proc);
+       runnables.addFront(proc);
+     }
+   }
+ }
+
+ /**
+ * Execute the rollback of the full procedure stack.
+ * The recorded steps are rolled back in reverse order (most recent first); each step
+ * rolled back successfully is removed from the list returned by getSubprocedures().
+ * Once the procedure is rolledback, the root-procedure will be visible as
+ * finished to user, and the result will be the fatal exception.
+ * @param rootProcId id of the root procedure being rolled back
+ * @param procStack the state holding the steps executed for that root procedure
+ * @return true if every step was rolled back and the root procedure was finalized;
+ *         false if a step lock could not be taken, a step rollback failed, or the
+ *         executor/store is no longer running (the caller then yields the root procedure)
+ */
+ private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
+ Procedure rootProc = procedures.get(rootProcId);
+ RemoteProcedureException exception = rootProc.getException();
+ // If no failure is recorded on the root yet, take the one from procStack and persist it.
+ if (exception == null) {
+ exception = procStack.getException();
+ rootProc.setFailure(exception);
+ store.update(rootProc);
+ }
+
+ List<Procedure> subprocStack = procStack.getSubprocedures();
+ assert subprocStack != null : "called rollback with no steps executed rootProc=" + rootProc;
+
+ int stackTail = subprocStack.size();
+ boolean reuseLock = false;
+ // 'stackTail --> 0' reads as 'stackTail-- > 0': walk from the last recorded step to index 0.
+ while (stackTail --> 0) {
+ final Procedure proc = subprocStack.get(stackTail);
+
+ // When reuseLock is true the lock is still held from the previous iteration.
+ if (!reuseLock && !proc.acquireLock(getEnvironment())) {
+ // can't take a lock on the procedure, add the root-proc back on the
+ // queue waiting for the lock availability
+ return false;
+ }
+
+ // Stop if this step could not be rolled back, or if the executor/store is stopping.
+ boolean abortRollback = !executeRollback(proc);
+ abortRollback |= !isRunning() || !store.isRunning();
+
+ // If the next procedure is the same to this one
+ // (e.g. StateMachineProcedure reuse the same instance)
+ // we can avoid to lock/unlock each step
+ reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback;
+ if (!reuseLock) {
+ proc.releaseLock(getEnvironment());
+ }
+
+ // Bail out leaving this step (and the ones below it) on the stack,
+ // so a later attempt resumes from here.
+ if (abortRollback) {
+ return false;
+ }
+
+ // This step is rolled back: drop it from the stack.
+ subprocStack.remove(stackTail);
+ }
+
+ // Finalize the procedure state
+ LOG.info("Rolledback procedure " + rootProc +
+ " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
+ " exception=" + exception.getMessage());
+ procedureFinished(rootProc);
+ return true;
+ }
+
+ /**
+ * Execute the rollback of the procedure step.
+ * It updates the store with the new state (stack index)
+ * or will remove completly the procedure in case it is a child.
+ */
+ private boolean executeRollback(final Procedure proc) {
+ try {
+ proc.doRollback(getEnvironment());
+ } catch (IOException e) {
+ LOG.debug("rollback attempt failed for " + proc, e);
+ return false;
+ } catch (Throwable e) {
+ // Catch NullPointerExceptions or similar errors...
+ LOG.fatal("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
+ }
+
+ // allows to kill the executor before something is stored to the wal.
+ // useful to test the procedure recovery.
+ if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
+ LOG.debug("TESTING: Kill before store update");
+ stop();
+ return false;
+ }
+
+ if (proc.removeStackIndex()) {
+ proc.setState(ProcedureState.ROLLEDBACK);
+ if (proc.hasParent()) {
+ store.delete(proc.getProcId());
+ procedures.remove(proc.getProcId());
+ } else {
+ store.update(proc);
+ }
+ } else {
+ store.update(proc);
+ }
+ return true;
+ }
+
+ /**
+ * Executes the specified procedure.
+ * More than one step may run in a single call: when doExecute() returns the procedure
+ * itself as the only subprocedure (state-machine style) the next step runs immediately.
+ * - calls the doExecute() of the procedure
+ * - a ProcedureYieldException yields the procedure back to the runnable set and returns
+ * - any other Throwable is recorded on the procedure as a RemoteProcedureException
+ * - if the procedure execution didn't fail (e.g. invalid user input)
+ * - ...and returned subprocedures
+ * - the subprocedures get this procedure as parent and a new proc id
+ *   (a null subprocedure fails this procedure instead)
+ * - the subprocedures are added to the store together with this procedure
+ * - the subprocedures are added to the front of the runnable queue
+ * - the procedure is now in a WAITING state (or, if WAITING_TIMEOUT, it is added
+ *   to the timeout queue), waiting for the subprocedures to complete
+ * - ...if there are no subprocedure
+ * - the procedure completed successfully (FINISHED), unless it is in WAITING_TIMEOUT,
+ *   in which case it is added to the timeout queue
+ * - if there is a parent and parent.childrenCountDown() returns true
+ *   (presumably: this was the last child) while the parent is WAITING
+ * - the parent state will be set to RUNNABLE, stored, and added to the front of the queue
+ * - in case of failure
+ * - the store is updated with the new state
+ * - the executor (caller of this method) will start the rollback of the procedure
+ * Every executed step, failed or not, is recorded on procStack via addRollbackStep().
+ * @param procStack the rollback state of the root procedure of {@code procedure}
+ * @param procedure the procedure to run; must be RUNNABLE (checked with Preconditions)
+ */
+ private void execProcedure(final RootProcedureState procStack, final Procedure procedure) {
+ Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
+
+ // Execute the procedure
+ boolean reExecute = false;
+ Procedure[] subprocs = null;
+ // Iterate only while the procedure returned itself as its only subprocedure;
+ // each iteration is persisted to the store before the next one starts.
+ do {
+ reExecute = false;
+ try {
+ subprocs = procedure.doExecute(getEnvironment());
+ // an empty array is treated the same as "no subprocedures"
+ if (subprocs != null && subprocs.length == 0) {
+ subprocs = null;
+ }
+ } catch (ProcedureYieldException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("yield procedure: " + procedure);
+ }
+ runnables.yield(procedure);
+ return;
+ } catch (Throwable e) {
+ // Catch NullPointerExceptions or similar errors...
+ String msg = "CODE-BUG: uncatched runtime exception for procedure: " + procedure;
+ LOG.error(msg, e);
+ procedure.setFailure(new RemoteProcedureException(msg, e));
+ }
+
+ if (!procedure.isFailed()) {
+ if (subprocs != null) {
+ if (subprocs.length == 1 && subprocs[0] == procedure) {
+ // quick-shortcut for a state machine like procedure
+ subprocs = null;
+ reExecute = true;
+ } else {
+ // yield the current procedure, and make the subprocedure runnable
+ for (int i = 0; i < subprocs.length; ++i) {
+ Procedure subproc = subprocs[i];
+ if (subproc == null) {
+ String msg = "subproc[" + i + "] is null, aborting the procedure";
+ procedure.setFailure(new RemoteProcedureException(msg,
+ new IllegalArgumentException(msg)));
+ subprocs = null;
+ break;
+ }
+
+ assert subproc.getState() == ProcedureState.INITIALIZING;
+ subproc.setParentProcId(procedure.getProcId());
+ subproc.setProcId(nextProcId());
+ }
+
+ // the parent waits for all of its children before running again
+ if (!procedure.isFailed()) {
+ procedure.setChildrenLatch(subprocs.length);
+ switch (procedure.getState()) {
+ case RUNNABLE:
+ procedure.setState(ProcedureState.WAITING);
+ break;
+ case WAITING_TIMEOUT:
+ waitingTimeout.add(procedure);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
+ waitingTimeout.add(procedure);
+ } else {
+ // No subtask, so we are done
+ procedure.setState(ProcedureState.FINISHED);
+ }
+ }
+
+ // Add the procedure to the stack (done for failed steps too, so they are rolled back)
+ procStack.addRollbackStep(procedure);
+
+ // allows to kill the executor before something is stored to the wal.
+ // useful to test the procedure recovery.
+ if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
+ LOG.debug("TESTING: Kill before store update");
+ stop();
+ return;
+ }
+
+ // Commit the transaction: new children are inserted together with their parent
+ if (subprocs != null && !procedure.isFailed()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("store add " + procedure + " children " + Arrays.toString(subprocs));
+ }
+ store.insert(procedure, subprocs);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("store update " + procedure);
+ }
+ store.update(procedure);
+ }
+
+ // if the store is not running we are aborting
+ if (!store.isRunning()) {
+ return;
+ }
+
+ assert (reExecute && subprocs == null) || !reExecute;
+ } while (reExecute);
+
+ // Submit the new subprocedures
+ if (subprocs != null && !procedure.isFailed()) {
+ for (int i = 0; i < subprocs.length; ++i) {
+ Procedure subproc = subprocs[i];
+ assert !procedures.containsKey(subproc.getProcId());
+ procedures.put(subproc.getProcId(), subproc);
+ runnables.addFront(subproc);
+ }
+ }
+
+ if (procedure.isFinished() && procedure.hasParent()) {
+ Procedure parent = procedures.get(procedure.getParentProcId());
+ // a missing parent is only expected while the root is rolling back (see assert)
+ if (parent == null) {
+ assert procStack.isRollingback();
+ return;
+ }
+
+ // If this procedure is the last child awake the parent procedure
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(parent + " child is done: " + procedure);
+ }
+ if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
+ parent.setState(ProcedureState.RUNNABLE);
+ store.update(parent);
+ runnables.addFront(parent);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(parent + " all the children finished their work, resume.");
+ }
+ return;
+ }
+ }
+ }
+
+  /**
+   * Tells every registered listener that the procedure with the given id was loaded.
+   * A failing listener is logged and does not prevent the others from being notified.
+   */
+  private void sendProcedureLoadedNotification(final long procId) {
+    if (this.listeners.isEmpty()) {
+      return;
+    }
+    for (ProcedureExecutorListener l : this.listeners) {
+      try {
+        l.procedureLoaded(procId);
+      } catch (Throwable t) {
+        LOG.error("the listener " + l + " had an error: " + t.getMessage(), t);
+      }
+    }
+  }
+
+  /**
+   * Tells every registered listener that the procedure with the given id was added.
+   * A failing listener is logged and does not prevent the others from being notified.
+   */
+  private void sendProcedureAddedNotification(final long procId) {
+    if (this.listeners.isEmpty()) {
+      return;
+    }
+    for (ProcedureExecutorListener l : this.listeners) {
+      try {
+        l.procedureAdded(procId);
+      } catch (Throwable t) {
+        LOG.error("the listener " + l + " had an error: " + t.getMessage(), t);
+      }
+    }
+  }
+
+  /**
+   * Tells every registered listener that the procedure with the given id finished.
+   * A failing listener is logged and does not prevent the others from being notified.
+   */
+  private void sendProcedureFinishedNotification(final long procId) {
+    if (this.listeners.isEmpty()) {
+      return;
+    }
+    for (ProcedureExecutorListener l : this.listeners) {
+      try {
+        l.procedureFinished(procId);
+      } catch (Throwable t) {
+        LOG.error("the listener " + l + " had an error: " + t.getMessage(), t);
+      }
+    }
+  }
+
+ private long nextProcId() {
+ long procId = lastProcId.incrementAndGet();
+ if (procId < 0) {
+ while (!lastProcId.compareAndSet(procId, 0)) {
+ procId = lastProcId.get();
+ if (procId >= 0)
+ break;
+ }
+ while (procedures.containsKey(procId)) {
+ procId = lastProcId.incrementAndGet();
+ }
+ }
+ return procId;
+ }
+
+ private Long getRootProcedureId(Procedure proc) {
+ return Procedure.getRootProcedureId(procedures, proc);
+ }
+
+ private void procedureFinished(final Procedure proc) {
+ // call the procedure completion cleanup handler
+ try {
+ proc.completionCleanup(getEnvironment());
+ } catch (Throwable e) {
+ // Catch NullPointerExceptions or similar errors...
+ LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
+ }
+
+ // update the executor internal state maps
+ completed.put(proc.getProcId(), newResultFromProcedure(proc));
+ rollbackStack.remove(proc.getProcId());
+ procedures.remove(proc.getProcId());
+
+ // call the runnableSet completion cleanup handler
+ try {
+ runnables.completionCleanup(proc);
+ } catch (Throwable e) {
+ // Catch NullPointerExceptions or similar errors...
+ LOG.error("CODE-BUG: uncatched runtime exception for runnableSet: " + runnables, e);
+ }
+
+ // Notify the listeners
+ sendProcedureFinishedNotification(proc.getProcId());
+ }
+
+  /**
+   * Looks up a procedure either among the completed results or among the procedures
+   * still known to the executor.
+   * @param procId the id of the procedure to look up
+   * @return (result, null) if a completed result is available, (null, procedure) if the
+   *         procedure is still in the procedures map, (null, null) if neither is found
+   */
+  public Pair<ProcedureResult, Procedure> getResultOrProcedure(final long procId) {
+    ProcedureResult result = completed.get(procId);
+    Procedure proc = null;
+    if (result == null) {
+      proc = procedures.get(procId);
+      if (proc == null) {
+        // procedureFinished() publishes to 'completed' before removing from 'procedures',
+        // so a procedure that finished between the two lookups above is found here.
+        result = completed.get(procId);
+      }
+    }
+    // typed construction instead of the raw (unchecked) 'new Pair(...)'
+    return new Pair<ProcedureResult, Procedure>(result, proc);
+  }
+
+  /**
+   * Builds the ProcedureResult exposed to clients for a finished procedure.
+   * It carries the exception when the procedure failed, and the procedure result otherwise.
+   */
+  private static ProcedureResult newResultFromProcedure(final Procedure proc) {
+    return proc.isFailed()
+        ? new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getException())
+        : new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getResult());
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
new file mode 100644
index 0000000..d8252b1
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.Map;
+
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * This class is a container of queues that allows to select a queue
+ * in a round robin fashion, considering priority of the queue.
+ *
+ * the quantum is just how many poll() will return the same object.
+ * e.g. if quantum is 1 and you have A and B as object you'll get: A B A B
+ * e.g. if quantum is 2 and you have A and B as object you'll get: A A B B A A B B
+ * then the object priority is just a priority * quantum
+ *
+ * Example:
+ * - three queues (A, B, C) with priorities (1, 1, 2)
+ * - The first poll() will return A
+ * - The second poll() will return B
+ * - The third and forth poll() will return C
+ * - and so on again and again.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> {
+ private ConcurrentSkipListMap<TKey, TQueue> objMap =
+ new ConcurrentSkipListMap<TKey, TQueue>();
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final int quantum;
+
+ private Map.Entry<TKey, TQueue> current = null;
+ private int currentQuantum = 0;
+
+ public interface FairObject {
+ boolean isAvailable();
+ int getPriority();
+ }
+
+ /**
+ * @param quantum how many poll() will return the same object.
+ */
+ public ProcedureFairRunQueues(final int quantum) {
+ this.quantum = quantum;
+ }
+
+ public TQueue get(final TKey key) {
+ return objMap.get(key);
+ }
+
+ public TQueue add(final TKey key, final TQueue queue) {
+ TQueue oldq = objMap.putIfAbsent(key, queue);
+ return oldq != null ? oldq : queue;
+ }
+
+ public TQueue remove(final TKey key) {
+ TQueue queue = objMap.remove(key);
+ if (queue != null) {
+ lock.lock();
+ try {
+ if (current != null && queue == current.getValue()) {
+ currentQuantum = 0;
+ current = null;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ return queue;
+ }
+
+ public void clear() {
+ lock.lock();
+ try {
+ current = null;
+ objMap.clear();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @return the next available item if present
+ */
+ public TQueue poll() {
+ lock.lock();
+ try {
+ TQueue queue;
+ if (currentQuantum == 0) {
+ if (nextObject() == null) {
+ // nothing here
+ return null;
+ }
+
+ queue = current.getValue();
+ currentQuantum = calculateQuantum(queue) - 1;
+ } else {
+ currentQuantum--;
+ queue = current.getValue();
+ }
+
+ if (!queue.isAvailable()) {
+ Map.Entry<TKey, TQueue> last = current;
+ // Try the next one
+ do {
+ if (nextObject() == null)
+ return null;
+ } while (current.getValue() != last.getValue() && !current.getValue().isAvailable());
+
+ queue = current.getValue();
+ currentQuantum = calculateQuantum(queue) - 1;
+ }
+
+ return queue;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append('{');
+ for (Map.Entry<TKey, TQueue> entry: objMap.entrySet()) {
+ builder.append(entry.getKey());
+ builder.append(':');
+ builder.append(entry.getValue());
+ }
+ builder.append('}');
+ return builder.toString();
+ }
+
+ private Map.Entry<TKey, TQueue> nextObject() {
+ Map.Entry<TKey, TQueue> next = null;
+
+ // If we have already a key, try the next one
+ if (current != null) {
+ next = objMap.higherEntry(current.getKey());
+ }
+
+ // if there is no higher key, go back to the first
+ current = (next != null) ? next : objMap.firstEntry();
+ return current;
+ }
+
+ private int calculateQuantum(final TQueue fairObject) {
+ // TODO
+ return Math.max(1, fairObject.getPriority() * quantum);
+ }
+}
\ No newline at end of file