You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/09 22:59:36 UTC
[31/50] [abbrv] hbase git commit: HBASE-13202 Procedure v2 - core
framework
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
new file mode 100644
index 0000000..0aebd5a
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Once a Procedure completes the ProcedureExecutor takes all the useful
+ * information of the procedure (e.g. exception/result) and creates a ProcedureResult.
+ * The user of the Procedure framework will get the procedure result with
+ * procedureExecutor.getResult(procId)
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ProcedureResult {
+ private final RemoteProcedureException exception;
+ private final long lastUpdate;
+ private final long startTime;
+ private final byte[] result;
+
+ private long clientAckTime = -1;
+
+ public ProcedureResult(final long startTime, final long lastUpdate,
+ final RemoteProcedureException exception) {
+ this.lastUpdate = lastUpdate;
+ this.startTime = startTime;
+ this.exception = exception;
+ this.result = null;
+ }
+
+ public ProcedureResult(final long startTime, final long lastUpdate, final byte[] result) {
+ this.lastUpdate = lastUpdate;
+ this.startTime = startTime;
+ this.exception = null;
+ this.result = result;
+ }
+
+ public boolean isFailed() {
+ return exception != null;
+ }
+
+ public RemoteProcedureException getException() {
+ return exception;
+ }
+
+ public boolean hasResultData() {
+ return result != null;
+ }
+
+ public byte[] getResult() {
+ return result;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getLastUpdate() {
+ return lastUpdate;
+ }
+
+ public long executionTime() {
+ return lastUpdate - startTime;
+ }
+
+ public boolean hasClientAckTime() {
+ return clientAckTime > 0;
+ }
+
+ public long getClientAckTime() {
+ return clientAckTime;
+ }
+
+ @InterfaceAudience.Private
+ protected void setClientAckTime(final long timestamp) {
+ this.clientAckTime = timestamp;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
new file mode 100644
index 0000000..2d7ba39
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Keep track of the runnable procedures
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface ProcedureRunnableSet {
+ /**
+ * Inserts the specified element at the front of this queue.
+ * @param proc the Procedure to add
+ */
+ void addFront(Procedure proc);
+
+ /**
+ * Inserts the specified element at the end of this queue.
+ * @param proc the Procedure to add
+ */
+ void addBack(Procedure proc);
+
+ /**
+ * The procedure can't run at the moment.
+ * add it back to the queue, giving priority to someone else.
+ * @param proc the Procedure to add back to the list
+ */
+ void yield(Procedure proc);
+
+ /**
+ * The procedure in execution completed.
+ * This can be implemented to perform cleanups.
+ * @param proc the Procedure that completed the execution.
+ */
+ void completionCleanup(Procedure proc);
+
+ /**
+ * Fetch one Procedure from the queue
+ * @return the Procedure ID to execute, or null if nothing present.
+ */
+ Long poll();
+
+ /**
+ * In case the class is blocking on poll() waiting for items to be added,
+ * this method should awake poll() and poll() should return.
+ */
+ void signalAll();
+
+ /**
+ * Returns the number of elements in this collection.
+ * @return the number of elements in this collection.
+ */
+ int size();
+
+ /**
+ * Removes all of the elements from this collection.
+ */
+ void clear();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
new file mode 100644
index 0000000..7b17fb2
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple runqueue for the procedures
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureSimpleRunQueue implements ProcedureRunnableSet {
+ private final Deque<Long> runnables = new ArrayDeque<Long>();
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition waitCond = lock.newCondition();
+
+ @Override
+ public void addFront(final Procedure proc) {
+ lock.lock();
+ try {
+ runnables.addFirst(proc.getProcId());
+ waitCond.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void addBack(final Procedure proc) {
+ lock.lock();
+ try {
+ runnables.addLast(proc.getProcId());
+ waitCond.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void yield(final Procedure proc) {
+ addBack(proc);
+ }
+
+ @Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+ public Long poll() {
+ lock.lock();
+ try {
+ if (runnables.isEmpty()) {
+ waitCond.await();
+ if (!runnables.isEmpty()) {
+ return runnables.pop();
+ }
+ } else {
+ return runnables.pop();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ } finally {
+ lock.unlock();
+ }
+ return null;
+ }
+
+ @Override
+ public void signalAll() {
+ lock.lock();
+ try {
+ waitCond.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void clear() {
+ lock.lock();
+ try {
+ runnables.clear();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public int size() {
+ lock.lock();
+ try {
+ return runnables.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void completionCleanup(Procedure proc) {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
new file mode 100644
index 0000000..177ff5b
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureYieldException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+// TODO: Not used yet
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ProcedureYieldException extends ProcedureException {
+ /** default constructor */
+ public ProcedureYieldException() {
+ super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public ProcedureYieldException(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
new file mode 100644
index 0000000..6be512d
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureException.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * A RemoteProcedureException is an exception from another thread or process.
+ * <p>
+ * RemoteProcedureExceptions are sent to 'remote' peers to signal an abort in the face of failures.
+ * When serialized for transmission we encode using Protobufs to ensure version compatibility.
+ * <p>
+ * RemoteProcedureException exceptions contain a Throwable as its cause.
+ * This can be a "regular" exception generated locally or a ProxyThrowable that is a representation
+ * of the original exception created on original 'remote' source. These ProxyThrowables have their
+ * their stacks traces and messages overridden to reflect the original 'remote' exception.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@SuppressWarnings("serial")
+public class RemoteProcedureException extends ProcedureException {
+
+ /**
+ * Name of the throwable's source such as a host or thread name. Must be non-null.
+ */
+ private final String source;
+
+ /**
+ * Create a new RemoteProcedureException that can be serialized.
+ * It is assumed that this came form a local source.
+ * @param source
+ * @param cause
+ */
+ public RemoteProcedureException(String source, Throwable cause) {
+ super(cause);
+ assert source != null;
+ assert cause != null;
+ this.source = source;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public IOException unwrapRemoteException() {
+ if (getCause() instanceof RemoteException) {
+ return ((RemoteException)getCause()).unwrapRemoteException();
+ }
+ if (getCause() instanceof IOException) {
+ return (IOException)getCause();
+ }
+ return new IOException(getCause());
+ }
+
+ @Override
+ public String toString() {
+ String className = getCause().getClass().getName();
+ return className + " via " + getSource() + ":" + getLocalizedMessage();
+ }
+
+ /**
+ * Converts a RemoteProcedureException to an array of bytes.
+ * @param source the name of the external exception source
+ * @param t the "local" external exception (local)
+ * @return protobuf serialized version of RemoteProcedureException
+ */
+ public static byte[] serialize(String source, Throwable t) {
+ return toProto(source, t).toByteArray();
+ }
+
+ /**
+ * Takes a series of bytes and tries to generate an RemoteProcedureException instance for it.
+ * @param bytes
+ * @return the ForeignExcpetion instance
+ * @throws InvalidProtocolBufferException if there was deserialization problem this is thrown.
+ */
+ public static RemoteProcedureException deserialize(byte[] bytes)
+ throws InvalidProtocolBufferException {
+ return fromProto(ForeignExceptionMessage.parseFrom(bytes));
+ }
+
+ public ForeignExceptionMessage convert() {
+ return ForeignExceptionUtil.toProtoForeignException(getSource(), getCause());
+ }
+
+ public static ForeignExceptionMessage toProto(String source, Throwable t) {
+ return ForeignExceptionUtil.toProtoForeignException(source, t);
+ }
+
+ public static RemoteProcedureException fromProto(final ForeignExceptionMessage eem) {
+ return new RemoteProcedureException(eem.getSource(), ForeignExceptionUtil.toIOException(eem));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
new file mode 100644
index 0000000..bc1af20
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RootProcedureState.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * Internal state of the ProcedureExecutor that describes the state of a "Root Procedure".
+ * A "Root Procedure" is a Procedure without parent, each subprocedure will be
+ * added to the "Root Procedure" stack (or rollback-stack).
+ *
+ * RootProcedureState is used and managed only by the ProcedureExecutor.
+ * Long rootProcId = getRootProcedureId(proc);
+ * rollbackStack.get(rootProcId).acquire(proc)
+ * rollbackStack.get(rootProcId).release(proc)
+ * ...
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class RootProcedureState {
+ private static final Log LOG = LogFactory.getLog(RootProcedureState.class);
+
+ private enum State {
+ RUNNING, // The Procedure is running or ready to run
+ FAILED, // The Procedure failed, waiting for the rollback executing
+ ROLLINGBACK, // The Procedure failed and the execution was rolledback
+ }
+
+ private ArrayList<Procedure> subprocedures = null;
+ private State state = State.RUNNING;
+ private int running = 0;
+
+ public synchronized boolean isFailed() {
+ switch (state) {
+ case ROLLINGBACK:
+ case FAILED:
+ return true;
+ default:
+ break;
+ }
+ return false;
+ }
+
+ public synchronized boolean isRollingback() {
+ return state == State.ROLLINGBACK;
+ }
+
+ /**
+ * Called by the ProcedureExecutor to mark rollback execution
+ */
+ protected synchronized boolean setRollback() {
+ if (running == 0 && state == State.FAILED) {
+ state = State.ROLLINGBACK;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Called by the ProcedureExecutor to mark rollback execution
+ */
+ protected synchronized void unsetRollback() {
+ assert state == State.ROLLINGBACK;
+ state = State.FAILED;
+ }
+
+ protected synchronized List<Procedure> getSubprocedures() {
+ return subprocedures;
+ }
+
+ protected synchronized RemoteProcedureException getException() {
+ if (subprocedures != null) {
+ for (Procedure proc: subprocedures) {
+ if (proc.hasException()) {
+ return proc.getException();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Called by the ProcedureExecutor to mark the procedure step as running.
+ */
+ protected synchronized boolean acquire(final Procedure proc) {
+ if (state != State.RUNNING) return false;
+
+ running++;
+ return true;
+ }
+
+ /**
+ * Called by the ProcedureExecutor to mark the procedure step as finished.
+ */
+ protected synchronized void release(final Procedure proc) {
+ running--;
+ }
+
+ protected synchronized void abort() {
+ if (state == State.RUNNING) {
+ state = State.FAILED;
+ }
+ }
+
+ /**
+ * Called by the ProcedureExecutor after the procedure step is completed,
+ * to add the step to the rollback list (or procedure stack)
+ */
+ protected synchronized void addRollbackStep(final Procedure proc) {
+ if (proc.isFailed()) {
+ state = State.FAILED;
+ }
+ if (subprocedures == null) {
+ subprocedures = new ArrayList<Procedure>();
+ }
+ proc.addStackIndex(subprocedures.size());
+ subprocedures.add(proc);
+ }
+
+ /**
+ * Called on store load by the ProcedureExecutor to load part of the stack.
+ *
+ * Each procedure has its own stack-positions. Which means we have to write
+ * to the store only the Procedure we executed, and nothing else.
+ * on load we recreate the full stack by aggregating each procedure stack-positions.
+ */
+ protected synchronized void loadStack(final Procedure proc) {
+ int[] stackIndexes = proc.getStackIndexes();
+ if (stackIndexes != null) {
+ if (subprocedures == null) {
+ subprocedures = new ArrayList<Procedure>();
+ }
+ int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocedures.size();
+ if (diff > 0) {
+ subprocedures.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
+ while (diff-- > 0) subprocedures.add(null);
+ }
+ for (int i = 0; i < stackIndexes.length; ++i) {
+ subprocedures.set(stackIndexes[i], proc);
+ }
+ }
+ if (proc.getState() == ProcedureState.ROLLEDBACK) {
+ state = State.ROLLINGBACK;
+ } else if (proc.isFailed()) {
+ state = State.FAILED;
+ }
+ }
+
+ /**
+ * Called on store load by the ProcedureExecutor to validate the procedure stack.
+ */
+ protected synchronized boolean isValid() {
+ if (subprocedures != null) {
+ for (Procedure proc: subprocedures) {
+ if (proc == null) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
new file mode 100644
index 0000000..b4b35f2
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.SequentialProcedureData;
+
+/**
+ * A SequentialProcedure describes one step in a procedure chain.
+ * -> Step 1 -> Step 2 -> Step 3
+ *
+ * The main difference from a base Procedure is that the execute() of a
+ * SequentialProcedure will be called only once, there will be no second
+ * execute() call once the child are finished. which means once the child
+ * of a SequentialProcedure are completed the SequentialProcedure is completed too.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvironment> {
+ private boolean executed = false;
+
+ @Override
+ protected Procedure[] doExecute(final TEnvironment env)
+ throws ProcedureYieldException {
+ updateTimestamp();
+ try {
+ Procedure[] children = !executed ? execute(env) : null;
+ executed = !executed;
+ return children;
+ } finally {
+ updateTimestamp();
+ }
+ }
+
+ @Override
+ protected void doRollback(final TEnvironment env) throws IOException {
+ updateTimestamp();
+ if (executed) {
+ try {
+ rollback(env);
+ executed = !executed;
+ } finally {
+ updateTimestamp();
+ }
+ }
+ }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {
+ SequentialProcedureData.Builder data = SequentialProcedureData.newBuilder();
+ data.setExecuted(executed);
+ data.build().writeDelimitedTo(stream);
+ }
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException {
+ SequentialProcedureData data = SequentialProcedureData.parseDelimitedFrom(stream);
+ executed = data.getExecuted();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
new file mode 100644
index 0000000..eab96e4
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.StateMachineProcedureData;
+
+/**
+ * Procedure described by a series of steps.
+ *
+ * The procedure implementor must have an enum of 'states', describing
+ * the various step of the procedure.
+ * Once the procedure is running, the procedure-framework will call executeFromState()
+ * using the 'state' provided by the user. The first call to executeFromState()
+ * will be performed with 'state = null'. The implementor can jump between
+ * states using setNextState(MyStateEnum.ordinal()).
+ * The rollback will call rollbackState() for each state that was executed, in reverse order.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class StateMachineProcedure<TEnvironment, TState>
+ extends Procedure<TEnvironment> {
+ private int stateCount = 0;
+ private int[] states = null;
+
+ protected enum Flow {
+ HAS_MORE_STATE,
+ NO_MORE_STATE,
+ }
+
+ /**
+ * called to perform a single step of the specified 'state' of the procedure
+ * @param state state to execute
+ * @return Flow.NO_MORE_STATE if the procedure is completed,
+ * Flow.HAS_MORE_STATE if there is another step.
+ */
+ protected abstract Flow executeFromState(TEnvironment env, TState state)
+ throws ProcedureYieldException;
+
+ /**
+ * called to perform the rollback of the specified state
+ * @param state state to rollback
+ * @throws IOException temporary failure, the rollback will retry later
+ */
+ protected abstract void rollbackState(TEnvironment env, TState state)
+ throws IOException;
+
+ /**
+ * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
+ * @param stateId the ordinal() of the state enum (or state id)
+ * @return the state enum object
+ */
+ protected abstract TState getState(int stateId);
+
+ /**
+ * Convert the Enum (or more descriptive) state object to an ordinal (or state id).
+ * @param state the state enum object
+ * @return stateId the ordinal() of the state enum (or state id)
+ */
+ protected abstract int getStateId(TState state);
+
+ /**
+ * Return the initial state object that will be used for the first call to executeFromState().
+ * @return the initial state enum object
+ */
+ protected abstract TState getInitialState();
+
+ /**
+ * Set the next state for the procedure.
+ * @param state the state enum object
+ */
+ protected void setNextState(final TState state) {
+ setNextState(getStateId(state));
+ }
+
+ @Override
+ protected Procedure[] execute(final TEnvironment env)
+ throws ProcedureYieldException {
+ updateTimestamp();
+ try {
+ TState state = stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
+ if (stateCount == 0) {
+ setNextState(getStateId(state));
+ }
+ if (executeFromState(env, state) == Flow.NO_MORE_STATE) {
+ // completed
+ return null;
+ }
+ return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
+ } finally {
+ updateTimestamp();
+ }
+ }
+
+ @Override
+ protected void rollback(final TEnvironment env) throws IOException {
+ try {
+ updateTimestamp();
+ rollbackState(env, stateCount > 0 ? getState(states[stateCount-1]) : getInitialState());
+ stateCount--;
+ } finally {
+ updateTimestamp();
+ }
+ }
+
+ /**
+ * Set the next state for the procedure.
+ * @param stateId the ordinal() of the state enum (or state id)
+ */
+ private void setNextState(final int stateId) {
+ if (states == null || states.length == stateCount) {
+ int newCapacity = stateCount + 8;
+ if (states != null) {
+ states = Arrays.copyOf(states, newCapacity);
+ } else {
+ states = new int[newCapacity];
+ }
+ }
+ states[stateCount++] = stateId;
+ }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {
+ StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder();
+ for (int i = 0; i < stateCount; ++i) {
+ data.addState(states[i]);
+ }
+ data.build().writeDelimitedTo(stream);
+ }
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException {
+ StateMachineProcedureData data = StateMachineProcedureData.parseDelimitedFrom(stream);
+ stateCount = data.getStateCount();
+ if (stateCount > 0) {
+ states = new int[stateCount];
+ for (int i = 0; i < stateCount; ++i) {
+ states[i] = data.getState(i);
+ }
+ } else {
+ states = null;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TwoPhaseProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TwoPhaseProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TwoPhaseProcedure.java
new file mode 100644
index 0000000..cd6b0a7
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TwoPhaseProcedure.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class TwoPhaseProcedure<TEnvironment> extends Procedure<TEnvironment> {
+ // TODO (e.g. used by ACLs/VisibilityTags updates)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
new file mode 100644
index 0000000..0d1c050
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+
+/**
+ * The ProcedureStore is used by the executor to persist the state of each procedure execution.
+ * This allows to resume the execution of pending/in-progress procedures in case
+ * of machine failure or service shutdown.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ProcedureStore {
+ /**
+ * Store listener interface.
+ * The main process should register a listener and respond to the store events.
+ */
+ public interface ProcedureStoreListener {
+ /**
+ * triggered when the store is not able to write out data.
+ * the main process should abort.
+ */
+ void abortProcess();
+ }
+
+ /**
+ * Add the listener to the notification list.
+ * @param listener The AssignmentListener to register
+ */
+ void registerListener(ProcedureStoreListener listener);
+
+ /**
+ * Remove the listener from the notification list.
+ * @param listener The AssignmentListener to unregister
+ * @return true if the listner was in the list and it was removed, otherwise false.
+ */
+ boolean unregisterListener(ProcedureStoreListener listener);
+
+ /**
+ * Start/Open the procedure store
+ * @param numThreads
+ */
+ void start(int numThreads) throws IOException;
+
+ /**
+ * Stop/Close the procedure store
+ * @param abort true if the stop is an abort
+ */
+ void stop(boolean abort);
+
+ /**
+ * @return true if the store is running, otherwise false.
+ */
+ boolean isRunning();
+
+ /**
+ * @return the number of threads/slots passed to start()
+ */
+ int getNumThreads();
+
+ /**
+ * Acquire the lease for the procedure store.
+ */
+ void recoverLease() throws IOException;
+
+ /**
+ * Load the Procedures in the store.
+ * @return the set of procedures present in the store
+ */
+ Iterator<Procedure> load() throws IOException;
+
+ /**
+ * When a procedure is submitted to the executor insert(proc, null) will be called.
+ * 'proc' has a 'RUNNABLE' state and the initial information required to start up.
+ *
+ * When a procedure is executed and it returns children insert(proc, subprocs) will be called.
+ * 'proc' has a 'WAITING' state and an update state.
+ * 'subprocs' are the children in 'RUNNABLE' state with the initial information.
+ *
+ * @param proc the procedure to serialize and write to the store.
+ * @param subprocs the newly created child of the proc.
+ */
+ void insert(Procedure proc, Procedure[] subprocs);
+
+ /**
+ * The specified procedure was executed,
+ * and the new state should be written to the store.
+ * @param proc the procedure to serialize and write to the store.
+ */
+ void update(Procedure proc);
+
+ /**
+ * The specified procId was removed from the executor,
+ * due to completion, abort or failure.
+ * The store implementor should remove all the information about the specified procId.
+ * @param procId the ID of the procedure to remove.
+ */
+ void delete(long procId);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
new file mode 100644
index 0000000..4e4653a
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
+
+/**
+ * Keeps track of live procedures.
+ *
+ * It can be used by the ProcedureStore to identify which procedures are already
+ * deleted/completed to avoid the deserialization step on restart.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ProcedureStoreTracker {
+ private final TreeMap<Long, BitSetNode> map = new TreeMap<Long, BitSetNode>();
+
+ private boolean keepDeletes = false;
+ private boolean partial = false;
+
+ public enum DeleteState { YES, NO, MAYBE }
+
+ public static class BitSetNode {
+ private final static long WORD_MASK = 0xffffffffffffffffL;
+ private final static int ADDRESS_BITS_PER_WORD = 6;
+ private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
+ private final static int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
+
+ private long[] updated;
+ private long[] deleted;
+ private long start;
+
+ public void dump() {
+ System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
+ getMinProcId(), getMaxProcId());
+ System.out.println("Update:");
+ for (int i = 0; i < updated.length; ++i) {
+ for (int j = 0; j < BITS_PER_WORD; ++j) {
+ System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0");
+ }
+ System.out.println(" " + i);
+ }
+ System.out.println();
+ System.out.println("Delete:");
+ for (int i = 0; i < deleted.length; ++i) {
+ for (int j = 0; j < BITS_PER_WORD; ++j) {
+ System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
+ }
+ System.out.println(" " + i);
+ }
+ System.out.println();
+ }
+
+ public BitSetNode(final long procId, final boolean partial) {
+ start = alignDown(procId);
+
+ int count = 2;
+ updated = new long[count];
+ deleted = new long[count];
+ for (int i = 0; i < count; ++i) {
+ updated[i] = 0;
+ deleted[i] = partial ? 0 : WORD_MASK;
+ }
+
+ updateState(procId, false);
+ }
+
+ protected BitSetNode(final long start, final long[] updated, final long[] deleted) {
+ this.start = start;
+ this.updated = updated;
+ this.deleted = deleted;
+ }
+
+ public void update(final long procId) {
+ updateState(procId, false);
+ }
+
+ public void delete(final long procId) {
+ updateState(procId, true);
+ }
+
+ public Long getStart() {
+ return start;
+ }
+
+ public Long getEnd() {
+ return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
+ }
+
+ public boolean contains(final long procId) {
+ return start <= procId && procId <= getEnd();
+ }
+
+ public DeleteState isDeleted(final long procId) {
+ int bitmapIndex = getBitmapIndex(procId);
+ int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+ if (wordIndex >= deleted.length) {
+ return DeleteState.MAYBE;
+ }
+ return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
+ }
+
+ private boolean isUpdated(final long procId) {
+ int bitmapIndex = getBitmapIndex(procId);
+ int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+ if (wordIndex >= updated.length) {
+ return false;
+ }
+ return (updated[wordIndex] & (1L << bitmapIndex)) != 0;
+ }
+
+ public boolean isUpdated() {
+ // TODO: cache the value
+ for (int i = 0; i < updated.length; ++i) {
+ long deleteMask = ~deleted[i];
+ if ((updated[i] & deleteMask) != (WORD_MASK & deleteMask)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean isEmpty() {
+ // TODO: cache the value
+ for (int i = 0; i < deleted.length; ++i) {
+ if (deleted[i] != WORD_MASK) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void resetUpdates() {
+ for (int i = 0; i < updated.length; ++i) {
+ updated[i] = 0;
+ }
+ }
+
+ public void undeleteAll() {
+ for (int i = 0; i < updated.length; ++i) {
+ deleted[i] = 0;
+ }
+ }
+
+ public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
+ ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
+ ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
+ builder.setStartId(start);
+ for (int i = 0; i < updated.length; ++i) {
+ builder.addUpdated(updated[i]);
+ builder.addDeleted(deleted[i]);
+ }
+ return builder.build();
+ }
+
+ public static BitSetNode convert(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
+ long start = data.getStartId();
+ int size = data.getUpdatedCount();
+ long[] updated = new long[size];
+ long[] deleted = new long[size];
+ for (int i = 0; i < size; ++i) {
+ updated[i] = data.getUpdated(i);
+ deleted[i] = data.getDeleted(i);
+ }
+ return new BitSetNode(start, updated, deleted);
+ }
+
+ // ========================================================================
+ // Grow/Merge Helpers
+ // ========================================================================
+ public boolean canGrow(final long procId) {
+ return (procId - start) < MAX_NODE_SIZE;
+ }
+
+ public boolean canMerge(final BitSetNode rightNode) {
+ return (start + rightNode.getEnd()) < MAX_NODE_SIZE;
+ }
+
+ public void grow(final long procId) {
+ int delta, offset;
+
+ if (procId < start) {
+ // add to head
+ long newStart = alignDown(procId);
+ delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD;
+ offset = delta;
+ } else {
+ // Add to tail
+ long newEnd = alignUp(procId + 1);
+ delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
+ offset = 0;
+ }
+
+ long[] newBitmap;
+ int oldSize = updated.length;
+
+ newBitmap = new long[oldSize + delta];
+ System.arraycopy(updated, 0, newBitmap, offset, oldSize);
+ updated = newBitmap;
+
+ newBitmap = new long[deleted.length + delta];
+ System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
+ deleted = newBitmap;
+
+ for (int i = 0; i < delta; ++i) {
+ updated[oldSize + i] = 0;
+ deleted[oldSize + i] = WORD_MASK;
+ }
+ }
+
+ public void merge(final BitSetNode rightNode) {
+ int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
+
+ long[] newBitmap;
+ int oldSize = updated.length;
+ int newSize = (delta - rightNode.updated.length);
+ int offset = oldSize + newSize;
+
+ newBitmap = new long[oldSize + delta];
+ System.arraycopy(updated, 0, newBitmap, 0, oldSize);
+ System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length);
+ updated = newBitmap;
+
+ newBitmap = new long[oldSize + delta];
+ System.arraycopy(deleted, 0, newBitmap, 0, oldSize);
+ System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length);
+ deleted = newBitmap;
+
+ for (int i = 0; i < newSize; ++i) {
+ updated[offset + i] = 0;
+ deleted[offset + i] = WORD_MASK;
+ }
+ }
+
+ // ========================================================================
+ // Min/Max Helpers
+ // ========================================================================
+ public long getMinProcId() {
+ long minProcId = start;
+ for (int i = 0; i < deleted.length; ++i) {
+ if (deleted[i] == 0) {
+ return(minProcId);
+ }
+
+ if (deleted[i] != WORD_MASK) {
+ for (int j = 0; j < BITS_PER_WORD; ++j) {
+ if ((deleted[i] & (1L << j)) != 0) {
+ return minProcId + j;
+ }
+ }
+ }
+
+ minProcId += BITS_PER_WORD;
+ }
+ return minProcId;
+ }
+
+ public long getMaxProcId() {
+ long maxProcId = getEnd();
+ for (int i = deleted.length - 1; i >= 0; --i) {
+ if (deleted[i] == 0) {
+ return maxProcId;
+ }
+
+ if (deleted[i] != WORD_MASK) {
+ for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
+ if ((deleted[i] & (1L << j)) == 0) {
+ return maxProcId - (BITS_PER_WORD - 1 - j);
+ }
+ }
+ }
+ maxProcId -= BITS_PER_WORD;
+ }
+ return maxProcId;
+ }
+
+ // ========================================================================
+ // Bitmap Helpers
+ // ========================================================================
+ private int getBitmapIndex(final long procId) {
+ return (int)(procId - start);
+ }
+
+ private void updateState(final long procId, final boolean isDeleted) {
+ int bitmapIndex = getBitmapIndex(procId);
+ int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+ long value = (1L << bitmapIndex);
+
+ if (isDeleted) {
+ updated[wordIndex] |= value;
+ deleted[wordIndex] |= value;
+ } else {
+ updated[wordIndex] |= value;
+ deleted[wordIndex] &= ~value;
+ }
+ }
+
+ // ========================================================================
+ // Helpers
+ // ========================================================================
+ private static long alignUp(final long x) {
+ return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
+ }
+
+ private static long alignDown(final long x) {
+ return x & -BITS_PER_WORD;
+ }
+ }
+
+ public void insert(final Procedure proc, final Procedure[] subprocs) {
+ insert(proc.getProcId());
+ if (subprocs != null) {
+ for (int i = 0; i < subprocs.length; ++i) {
+ insert(subprocs[i].getProcId());
+ }
+ }
+ }
+
+ public void update(final Procedure proc) {
+ update(proc.getProcId());
+ }
+
+ public void insert(long procId) {
+ BitSetNode node = getOrCreateNode(procId);
+ node.update(procId);
+ }
+
+ public void update(long procId) {
+ Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+ assert entry != null : "expected node to update procId=" + procId;
+
+ BitSetNode node = entry.getValue();
+ assert node.contains(procId);
+ node.update(procId);
+ }
+
+ public void delete(long procId) {
+ Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+ assert entry != null : "expected node to delete procId=" + procId;
+
+ BitSetNode node = entry.getValue();
+ assert node.contains(procId) : "expected procId in the node";
+ node.delete(procId);
+
+ if (!keepDeletes && node.isEmpty()) {
+ // TODO: RESET if (map.size() == 1)
+ map.remove(entry.getKey());
+ }
+ }
+
+ @InterfaceAudience.Private
+ public void setDeleted(final long procId, final boolean isDeleted) {
+ BitSetNode node = getOrCreateNode(procId);
+ node.updateState(procId, isDeleted);
+ }
+
+ public void clear() {
+ this.map.clear();
+ }
+
+ public DeleteState isDeleted(long procId) {
+ Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
+ if (entry != null) {
+ BitSetNode node = entry.getValue();
+ DeleteState state = node.isDeleted(procId);
+ return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
+ }
+ return partial ? DeleteState.MAYBE : DeleteState.YES;
+ }
+
+ public long getMinProcId() {
+ // TODO: Cache?
+ Map.Entry<Long, BitSetNode> entry = map.firstEntry();
+ return entry == null ? 0 : entry.getValue().getMinProcId();
+ }
+
+ public void setKeepDeletes(boolean keepDeletes) {
+ this.keepDeletes = keepDeletes;
+ if (!keepDeletes) {
+ Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Long, BitSetNode> entry = it.next();
+ if (entry.getValue().isEmpty()) {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ public void setPartialFlag(boolean isPartial) {
+ this.partial = isPartial;
+ }
+
+ public boolean isEmpty() {
+ for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+ if (entry.getValue().isEmpty() == false) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean isUpdated() {
+ for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+ if (entry.getValue().isUpdated() == false) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void resetUpdates() {
+ for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+ entry.getValue().resetUpdates();
+ }
+ }
+
+ public void undeleteAll() {
+ for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+ entry.getValue().undeleteAll();
+ }
+ }
+
+ private BitSetNode getOrCreateNode(final long procId) {
+ // can procId fit in the left node?
+ BitSetNode leftNode = null;
+ boolean leftCanGrow = false;
+ Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
+ if (leftEntry != null) {
+ leftNode = leftEntry.getValue();
+ if (leftNode.contains(procId)) {
+ return leftNode;
+ }
+ leftCanGrow = leftNode.canGrow(procId);
+ }
+
+ BitSetNode rightNode = null;
+ boolean rightCanGrow = false;
+ Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
+ if (rightEntry != null) {
+ rightNode = rightEntry.getValue();
+ rightCanGrow = rightNode.canGrow(procId);
+ if (leftNode != null) {
+ if (leftNode.canMerge(rightNode)) {
+ // merge left and right node
+ return mergeNodes(leftNode, rightNode);
+ }
+
+ if (leftCanGrow && rightCanGrow) {
+ if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
+ // grow the left node
+ return growNode(leftNode, procId);
+ }
+ // grow the right node
+ return growNode(rightNode, procId);
+ }
+ }
+ }
+
+ // grow the left node
+ if (leftCanGrow) {
+ return growNode(leftNode, procId);
+ }
+
+ // grow the right node
+ if (rightCanGrow) {
+ return growNode(rightNode, procId);
+ }
+
+ // add new node
+ BitSetNode node = new BitSetNode(procId, partial);
+ map.put(node.getStart(), node);
+ return node;
+ }
+
+ private BitSetNode growNode(BitSetNode node, long procId) {
+ map.remove(node.getStart());
+ node.grow(procId);
+ map.put(node.getStart(), node);
+ return node;
+ }
+
+ private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
+ leftNode.merge(rightNode);
+ map.remove(rightNode.getStart());
+ return leftNode;
+ }
+
+ public void dump() {
+ System.out.println("map " + map.size());
+ for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+ entry.getValue().dump();
+ }
+ }
+
+ public void writeTo(final OutputStream stream) throws IOException {
+ ProcedureProtos.ProcedureStoreTracker.Builder builder =
+ ProcedureProtos.ProcedureStoreTracker.newBuilder();
+ for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
+ builder.addNode(entry.getValue().convert());
+ }
+ builder.build().writeDelimitedTo(stream);
+ }
+
+ public void readFrom(final InputStream stream) throws IOException {
+ ProcedureProtos.ProcedureStoreTracker data =
+ ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
+ map.clear();
+ for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
+ BitSetNode node = BitSetNode.convert(protoNode);
+ map.put(node.getStart(), node);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
new file mode 100644
index 0000000..29db3bf
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when a procedure WAL is corrupted
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class CorruptedWALProcedureStoreException extends HBaseIOException {
+ /** default constructor */
+ public CorruptedWALProcedureStoreException() {
+ super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public CorruptedWALProcedureStoreException(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
new file mode 100644
index 0000000..859b3cb
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
+
+/**
+ * Describes a WAL File
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
+ private static final Log LOG = LogFactory.getLog(ProcedureWALFile.class);
+
+ private ProcedureWALHeader header;
+ private FSDataInputStream stream;
+ private FileStatus logStatus;
+ private FileSystem fs;
+ private Path logFile;
+ private long startPos;
+
+ public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) {
+ this.fs = fs;
+ this.logStatus = logStatus;
+ this.logFile = logStatus.getPath();
+ }
+
+ public ProcedureWALFile(FileSystem fs, Path logFile, ProcedureWALHeader header, long startPos) {
+ this.fs = fs;
+ this.logFile = logFile;
+ this.header = header;
+ this.startPos = startPos;
+ }
+
+ public void open() throws IOException {
+ if (stream == null) {
+ stream = fs.open(logFile);
+ }
+
+ if (header == null) {
+ header = ProcedureWALFormat.readHeader(stream);
+ startPos = stream.getPos();
+ } else {
+ stream.seek(startPos);
+ }
+ }
+
+ public ProcedureWALTrailer readTrailer() throws IOException {
+ try {
+ return ProcedureWALFormat.readTrailer(stream, startPos, logStatus.getLen());
+ } finally {
+ stream.seek(startPos);
+ }
+ }
+
+ public void readTracker(ProcedureStoreTracker tracker) throws IOException {
+ ProcedureWALTrailer trailer = readTrailer();
+ try {
+ stream.seek(trailer.getTrackerPos());
+ tracker.readFrom(stream);
+ } finally {
+ stream.seek(startPos);
+ }
+ }
+
+ public void close() {
+ if (stream == null) return;
+ try {
+ stream.close();
+ } catch (IOException e) {
+ LOG.warn("unable to close the wal file: " + logFile, e);
+ } finally {
+ stream = null;
+ }
+ }
+
+ public FSDataInputStream getStream() {
+ return stream;
+ }
+
+ public ProcedureWALHeader getHeader() {
+ return header;
+ }
+
+ public boolean isCompacted() {
+ return header.getType() == ProcedureWALFormat.LOG_TYPE_COMPACTED;
+ }
+
+ public long getLogId() {
+ return header.getLogId();
+ }
+
+ public long getSize() {
+ return logStatus.getLen();
+ }
+
+ public void removeFile() throws IOException {
+ close();
+ fs.delete(logFile, false);
+ }
+
+ @Override
+ public int compareTo(final ProcedureWALFile other) {
+ long diff = header.getLogId() - other.header.getLogId();
+ return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof ProcedureWALFile)) return false;
+ return compareTo((ProcedureWALFile)o) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return logFile.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return logFile.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
new file mode 100644
index 0000000..17432ac
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Helper class that contains the WAL serialization utils.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ProcedureWALFormat {
+ static final byte LOG_TYPE_STREAM = 0;
+ static final byte LOG_TYPE_COMPACTED = 1;
+ static final byte LOG_TYPE_MAX_VALID = 1;
+
+ static final byte HEADER_VERSION = 1;
+ static final byte TRAILER_VERSION = 1;
+ static final long HEADER_MAGIC = 0x31764c4157637250L;
+ static final long TRAILER_MAGIC = 0x50726357414c7631L;
+
+ @InterfaceAudience.Private
+ public static class InvalidWALDataException extends IOException {
+ public InvalidWALDataException(String s) {
+ super(s);
+ }
+
+ public InvalidWALDataException(Throwable t) {
+ super(t);
+ }
+ }
+
+ interface Loader {
+ void removeLog(ProcedureWALFile log);
+ void markCorruptedWAL(ProcedureWALFile log, IOException e);
+ }
+
+ private ProcedureWALFormat() {}
+
+ public static Iterator<Procedure> load(final Iterator<ProcedureWALFile> logs,
+ final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
+ ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
+ tracker.setKeepDeletes(true);
+ try {
+ while (logs.hasNext()) {
+ ProcedureWALFile log = logs.next();
+ log.open();
+ try {
+ reader.read(log, loader);
+ } finally {
+ log.close();
+ }
+ }
+ // The tracker is now updated with all the procedures read from the logs
+ tracker.setPartialFlag(false);
+ tracker.resetUpdates();
+ } finally {
+ tracker.setKeepDeletes(false);
+ }
+ // TODO: Write compacted version?
+ return reader.getProcedures();
+ }
+
+ public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
+ throws IOException {
+ header.writeDelimitedTo(stream);
+ }
+
+ /*
+ * +-----------------+
+ * | END OF WAL DATA | <---+
+ * +-----------------+ |
+ * | | |
+ * | Tracker | |
+ * | | |
+ * +-----------------+ |
+ * | version | |
+ * +-----------------+ |
+ * | TRAILER_MAGIC | |
+ * +-----------------+ |
+ * | offset |-----+
+ * +-----------------+
+ */
+ public static void writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker)
+ throws IOException {
+ long offset = stream.getPos();
+
+ // Write EOF Entry
+ ProcedureWALEntry.newBuilder()
+ .setType(ProcedureWALEntry.Type.EOF)
+ .build().writeDelimitedTo(stream);
+
+ // Write Tracker
+ tracker.writeTo(stream);
+
+ stream.write(TRAILER_VERSION);
+ StreamUtils.writeLong(stream, TRAILER_MAGIC);
+ StreamUtils.writeLong(stream, offset);
+ }
+
+ public static ProcedureWALHeader readHeader(InputStream stream)
+ throws IOException {
+ ProcedureWALHeader header;
+ try {
+ header = ProcedureWALHeader.parseDelimitedFrom(stream);
+ } catch (InvalidProtocolBufferException e) {
+ throw new InvalidWALDataException(e);
+ }
+
+ if (header == null) {
+ throw new InvalidWALDataException("No data available to read the Header");
+ }
+
+ if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) {
+ throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() +
+ " expected " + HEADER_VERSION);
+ }
+
+ if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) {
+ throw new InvalidWALDataException("Invalid header type. got " + header.getType());
+ }
+
+ return header;
+ }
+
+ public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
+ throws IOException {
+ long trailerPos = size - 17; // Beginning of the Trailer Jump
+
+ if (trailerPos < startPos) {
+ throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
+ }
+
+ stream.seek(trailerPos);
+ int version = stream.read();
+ if (version != TRAILER_VERSION) {
+ throw new InvalidWALDataException("Invalid Trailer version. got " + version +
+ " expected " + TRAILER_VERSION);
+ }
+
+ long magic = StreamUtils.readLong(stream);
+ if (magic != TRAILER_MAGIC) {
+ throw new InvalidWALDataException("Invalid Trailer magic. got " + magic +
+ " expected " + TRAILER_MAGIC);
+ }
+
+ long trailerOffset = StreamUtils.readLong(stream);
+ stream.seek(trailerOffset);
+
+ ProcedureWALEntry entry = readEntry(stream);
+ if (entry.getType() != ProcedureWALEntry.Type.EOF) {
+ throw new InvalidWALDataException("Invalid Trailer begin");
+ }
+
+ ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder()
+ .setVersion(version)
+ .setTrackerPos(stream.getPos())
+ .build();
+ return trailer;
+ }
+
+ public static ProcedureWALEntry readEntry(InputStream stream) throws IOException {
+ return ProcedureWALEntry.parseDelimitedFrom(stream);
+ }
+
+ public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
+ Procedure proc, Procedure[] subprocs) throws IOException {
+ ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
+ builder.setType(type);
+ builder.addProcedure(Procedure.convert(proc));
+ if (subprocs != null) {
+ for (int i = 0; i < subprocs.length; ++i) {
+ builder.addProcedure(Procedure.convert(subprocs[i]));
+ }
+ }
+ builder.build().writeDelimitedTo(slot);
+ }
+
+ public static void writeInsert(ByteSlot slot, Procedure proc)
+ throws IOException {
+ writeEntry(slot, ProcedureWALEntry.Type.INIT, proc, null);
+ }
+
+ public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
+ throws IOException {
+ writeEntry(slot, ProcedureWALEntry.Type.INSERT, proc, subprocs);
+ }
+
+ public static void writeUpdate(ByteSlot slot, Procedure proc)
+ throws IOException {
+ writeEntry(slot, ProcedureWALEntry.Type.UPDATE, proc, null);
+ }
+
+ public static void writeDelete(ByteSlot slot, long procId)
+ throws IOException {
+ ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
+ builder.setType(ProcedureWALEntry.Type.DELETE);
+ builder.setProcId(procId);
+ builder.build().writeDelimitedTo(slot);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d763cd7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
new file mode 100644
index 0000000..a60b8f5
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
+
+/**
+ * Helper class that loads the procedures stored in a WAL
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ProcedureWALFormatReader {
+ private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
+
+ private final ProcedureStoreTracker tracker;
+ //private final long compactionLogId;
+
+ private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
+ private final Map<Long, ProcedureProtos.Procedure> localProcedures =
+ new HashMap<Long, ProcedureProtos.Procedure>();
+
+ private long maxProcId = 0;
+
+ public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
+ this.tracker = tracker;
+ }
+
+ public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
+ FSDataInputStream stream = log.getStream();
+ try {
+ boolean hasMore = true;
+ while (hasMore) {
+ ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
+ if (entry == null) {
+ LOG.warn("nothing left to decode. exiting with missing EOF");
+ hasMore = false;
+ break;
+ }
+ switch (entry.getType()) {
+ case INIT:
+ readInitEntry(entry);
+ break;
+ case INSERT:
+ readInsertEntry(entry);
+ break;
+ case UPDATE:
+ case COMPACT:
+ readUpdateEntry(entry);
+ break;
+ case DELETE:
+ readDeleteEntry(entry);
+ break;
+ case EOF:
+ hasMore = false;
+ break;
+ default:
+ throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("got an exception while reading the procedure WAL: " + log, e);
+ loader.markCorruptedWAL(log, e);
+ }
+
+ if (localProcedures.isEmpty()) {
+ LOG.info("No active entry found in state log " + log + ". removing it");
+ loader.removeLog(log);
+ } else {
+ Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
+ localProcedures.entrySet().iterator();
+ while (itd.hasNext()) {
+ Map.Entry<Long, ProcedureProtos.Procedure> entry = itd.next();
+ itd.remove();
+
+ // Deserialize the procedure
+ Procedure proc = Procedure.convert(entry.getValue());
+ procedures.put(entry.getKey(), proc);
+ }
+
+ // TODO: Some procedure may be already runnables (see readInitEntry())
+ // (we can also check the "update map" in the log trackers)
+ }
+ }
+
+ public Iterator<Procedure> getProcedures() {
+ return procedures.values().iterator();
+ }
+
+ private void loadEntries(final ProcedureWALEntry entry) {
+ for (ProcedureProtos.Procedure proc: entry.getProcedureList()) {
+ maxProcId = Math.max(maxProcId, proc.getProcId());
+ if (isRequired(proc.getProcId())) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
+ }
+ localProcedures.put(proc.getProcId(), proc);
+ tracker.setDeleted(proc.getProcId(), false);
+ }
+ }
+ }
+
+ private void readInitEntry(final ProcedureWALEntry entry)
+ throws IOException {
+ assert entry.getProcedureCount() == 1 : "Expected only one procedure";
+ // TODO: Make it runnable, before reading other files
+ loadEntries(entry);
+ }
+
+ private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
+ assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
+ loadEntries(entry);
+ }
+
+ private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
+ assert entry.getProcedureCount() == 1 : "Expected only one procedure";
+ loadEntries(entry);
+ }
+
+ private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
+ assert entry.getProcedureCount() == 0 : "Expected no procedures";
+ assert entry.hasProcId() : "expected ProcID";
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("read delete entry " + entry.getProcId());
+ }
+ maxProcId = Math.max(maxProcId, entry.getProcId());
+ localProcedures.remove(entry.getProcId());
+ tracker.setDeleted(entry.getProcId(), true);
+ }
+
+ private boolean isDeleted(final long procId) {
+ return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
+ }
+
+ private boolean isRequired(final long procId) {
+ return !isDeleted(procId) && !procedures.containsKey(procId);
+ }
+}
\ No newline at end of file