You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/05/28 20:49:29 UTC
[2/2] hbase git commit: HBASE-13476 Procedure v2 - Add Replay Order
logic for child procedures
HBASE-13476 Procedure v2 - Add Replay Order logic for child procedures
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/24ef755f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/24ef755f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/24ef755f
Branch: refs/heads/branch-1
Commit: 24ef755f83faf17ff35735badac66f0c8d250a5a
Parents: 2f9851a
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu May 28 19:33:22 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu May 28 19:42:42 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/procedure2/Procedure.java | 14 +
.../hbase/procedure2/ProcedureExecutor.java | 126 ++--
.../hbase/procedure2/store/ProcedureStore.java | 56 +-
.../store/wal/ProcedureWALFormat.java | 8 +-
.../store/wal/ProcedureWALFormatReader.java | 592 +++++++++++++++++--
.../procedure2/store/wal/WALProcedureStore.java | 45 +-
.../procedure2/ProcedureTestingUtility.java | 6 +-
.../procedure2/TestProcedureExecution.java | 2 +-
.../hbase/procedure2/TestProcedureRecovery.java | 2 +-
.../procedure2/TestProcedureReplayOrder.java | 162 ++---
.../store/wal/TestWALProcedureStore.java | 324 +++++++++-
.../org/apache/hadoop/hbase/master/HMaster.java | 5 +-
.../procedure/MasterProcedureConstants.java | 13 +
hbase-server/src/test/resources/hbase-site.xml | 7 +
14 files changed, 1158 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 338fcad..6abf2c5 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -519,6 +519,20 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
+ /**
+ * Get a hashcode for the specified Procedure ID
+ * @return the hashcode for the specified procId
+ */
+ public static long getProcIdHashCode(final long procId) {
+ long h = procId;
+ h ^= h >> 16;
+ h *= 0x85ebca6b;
+ h ^= h >> 13;
+ h *= 0xc2b2ae35;
+ h ^= h >> 16;
+ return h;
+ }
+
/*
* Helper to lookup the root Procedure ID given a specified procedure.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 6e87997..59b346a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashSet;
-import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -43,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
@@ -264,45 +264,70 @@ public class ProcedureExecutor<TEnvironment> {
this.conf = conf;
}
- private List<Map.Entry<Long, RootProcedureState>> load() throws IOException {
+ private void load(final boolean abortOnCorruption) throws IOException {
Preconditions.checkArgument(completed.isEmpty());
Preconditions.checkArgument(rollbackStack.isEmpty());
Preconditions.checkArgument(procedures.isEmpty());
Preconditions.checkArgument(waitingTimeout.isEmpty());
Preconditions.checkArgument(runnables.size() == 0);
- // 1. Load the procedures
- Iterator<Procedure> loader = store.load();
- if (loader == null) {
- lastProcId.set(0);
- return null;
- }
+ store.load(new ProcedureStore.ProcedureLoader() {
+ @Override
+ public void setMaxProcId(long maxProcId) {
+ assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
+ LOG.debug("load procedures maxProcId=" + maxProcId);
+ lastProcId.set(maxProcId);
+ }
- long logMaxProcId = 0;
- int runnablesCount = 0;
- while (loader.hasNext()) {
- Procedure proc = loader.next();
- proc.beforeReplay(getEnvironment());
- procedures.put(proc.getProcId(), proc);
- logMaxProcId = Math.max(logMaxProcId, proc.getProcId());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading procedure state=" + proc.getState() +
- " isFailed=" + proc.hasException() + ": " + proc);
+ @Override
+ public void load(ProcedureIterator procIter) throws IOException {
+ loadProcedures(procIter, abortOnCorruption);
}
+
+ @Override
+ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+ int corruptedCount = 0;
+ while (procIter.hasNext()) {
+ Procedure proc = procIter.next();
+ LOG.error("corrupted procedure: " + proc);
+ corruptedCount++;
+ }
+ if (abortOnCorruption && corruptedCount > 0) {
+ throw new IOException("found " + corruptedCount + " procedures on replay");
+ }
+ }
+ });
+ }
+
+ private void loadProcedures(final ProcedureIterator procIter,
+ final boolean abortOnCorruption) throws IOException {
+ // 1. Build the rollback stack
+ int runnablesCount = 0;
+ while (procIter.hasNext()) {
+ Procedure proc = procIter.next();
if (!proc.hasParent() && !proc.isFinished()) {
rollbackStack.put(proc.getProcId(), new RootProcedureState());
}
+ // add the procedure to the map
+ proc.beforeReplay(getEnvironment());
+ procedures.put(proc.getProcId(), proc);
+
if (proc.getState() == ProcedureState.RUNNABLE) {
runnablesCount++;
}
}
- assert lastProcId.get() < 0;
- lastProcId.set(logMaxProcId);
// 2. Initialize the stacks
- TreeSet<Procedure> runnableSet = null;
+ ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
HashSet<Procedure> waitingSet = null;
- for (final Procedure proc: procedures.values()) {
+ procIter.reset();
+ while (procIter.hasNext()) {
+ Procedure proc = procIter.next();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Loading procedure state=%s isFailed=%s: %s",
+ proc.getState(), proc.hasException(), proc));
+ }
+
Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) {
// The 'proc' was ready to run but the root procedure was rolledback?
@@ -312,10 +337,11 @@ public class ProcedureExecutor<TEnvironment> {
if (!proc.hasParent() && proc.isFinished()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("The procedure is completed state=" + proc.getState() +
- " isFailed=" + proc.hasException() + ": " + proc);
+ LOG.debug(String.format("The procedure is completed state=%s isFailed=%s",
+ proc.getState(), proc.hasException()));
}
assert !rollbackStack.containsKey(proc.getProcId());
+ procedures.remove(proc.getProcId());
completed.put(proc.getProcId(), newResultFromProcedure(proc));
continue;
}
@@ -333,10 +359,7 @@ public class ProcedureExecutor<TEnvironment> {
switch (proc.getState()) {
case RUNNABLE:
- if (runnableSet == null) {
- runnableSet = new TreeSet<Procedure>();
- }
- runnableSet.add(proc);
+ runnableList.add(proc);
break;
case WAITING_TIMEOUT:
if (waitingSet == null) {
@@ -361,7 +384,7 @@ public class ProcedureExecutor<TEnvironment> {
}
// 3. Validate the stacks
- List<Map.Entry<Long, RootProcedureState>> corrupted = null;
+ int corruptedCount = 0;
Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator();
while (itStack.hasNext()) {
Map.Entry<Long, RootProcedureState> entry = itStack.next();
@@ -369,32 +392,49 @@ public class ProcedureExecutor<TEnvironment> {
if (procStack.isValid()) continue;
for (Procedure proc: procStack.getSubprocedures()) {
+ LOG.error("corrupted procedure: " + proc);
procedures.remove(proc.getProcId());
- if (runnableSet != null) runnableSet.remove(proc);
+ runnableList.remove(proc);
if (waitingSet != null) waitingSet.remove(proc);
+ corruptedCount++;
}
itStack.remove();
- if (corrupted == null) {
- corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>();
- }
- corrupted.add(entry);
+ }
+
+ if (abortOnCorruption && corruptedCount > 0) {
+ throw new IOException("found " + corruptedCount + " procedures on replay");
}
// 4. Push the runnables
- if (runnableSet != null) {
- // TODO: See ProcedureWALFormatReader.readInitEntry() some procedure
- // may be started way before this stuff.
- for (Procedure proc: runnableSet) {
+ if (!runnableList.isEmpty()) {
+ // TODO: See ProcedureWALFormatReader#hasFastStartSupport
+ // some procedure may be started way before this stuff.
+ for (int i = runnableList.size() - 1; i >= 0; --i) {
+ Procedure proc = runnableList.get(i);
if (!proc.hasParent()) {
sendProcedureLoadedNotification(proc.getProcId());
}
- runnables.addBack(proc);
+ if (proc.wasExecuted()) {
+ runnables.addFront(proc);
+ } else {
+ // if it was not in execution, it can wait.
+ runnables.addBack(proc);
+ }
}
}
- return corrupted;
}
- public void start(int numThreads) throws IOException {
+ /**
+ * Start the procedure executor.
+ * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to
+ * recover the lease, and ensure a single executor, and start the procedure
+ * replay to resume and recover the previous pending and in-progress procedures.
+ *
+ * @param numThreads number of threads available for procedure execution.
+ * @param abortOnCorruption true if you want to abort your service in case
+ * a corrupted procedure is found on replay. otherwise false.
+ */
+ public void start(int numThreads, boolean abortOnCorruption) throws IOException {
if (running.getAndSet(true)) {
LOG.warn("Already running");
return;
@@ -427,11 +467,11 @@ public class ProcedureExecutor<TEnvironment> {
store.recoverLease();
// TODO: Split in two steps.
- // TODO: Handle corrupted procedure returned (probably just a WARN)
+ // TODO: Handle corrupted procedures (currently just a warn)
// The first one will make sure that we have the latest id,
// so we can start the threads and accept new procedures.
// The second step will do the actual load of old procedures.
- load();
+ load(abortOnCorruption);
// Start the executors. Here we must have the lastProcId set.
for (int i = 0; i < threads.length; ++i) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 06bfa44..a05c115 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
-import java.util.Iterator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -46,6 +45,57 @@ public interface ProcedureStore {
}
/**
+ * An Iterator over a collection of Procedure
+ */
+ public interface ProcedureIterator {
+ /**
+ * Reset the Iterator by seeking to the beginning of the list.
+ */
+ void reset();
+
+ /**
+ * Returns true if the iterator has more elements.
+ * (In other words, returns true if next() would return a Procedure
+ * rather than throwing an exception.)
+ * @return true if the iterator has more procedures
+ */
+ boolean hasNext();
+
+ /**
+ * Returns the next procedure in the iteration.
+ * @throws IOException if there was an error fetching/deserializing the procedure
+ * @throws NoSuchElementException if the iteration has no more elements
+ * @return the next procedure in the iteration.
+ */
+ Procedure next() throws IOException;
+ }
+
+ /**
+ * Interface passed to the ProcedureStore.load() method to handle the store-load events.
+ */
+ public interface ProcedureLoader {
+ /**
+ * Called by ProcedureStore.load() to notify about the maximum proc-id in the store.
+ * @param maxProcId the highest proc-id in the store
+ */
+ void setMaxProcId(long maxProcId);
+
+ /**
+ * Called by the ProcedureStore.load() every time a set of procedures is ready to be executed.
+ * The ProcedureIterator passed to the method has the procedures sorted in replay order.
+ * @param procIter iterator over the procedures ready to be added to the executor.
+ */
+ void load(ProcedureIterator procIter) throws IOException;
+
+ /**
+ * Called by the ProcedureStore.load() in case we have procedures not-ready to be added to
+ * the executor, which probably means they are corrupted since some information/link is missing.
+ * @param procIter iterator over the procedures not ready to be added to the executor, corrupted
+ */
+ void handleCorrupted(ProcedureIterator procIter) throws IOException;
+ }
+
+ /**
* Add the listener to the notification list.
* @param listener The AssignmentListener to register
*/
@@ -87,9 +137,9 @@ public interface ProcedureStore {
/**
* Load the Procedures in the store.
- * @return the set of procedures present in the store
+ * @param loader the ProcedureLoader that will handle the store-load events
*/
- Iterator<Procedure> load() throws IOException;
+ void load(ProcedureLoader loader) throws IOException;
/**
* When a procedure is submitted to the executor insert(proc, null) will be called.
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 17432ac..c75c141 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
@@ -63,14 +64,14 @@ public final class ProcedureWALFormat {
}
}
- interface Loader {
+ interface Loader extends ProcedureLoader {
void removeLog(ProcedureWALFile log);
void markCorruptedWAL(ProcedureWALFile log, IOException e);
}
private ProcedureWALFormat() {}
- public static Iterator<Procedure> load(final Iterator<ProcedureWALFile> logs,
+ public static void load(final Iterator<ProcedureWALFile> logs,
final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker);
tracker.setKeepDeletes(true);
@@ -84,14 +85,13 @@ public final class ProcedureWALFormat {
log.close();
}
}
+ reader.finalize(loader);
// The tracker is now updated with all the procedures read from the logs
tracker.setPartialFlag(false);
tracker.resetUpdates();
} finally {
tracker.setKeepDeletes(false);
}
- // TODO: Write compacted version?
- return reader.getProcedures();
}
public static void writeHeader(OutputStream stream, ProcedureWALHeader header)
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index a60b8f5..76c0554 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -19,9 +19,6 @@
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
@@ -41,17 +39,74 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEn
public class ProcedureWALFormatReader {
private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
- private final ProcedureStoreTracker tracker;
- //private final long compactionLogId;
-
- private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
- private final Map<Long, ProcedureProtos.Procedure> localProcedures =
- new HashMap<Long, ProcedureProtos.Procedure>();
+ // ==============================================================================================
+ // We read the WALs in reverse order, from the newest to the oldest.
+ // We have different entry types:
+ // - INIT: Procedure submitted by the user (also known as 'root procedure')
+ // - INSERT: Children added to the procedure <parentId>:[<childId>, ...]
+ // - UPDATE: The specified procedure was updated
+ // - DELETE: The procedure was removed (completed/rolledback and result TTL expired)
+ //
+ // In the WAL we can find multiple times the same procedure as UPDATE or INSERT.
+ // We read the WAL from top to bottom, so every time we find an entry of the
+ // same procedure, that will be the "latest" update.
+ //
+ // We keep two in-memory maps:
+ // - localProcedureMap: is the map containing the entries in the WAL we are processing
+ // - procedureMap: is the map containing all the procedures we found up to the WAL in process.
+ // localProcedureMap is merged with the procedureMap once we reach the WAL EOF.
+ //
+ // Since we are reading the WALs in reverse order (newest to oldest),
+ // if we find an entry related to a procedure we already have in 'procedureMap' we can discard it.
+ //
+ // The WAL is append-only so the last procedure in the WAL is the one that
+ // was in execution at the time we crashed/closed the server.
+ // given that, the procedure replay order can be inferred by the WAL order.
+ //
+ // Example:
+ // WAL-2: [A, B, A, C, D]
+ // WAL-1: [F, G, A, F, B]
+ // Replay-Order: [D, C, A, B, F, G]
+ //
+ // The "localProcedureMap" keeps a "replayOrder" list. Every time we add the
+ // record to the map that record is moved to the head of the "replayOrder" list.
+ // Using the example above:
+ // WAL-2 localProcedureMap.replayOrder is [D, C, A, B]
+ // WAL-1 localProcedureMap.replayOrder is [F, G]
+ //
+ // each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap'
+ // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order.
+ //
+ // Fast Start: INIT/INSERT record and StackIDs
+ // ---------------------------------------------
+ // We have two special records, INIT and INSERT, that track the first time
+ // the procedure was added to the WAL. We can use that information to be able
+ // to start procedures before reaching the end of the WAL, or before reading all the WALs.
+ // but in some cases the WAL with that record can be already gone.
+ // As an alternative, we can use the stackIds on each procedure
+ // to identify when a procedure is ready to start.
+ // If there are gaps in the sum of the stackIds we need to read more WALs.
+ //
+ // Example (all procs child of A):
+ // WAL-2: [A, B] A stackIds = [0, 4], B stackIds = [1, 5]
+ // WAL-1: [A, B, C, D]
+ //
+ // In the case above we need to read one more WAL to be able to consider
+ // the root procedure A and all children as ready.
+ // ==============================================================================================
+ private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
+ private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
+ //private long compactionLogId;
private long maxProcId = 0;
+ private final ProcedureStoreTracker tracker;
+ private final boolean hasFastStartSupport;
+
public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
this.tracker = tracker;
+ // we support fast-start only if we have a clean shutdown.
+ this.hasFastStartSupport = !tracker.isEmpty();
}
public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
@@ -91,58 +146,65 @@ public class ProcedureWALFormatReader {
loader.markCorruptedWAL(log, e);
}
- if (localProcedures.isEmpty()) {
+ if (localProcedureMap.isEmpty()) {
LOG.info("No active entry found in state log " + log + ". removing it");
loader.removeLog(log);
} else {
- Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
- localProcedures.entrySet().iterator();
- while (itd.hasNext()) {
- Map.Entry<Long, ProcedureProtos.Procedure> entry = itd.next();
- itd.remove();
+ procedureMap.mergeTail(localProcedureMap);
- // Deserialize the procedure
- Procedure proc = Procedure.convert(entry.getValue());
- procedures.put(entry.getKey(), proc);
- }
-
- // TODO: Some procedure may be already runnables (see readInitEntry())
- // (we can also check the "update map" in the log trackers)
+ //if (hasFastStartSupport) {
+ // TODO: Some procedure may be already runnables (see readInitEntry())
+ // (we can also check the "update map" in the log trackers)
+ // --------------------------------------------------
+ //EntryIterator iter = procedureMap.fetchReady();
+ //if (iter != null) loader.load(iter);
+ // --------------------------------------------------
+ //}
}
}
- public Iterator<Procedure> getProcedures() {
- return procedures.values().iterator();
+ public void finalize(ProcedureWALFormat.Loader loader) throws IOException {
+ // notify the loader about the max proc ID
+ loader.setMaxProcId(maxProcId);
+
+ // fetch the procedure ready to run.
+ ProcedureIterator procIter = procedureMap.fetchReady();
+ if (procIter != null) loader.load(procIter);
+
+ // remaining procedures have missing link or dependencies
+ // consider them as corrupted, manual fix is probably required.
+ procIter = procedureMap.fetchAll();
+ if (procIter != null) loader.handleCorrupted(procIter);
}
- private void loadEntries(final ProcedureWALEntry entry) {
- for (ProcedureProtos.Procedure proc: entry.getProcedureList()) {
- maxProcId = Math.max(maxProcId, proc.getProcId());
- if (isRequired(proc.getProcId())) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
- }
- localProcedures.put(proc.getProcId(), proc);
- tracker.setDeleted(proc.getProcId(), false);
+ private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) {
+ maxProcId = Math.max(maxProcId, proc.getProcId());
+ if (isRequired(proc.getProcId())) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
}
+ localProcedureMap.add(proc);
+ tracker.setDeleted(proc.getProcId(), false);
}
}
private void readInitEntry(final ProcedureWALEntry entry)
throws IOException {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
- // TODO: Make it runnable, before reading other files
- loadEntries(entry);
+ loadProcedure(entry, entry.getProcedure(0));
}
private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
- loadEntries(entry);
+ loadProcedure(entry, entry.getProcedure(0));
+ for (int i = 1; i < entry.getProcedureCount(); ++i) {
+ loadProcedure(entry, entry.getProcedure(i));
+ }
}
private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
- loadEntries(entry);
+ loadProcedure(entry, entry.getProcedure(0));
}
private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
@@ -152,7 +214,7 @@ public class ProcedureWALFormatReader {
LOG.trace("read delete entry " + entry.getProcId());
}
maxProcId = Math.max(maxProcId, entry.getProcId());
- localProcedures.remove(entry.getProcId());
+ localProcedureMap.remove(entry.getProcId());
tracker.setDeleted(entry.getProcId(), true);
}
@@ -161,6 +223,458 @@ public class ProcedureWALFormatReader {
}
private boolean isRequired(final long procId) {
- return !isDeleted(procId) && !procedures.containsKey(procId);
+ return !isDeleted(procId) && !procedureMap.contains(procId);
+ }
+
+ // ==========================================================================
+ // We keep an in-memory map of the procedures sorted by replay order.
+ // (see the details in the beginning of the file)
+ // _______________________________________________
+ // procedureMap = | A | | E | | C | | | | | G | | |
+ // D B
+ // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
+ //
+ // We also have a lazy grouping by "root procedure", and a list of
+ // unlinked procedures. If after reading all the WALs we have unlinked
+ // procedures it means that we had a missing WAL or a corruption.
+ // rootHead = A <-> D <-> G
+ // B E
+ // C
+ // childUnlinkedHead = None
+ // ==========================================================================
+ private static class Entry {
+ // hash-table next
+ protected Entry hashNext;
+ // child head
+ protected Entry childHead;
+ // double-link for rootHead or childHead
+ protected Entry linkNext;
+ protected Entry linkPrev;
+ // replay double-linked-list
+ protected Entry replayNext;
+ protected Entry replayPrev;
+ // procedure-infos
+ protected Procedure procedure;
+ protected ProcedureProtos.Procedure proto;
+ protected boolean ready = false;
+
+ public Entry(Entry hashNext) { this.hashNext = hashNext; }
+
+ public long getProcId() { return proto.getProcId(); }
+ public long getParentId() { return proto.getParentId(); }
+ public boolean hasParent() { return proto.hasParentId(); }
+ public boolean isReady() { return ready; }
+
+ public Procedure convert() throws IOException {
+ if (procedure == null) {
+ procedure = Procedure.convert(proto);
+ }
+ return procedure;
+ }
+
+ @Override
+ public String toString() {
+ return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")";
+ }
+ }
+
+ private static class EntryIterator implements ProcedureIterator {
+ private final Entry replayHead;
+ private Entry current;
+
+ public EntryIterator(Entry replayHead) {
+ this.replayHead = replayHead;
+ this.current = replayHead;
+ }
+
+ @Override
+ public void reset() {
+ this.current = replayHead;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ @Override
+ public Procedure next() throws IOException {
+ try {
+ return current.convert();
+ } finally {
+ current = current.replayNext;
+ }
+ }
+ }
+
+ private static class WalProcedureMap {
+ // procedure hash table
+ private Entry[] procedureMap;
+
+ // replay-order double-linked-list
+ private Entry replayOrderHead;
+ private Entry replayOrderTail;
+
+ // root linked-list
+ private Entry rootHead;
+
+ // pending unlinked children (root not present yet)
+ private Entry childUnlinkedHead;
+
+ public WalProcedureMap(int size) {
+ procedureMap = new Entry[size];
+ replayOrderHead = null;
+ replayOrderTail = null;
+ rootHead = null;
+ childUnlinkedHead = null;
+ }
+
+ public void add(ProcedureProtos.Procedure procProto) {
+ Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
+ boolean isNew = entry.proto == null;
+ entry.proto = procProto;
+ addToReplayList(entry);
+
+ if (isNew) {
+ if (procProto.hasParentId()) {
+ childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
+ } else {
+ rootHead = addToLinkList(entry, rootHead);
+ }
+ }
+ }
+
+ public boolean remove(long procId) {
+ Entry entry = removeFromMap(procId);
+ if (entry != null) {
+ unlinkFromReplayList(entry);
+ unlinkFromLinkList(entry);
+ return true;
+ }
+ return false;
+ }
+
+ public boolean contains(long procId) {
+ return getProcedure(procId) != null;
+ }
+
+ public boolean isEmpty() {
+ return replayOrderHead == null;
+ }
+
+ public void clear() {
+ for (int i = 0; i < procedureMap.length; ++i) {
+ procedureMap[i] = null;
+ }
+ replayOrderHead = null;
+ replayOrderTail = null;
+ rootHead = null;
+ childUnlinkedHead = null;
+ }
+
+ /*
+ * Merges two WalProcedureMap,
+ * the target is the "global" map, the source is the "local" map.
+ * - The entries in the hashtables are guaranteed to be unique.
+ * On replay we don't load procedures that already exist in the "global"
+ * map (the one we are merging the "local" in to).
+ * - The replayOrderList of the "local" map will be appended to the "global"
+ * map replay list.
+ * - The "local" map will be cleared at the end of the operation.
+ */
+ public void mergeTail(WalProcedureMap other) {
+   // Re-hash every entry of "other" into this map's hashtable. Entries are
+   // pushed on the front of their slot chain without any duplicate lookup.
+   for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
+     int slotIndex = getMapSlot(p.getProcId());
+     p.hashNext = procedureMap[slotIndex];
+     procedureMap[slotIndex] = p;
+   }
+
+   if (replayOrderHead == null) {
+     // this map has no replay list: adopt the lists of "other" as they are
+     replayOrderHead = other.replayOrderHead;
+     replayOrderTail = other.replayOrderTail;
+     rootHead = other.rootHead;
+     childUnlinkedHead = other.childUnlinkedHead;
+   } else {
+     // append replay list. When "other" has no replay entries there is nothing
+     // to append: skipping the step avoids a NullPointerException on
+     // other.replayOrderHead and keeps replayOrderTail from becoming null.
+     if (other.replayOrderHead != null) {
+       assert replayOrderTail.replayNext == null;
+       assert other.replayOrderHead.replayPrev == null;
+       replayOrderTail.replayNext = other.replayOrderHead;
+       other.replayOrderHead.replayPrev = replayOrderTail;
+       replayOrderTail = other.replayOrderTail;
+     }
+
+     // merge rootHead: the roots of "other" are placed in front of ours
+     if (rootHead == null) {
+       rootHead = other.rootHead;
+     } else if (other.rootHead != null) {
+       Entry otherTail = findLinkListTail(other.rootHead);
+       otherTail.linkNext = rootHead;
+       rootHead.linkPrev = otherTail;
+       rootHead = other.rootHead;
+     }
+
+     // merge childUnlinkedHead: same scheme as for the root list
+     if (childUnlinkedHead == null) {
+       childUnlinkedHead = other.childUnlinkedHead;
+     } else if (other.childUnlinkedHead != null) {
+       Entry otherTail = findLinkListTail(other.childUnlinkedHead);
+       otherTail.linkNext = childUnlinkedHead;
+       childUnlinkedHead.linkPrev = otherTail;
+       childUnlinkedHead = other.childUnlinkedHead;
+     }
+   }
+
+   // the entries now belong to this map; reset the source map
+   other.clear();
+ }
+
+ /*
+ * Returns an EntryIterator with the list of procedures ready
+ * to be added to the executor.
+ * A Procedure is ready if its children and parent are ready.
+ */
+ public EntryIterator fetchReady() {
+   buildGraph();
+
+   // Move every ready entry from the replay list to a private list,
+   // keeping the relative replay order.
+   Entry head = null;
+   Entry tail = null;
+   Entry cur = replayOrderHead;
+   while (cur != null) {
+     // saved up front: the links of "cur" are rewritten below
+     final Entry following = cur.replayNext;
+     if (cur.isReady()) {
+       unlinkFromReplayList(cur);
+       cur.replayPrev = tail;
+       cur.replayNext = null;
+       if (tail == null) {
+         head = cur;
+       } else {
+         tail.replayNext = cur;
+       }
+       tail = cur;
+     }
+     cur = following;
+   }
+
+   // we need the hash-table lookups for parents, so the removal from the map
+   // must be done out of the loop where we check isReady()
+   for (Entry ready = head; ready != null; ready = ready.replayNext) {
+     removeFromMap(ready.getProcId());
+     unlinkFromLinkList(ready);
+   }
+
+   if (head == null) {
+     return null;
+   }
+   return new EntryIterator(head);
+ }
+
+ /*
+ * Drain this map and return all procedures in it.
+ */
+ public EntryIterator fetchAll() {
+   final Entry first = replayOrderHead;
+
+   // take every entry of the replay list out of the hashtable
+   Entry cur = first;
+   while (cur != null) {
+     removeFromMap(cur.getProcId());
+     cur = cur.replayNext;
+   }
+   for (int i = 0; i < procedureMap.length; ++i) {
+     assert procedureMap[i] == null : "map not empty i=" + i;
+   }
+
+   // forget all the lists; the entries stay chained through replayNext
+   rootHead = null;
+   childUnlinkedHead = null;
+   replayOrderTail = null;
+   replayOrderHead = null;
+
+   if (first == null) {
+     return null;
+   }
+   return new EntryIterator(first);
+ }
+
+ private void buildGraph() {
+ Entry p = childUnlinkedHead;
+ while (p != null) {
+ Entry next = p.linkNext;
+ Entry rootProc = getRootProcedure(p);
+ if (rootProc != null) {
+ rootProc.childHead = addToLinkList(p, rootProc.childHead);
+ }
+ p = next;
+ }
+
+ for (p = rootHead; p != null; p = p.linkNext) {
+ checkReadyToRun(p);
+ }
+ }
+
+ /*
+  * Follows the parent ids starting from the given entry and returns the
+  * first entry that has no parent. Returns null if the given entry is null
+  * or if a parent on the way is not in the map.
+  */
+ private Entry getRootProcedure(Entry entry) {
+   for (Entry cur = entry; cur != null; cur = getProcedure(cur.getParentId())) {
+     if (!cur.hasParent()) {
+       return cur;
+     }
+   }
+   return null;
+ }
+
+ /*
+ * (see the comprehensive explanation at the beginning of the file)
+ * A Procedure is ready when parent and children are ready.
+ * "ready" means that we have all the information that we need in-memory.
+ *
+ * Example-1:
+ * We have two WALs, we start reading from the newest (wal-2)
+ * wal-2 | C B |
+ * wal-1 | A B C |
+ *
+ * If C and B don't depend on A (A is not the parent), we can start them
+ * before reading wal-1. If B is the only one with parent A we can start C
+ * and read one more WAL before being able to start B.
+ *
+ * How do we know, with only the information in B, that we are not ready?
+ * - easy case, the parent is missing from the global map
+ * - more complex case we look at the Stack IDs
+ *
+ * The Stack-IDs are added to the procedure order as an incremental index
+ * tracking how many times that procedure was executed, which is equivalent
+ * to the number of times we wrote the procedure to the WAL.
+ * In the example above:
+ * wal-2: B has stackId = [1, 2]
+ * wal-1: B has stackId = [1]
+ * wal-1: A has stackId = [0]
+ *
+ * Since we know that the Stack-IDs are incremental for a Procedure,
+ * we notice that there is a gap in the stackIds of B, so something was
+ * executed before.
+ * To identify when a Procedure is ready we do the sum of the stackIds of
+ * the procedure and the parent. If the stackIdSum is equal to the
+ * sum of {1..maxStackId} then everything we need is available.
+ *
+ * Example-2
+ * wal-2 | A | A stackIds = [0, 2]
+ * wal-1 | A B | B stackIds = [1]
+ *
+ * There is a gap between A stackIds so something was executed in between.
+ */
+ private boolean checkReadyToRun(Entry rootEntry) {
+   int total = 0;
+   int highest = 0;
+
+   // stack ids of the root; each id is shifted by one (0 becomes 1)
+   for (int idx = 0; idx < rootEntry.proto.getStackIdCount(); ++idx) {
+     final int shifted = rootEntry.proto.getStackId(idx) + 1;
+     if (shifted > highest) {
+       highest = shifted;
+     }
+     total += shifted;
+   }
+
+   // stack ids of every entry in the root's child list, shifted the same way
+   for (Entry child = rootEntry.childHead; child != null; child = child.linkNext) {
+     for (int idx = 0; idx < child.proto.getStackIdCount(); ++idx) {
+       final int shifted = child.proto.getStackId(idx) + 1;
+       if (shifted > highest) {
+         highest = shifted;
+       }
+       total += shifted;
+     }
+   }
+
+   // sum of 1..highest; a different total means some stack id is missing
+   final int expected = (highest * (highest + 1)) / 2;
+   if (expected != total) {
+     return false;
+   }
+
+   rootEntry.ready = true;
+   for (Entry child = rootEntry.childHead; child != null; child = child.linkNext) {
+     child.ready = true;
+   }
+   return true;
+ }
+
+ private void unlinkFromReplayList(Entry entry) {
+ if (replayOrderHead == entry) {
+ replayOrderHead = entry.replayNext;
+ }
+ if (replayOrderTail == entry) {
+ replayOrderTail = entry.replayPrev;
+ }
+ if (entry.replayPrev != null) {
+ entry.replayPrev.replayNext = entry.replayNext;
+ }
+ if (entry.replayNext != null) {
+ entry.replayNext.replayPrev = entry.replayPrev;
+ }
+ }
+
+ private void addToReplayList(final Entry entry) {
+ unlinkFromReplayList(entry);
+ entry.replayNext = replayOrderHead;
+ entry.replayPrev = null;
+ if (replayOrderHead != null) {
+ replayOrderHead.replayPrev = entry;
+ } else {
+ replayOrderTail = entry;
+ }
+ replayOrderHead = entry;
+ }
+
+ private void unlinkFromLinkList(Entry entry) {
+ if (entry == rootHead) {
+ rootHead = entry.linkNext;
+ } else if (entry == childUnlinkedHead) {
+ childUnlinkedHead = entry.linkNext;
+ }
+ if (entry.linkPrev != null) {
+ entry.linkPrev.linkNext = entry.linkNext;
+ }
+ if (entry.linkNext != null) {
+ entry.linkNext.linkPrev = entry.linkPrev;
+ }
+ }
+
+ /*
+  * Detaches the entry from its current link list and pushes it in front of
+  * the list starting at linkHead (which may be null).
+  * Returns the entry, which is the new head of that list.
+  */
+ private Entry addToLinkList(Entry entry, Entry linkHead) {
+   unlinkFromLinkList(entry);
+   entry.linkPrev = null;
+   entry.linkNext = linkHead;
+   if (null != linkHead) {
+     linkHead.linkPrev = entry;
+   }
+   return entry;
+ }
+
+ /*
+  * Walks the linkNext chain from linkHead and returns its last entry.
+  * linkHead must not be null.
+  */
+ private Entry findLinkListTail(Entry linkHead) {
+   Entry last = linkHead;
+   for (Entry following = last.linkNext; following != null; following = following.linkNext) {
+     last = following;
+   }
+   return last;
+ }
+
+ private Entry addToMap(final long procId, final boolean hasParent) {
+ int slotIndex = getMapSlot(procId);
+ Entry entry = getProcedure(slotIndex, procId);
+ if (entry != null) return entry;
+
+ entry = new Entry(procedureMap[slotIndex]);
+ procedureMap[slotIndex] = entry;
+ return entry;
+ }
+
+ private Entry removeFromMap(final long procId) {
+ int slotIndex = getMapSlot(procId);
+ Entry prev = null;
+ Entry entry = procedureMap[slotIndex];
+ while (entry != null) {
+ if (procId == entry.getProcId()) {
+ if (prev != null) {
+ prev.hashNext = entry.hashNext;
+ } else {
+ procedureMap[slotIndex] = entry.hashNext;
+ }
+ entry.hashNext = null;
+ return entry;
+ }
+ prev = entry;
+ entry = entry.hashNext;
+ }
+ return null;
+ }
+
+ private Entry getProcedure(final long procId) {
+ return getProcedure(getMapSlot(procId), procId);
+ }
+
+ /*
+  * Scans the chain of the given slot for the entry with the given procId.
+  * Returns null when the chain has no such entry.
+  */
+ private Entry getProcedure(final int slotIndex, final long procId) {
+   for (Entry cur = procedureMap[slotIndex]; cur != null; cur = cur.hashNext) {
+     if (cur.getProcId() == procId) {
+       return cur;
+     }
+   }
+   return null;
+ }
+
+ private int getMapSlot(final long procId) {
+ return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 1884adc..f4a52b1 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -215,13 +215,12 @@ public class WALProcedureStore implements ProcedureStore {
FileStatus[] oldLogs = getLogFiles();
while (running.get()) {
// Get Log-MaxID and recover lease on old logs
- flushLogId = initOldLogs(oldLogs) + 1;
+ flushLogId = initOldLogs(oldLogs);
// Create new state-log
- if (!rollWriter(flushLogId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Someone else has already created log: " + flushLogId);
- }
+ if (!rollWriter(flushLogId + 1)) {
+ // someone else has already created this log
+ LOG.debug("someone else has already created log " + flushLogId);
continue;
}
@@ -241,7 +240,7 @@ public class WALProcedureStore implements ProcedureStore {
}
@Override
- public Iterator<Procedure> load() throws IOException {
+ public void load(final ProcedureLoader loader) throws IOException {
if (logs.isEmpty()) {
throw new RuntimeException("recoverLease() must be called before loading data");
}
@@ -251,7 +250,8 @@ public class WALProcedureStore implements ProcedureStore {
if (LOG.isDebugEnabled()) {
LOG.debug("No state logs to replay.");
}
- return null;
+ loader.setMaxProcId(0);
+ return;
}
// Load the old logs
@@ -259,7 +259,22 @@ public class WALProcedureStore implements ProcedureStore {
Iterator<ProcedureWALFile> it = logs.descendingIterator();
it.next(); // Skip the current log
try {
- return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
+ ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
+ @Override
+ public void setMaxProcId(long maxProcId) {
+ loader.setMaxProcId(maxProcId);
+ }
+
+ @Override
+ public void load(ProcedureIterator procIter) throws IOException {
+ loader.load(procIter);
+ }
+
+ @Override
+ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+ loader.handleCorrupted(procIter);
+ }
+
@Override
public void removeLog(ProcedureWALFile log) {
toRemove.add(log);
@@ -301,7 +316,7 @@ public class WALProcedureStore implements ProcedureStore {
}
// Push the transaction data and wait until it is persisted
- logId = pushData(slot);
+ pushData(slot);
} catch (IOException e) {
// We are not able to serialize the procedure.
// this is a code error, and we are not able to go on.
@@ -383,7 +398,7 @@ public class WALProcedureStore implements ProcedureStore {
storeTracker.delete(procId);
if (logId == flushLogId) {
if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) {
- removeOldLogs = rollWriterOrDie(logId + 1);
+ removeOldLogs = rollWriterOrDie();
}
}
}
@@ -541,9 +556,9 @@ public class WALProcedureStore implements ProcedureStore {
}
}
- private boolean rollWriterOrDie(final long logId) {
+ private boolean rollWriterOrDie() {
try {
- return rollWriter(logId);
+ return rollWriter();
} catch (IOException e) {
LOG.warn("Unable to roll the log", e);
sendAbortProcessSignal();
@@ -551,7 +566,13 @@ public class WALProcedureStore implements ProcedureStore {
}
}
+ protected boolean rollWriter() throws IOException {
+ return rollWriter(flushLogId + 1);
+ }
+
private boolean rollWriter(final long logId) throws IOException {
+ assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
+
ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
.setVersion(ProcedureWALFormat.HEADER_VERSION)
.setType(ProcedureWALFormat.LOG_TYPE_STREAM)
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 7b9fc69..ddea9d2 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -57,11 +57,11 @@ public class ProcedureTestingUtility {
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
throws Exception {
- restart(procExecutor, null);
+ restart(procExecutor, null, true);
}
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
- Runnable beforeStartAction) throws Exception {
+ Runnable beforeStartAction, boolean failOnCorrupted) throws Exception {
ProcedureStore procStore = procExecutor.getStore();
int storeThreads = procExecutor.getNumThreads();
int execThreads = procExecutor.getNumThreads();
@@ -75,7 +75,7 @@ public class ProcedureTestingUtility {
}
// re-start
procStore.start(storeThreads);
- procExecutor.start(execThreads);
+ procExecutor.start(execThreads, failOnCorrupted);
}
public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
index 022f8ad..0b2a364 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
@@ -70,7 +70,7 @@ public class TestProcedureExecution {
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+ procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index f21b6fa..7735b63 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -74,7 +74,7 @@ public class TestProcedureRecovery {
procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
+ procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
procSleepInterval = 0;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
index 8a7c1a1..61c58e1 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
@@ -19,13 +19,17 @@
package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -45,7 +49,7 @@ import static org.junit.Assert.fail;
public class TestProcedureReplayOrder {
private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class);
- private static final Procedure NULL_PROC = null;
+ private static final int NUM_THREADS = 16;
private ProcedureExecutor<Void> procExecutor;
private TestProcedureEnv procEnv;
@@ -59,7 +63,7 @@ public class TestProcedureReplayOrder {
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
- htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 10);
+ htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 25);
testDir = htu.getDataTestDir();
fs = testDir.getFileSystem(htu.getConfiguration());
@@ -69,8 +73,8 @@ public class TestProcedureReplayOrder {
procEnv = new TestProcedureEnv();
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
- procStore.start(24);
- procExecutor.start(1);
+ procStore.start(NUM_THREADS);
+ procExecutor.start(1, true);
}
@After
@@ -81,47 +85,45 @@ public class TestProcedureReplayOrder {
}
@Test(timeout=90000)
- public void testSingleStepReplyOrder() throws Exception {
- // avoid the procedure to be runnable
- procEnv.setAcquireLock(false);
+ public void testSingleStepReplayOrder() throws Exception {
+ final int NUM_PROC_XTHREAD = 32;
+ final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;
// submit the procedures
- submitProcedures(16, 25, TestSingleStepProcedure.class);
+ submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);
+
+ while (procEnv.getExecId() < NUM_PROCS) {
+ Thread.sleep(100);
+ }
// restart the executor and allow the procedures to run
- ProcedureTestingUtility.restart(procExecutor, new Runnable() {
- @Override
- public void run() {
- procEnv.setAcquireLock(true);
- }
- });
+ ProcedureTestingUtility.restart(procExecutor);
// wait the execution of all the procedures and
// assert that the execution order was sorted by procId
ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
- procEnv.assertSortedExecList();
-
- // TODO: FIXME: This should be revisited
+ procEnv.assertSortedExecList(NUM_PROCS);
}
- @Ignore
@Test(timeout=90000)
- public void testMultiStepReplyOrder() throws Exception {
- // avoid the procedure to be runnable
- procEnv.setAcquireLock(false);
+ public void testMultiStepReplayOrder() throws Exception {
+ final int NUM_PROC_XTHREAD = 24;
+ final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);
// submit the procedures
- submitProcedures(16, 10, TestTwoStepProcedure.class);
+ submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);
+
+ while (procEnv.getExecId() < NUM_PROCS) {
+ Thread.sleep(100);
+ }
// restart the executor and allow the procedures to run
- ProcedureTestingUtility.restart(procExecutor, new Runnable() {
- @Override
- public void run() {
- procEnv.setAcquireLock(true);
- }
- });
+ ProcedureTestingUtility.restart(procExecutor);
- fail("TODO: FIXME: NOT IMPLEMENT REPLAY ORDER");
+ // wait the execution of all the procedures and
+ // assert that the execution order was sorted by procId
+ ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
+ procEnv.assertSortedExecList(NUM_PROCS);
}
private void submitProcedures(final int nthreads, final int nprocPerThread,
@@ -153,46 +155,38 @@ public class TestProcedureReplayOrder {
}
private static class TestProcedureEnv {
- private ArrayList<Long> execList = new ArrayList<Long>();
- private boolean acquireLock = true;
-
- public void setAcquireLock(boolean acquireLock) {
- this.acquireLock = acquireLock;
- }
+ private ArrayList<TestProcedure> execList = new ArrayList<TestProcedure>();
+ private AtomicLong execTimestamp = new AtomicLong(0);
- public boolean canAcquireLock() {
- return acquireLock;
+ public long getExecId() {
+ return execTimestamp.get();
}
- public void addToExecList(final Procedure proc) {
- execList.add(proc.getProcId());
+ public long nextExecId() {
+ return execTimestamp.incrementAndGet();
}
- public ArrayList<Long> getExecList() {
- return execList;
+ public void addToExecList(final TestProcedure proc) {
+ execList.add(proc);
}
- public void assertSortedExecList() {
+ public void assertSortedExecList(int numProcs) {
+ assertEquals(numProcs, execList.size());
LOG.debug("EXEC LIST: " + execList);
- for (int i = 1; i < execList.size(); ++i) {
- assertTrue("exec list not sorted: " + execList.get(i-1) + " >= " + execList.get(i),
- execList.get(i-1) < execList.get(i));
+ for (int i = 0; i < execList.size() - 1; ++i) {
+ TestProcedure a = execList.get(i);
+ TestProcedure b = execList.get(i + 1);
+ assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId());
}
}
}
- public static class TestSingleStepProcedure extends SequentialProcedure<TestProcedureEnv> {
- public TestSingleStepProcedure() { }
-
- @Override
- protected Procedure[] execute(TestProcedureEnv env) {
- LOG.debug("execute procedure " + this);
- env.addToExecList(this);
- return null;
- }
+ public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
+ protected long execId = 0;
+ protected int step = 0;
- protected boolean acquireLock(final TestProcedureEnv env) {
- return env.canAcquireLock();
+ public long getExecId() {
+ return execId;
}
@Override
@@ -200,26 +194,62 @@ public class TestProcedureReplayOrder {
@Override
protected boolean abort(TestProcedureEnv env) { return true; }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {
+ StreamUtils.writeLong(stream, execId);
+ }
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException {
+ execId = StreamUtils.readLong(stream);
+ step = 2;
+ }
}
- public static class TestTwoStepProcedure extends SequentialProcedure<TestProcedureEnv> {
- public TestTwoStepProcedure() { }
+ public static class TestSingleStepProcedure extends TestProcedure {
+ public TestSingleStepProcedure() { }
@Override
- protected Procedure[] execute(TestProcedureEnv env) {
- LOG.debug("execute procedure " + this);
- env.addToExecList(this);
- return new Procedure[] { new TestSingleStepProcedure() };
+ protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
+ LOG.trace("execute procedure step=" + step + ": " + this);
+ if (step == 0) {
+ step = 1;
+ execId = env.nextExecId();
+ return new Procedure[] { this };
+ } else if (step == 2) {
+ env.addToExecList(this);
+ return null;
+ }
+ throw new ProcedureYieldException();
}
- protected boolean acquireLock(final TestProcedureEnv env) {
- return true;
+ @Override
+ public String toString() {
+ return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
}
+ }
+
+ public static class TestTwoStepProcedure extends TestProcedure {
+ public TestTwoStepProcedure() { }
@Override
- protected void rollback(TestProcedureEnv env) { }
+ protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
+ LOG.trace("execute procedure step=" + step + ": " + this);
+ if (step == 0) {
+ step = 1;
+ execId = env.nextExecId();
+ return new Procedure[] { new TestSingleStepProcedure() };
+ } else if (step == 2) {
+ env.addToExecList(this);
+ return null;
+ }
+ throw new ProcedureYieldException();
+ }
@Override
- protected boolean abort(TestProcedureEnv env) { return true; }
+ public String toString() {
+ return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 1829d4b..19a9ea4 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -22,6 +22,10 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.HashSet;
import java.util.Set;
@@ -35,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
@@ -83,17 +89,20 @@ public class TestWALProcedureStore {
fs.delete(logDir, true);
}
- private Iterator<Procedure> storeRestart() throws Exception {
+ private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
procStore.stop(false);
procStore.start(PROCEDURE_STORE_SLOTS);
procStore.recoverLease();
- return procStore.load();
+ procStore.load(loader);
}
@Test
public void testEmptyLogLoad() throws Exception {
- Iterator<Procedure> loader = storeRestart();
- assertEquals(0, countProcedures(loader));
+ LoadCounter loader = new LoadCounter();
+ storeRestart(loader);
+ assertEquals(0, loader.getMaxProcId());
+ assertEquals(0, loader.getLoadedCount());
+ assertEquals(0, loader.getCorruptedCount());
}
@Test
@@ -152,8 +161,10 @@ public class TestWALProcedureStore {
assertEquals(1, logs.length);
corruptLog(logs[0], 4);
- int count = countProcedures(storeRestart());
- assertEquals(100, count);
+ LoadCounter loader = new LoadCounter();
+ storeRestart(loader);
+ assertEquals(100, loader.getLoadedCount());
+ assertEquals(0, loader.getCorruptedCount());
}
@Test
@@ -172,10 +183,205 @@ public class TestWALProcedureStore {
assertEquals(1, logs.length);
corruptLog(logs[0], 1823);
- int count = countProcedures(storeRestart());
+ LoadCounter loader = new LoadCounter();
+ storeRestart(loader);
assertTrue(procStore.getCorruptedLogs() != null);
assertEquals(1, procStore.getCorruptedLogs().size());
- assertEquals(85, count);
+ assertEquals(85, loader.getLoadedCount());
+ assertEquals(0, loader.getCorruptedCount());
+ }
+
+ @Test
+ public void testCorruptedProcedures() throws Exception {
+   final int numRoots = 10;
+   final TestProcedure[] rootProcs = new TestProcedure[numRoots];
+
+   // Insert the root procedures (procIds 1..numRoots) and update each once
+   for (int idx = 0; idx < numRoots; idx++) {
+     final TestProcedure root = new TestProcedure(idx + 1, 0);
+     rootProcs[idx] = root;
+     procStore.insert(root, null);
+     root.addStackId(0);
+     procStore.update(root);
+   }
+
+   // After a roll: insert the root-child txn, one child per root
+   procStore.rollWriter();
+   for (int idx = 0; idx < numRoots; idx++) {
+     final TestProcedure child = new TestProcedure(numRoots + idx + 1, idx + 1);
+     rootProcs[idx].addStackId(1);
+     procStore.insert(rootProcs[idx], new Procedure[] { child });
+   }
+
+   // After another roll: insert the child updates
+   procStore.rollWriter();
+   for (int idx = 0; idx < numRoots; idx++) {
+     procStore.update(new TestProcedure(numRoots + idx + 1, idx + 1));
+   }
+
+   // Stop the store
+   procStore.stop(false);
+
+   // List the log files and sort them by file name
+   final FileStatus[] logs = fs.listStatus(logDir);
+   assertEquals(3, logs.length);
+   Arrays.sort(logs, new Comparator<FileStatus>() {
+     @Override
+     public int compare(FileStatus left, FileStatus right) {
+       final String leftName = left.getPath().getName();
+       final String rightName = right.getPath().getName();
+       return leftName.compareTo(rightName);
+     }
+   });
+
+   // Remove the first log, we have insert-txn and updates in the others
+   // so everything is expected to load.
+   fs.delete(logs[0].getPath(), false);
+   final LoadCounter loader = new LoadCounter();
+   storeRestart(loader);
+   assertEquals(numRoots * 2, loader.getLoadedCount());
+   assertEquals(0, loader.getCorruptedCount());
+
+   // Remove the second log, we have lost any root/parent references:
+   // only the children are left and they are reported as corrupted.
+   fs.delete(logs[1].getPath(), false);
+   loader.reset();
+   storeRestart(loader);
+   assertEquals(0, loader.getLoadedCount());
+   assertEquals(numRoots, loader.getCorruptedCount());
+   for (Procedure corruptedProc : loader.getCorrupted()) {
+     final String desc = corruptedProc.toString();
+     final long procId = corruptedProc.getProcId();
+     assertTrue(desc, corruptedProc.getParentProcId() <= numRoots);
+     assertTrue(desc, procId > numRoots && procId <= (numRoots * 2));
+   }
+ }
+
+ @Test(timeout=60000)
+ public void testWalReplayOrder_AB_A() throws Exception {
+   /*
+    * | A B | -> | A |
+    * A (procId 1) is the parent of B (procId 2).
+    */
+   final TestProcedure procA = new TestProcedure(1, 0);
+   final TestProcedure procB = new TestProcedure(2, 1);
+
+   // writes before the roll: A, then the A-B txn and an update of B
+   procStore.insert(procA, null);
+   procA.addStackId(0);
+   procStore.update(procA);
+
+   procStore.insert(procA, new Procedure[] { procB });
+   procB.addStackId(1);
+   procStore.update(procB);
+
+   procStore.rollWriter();
+
+   // writes after the roll: only A is updated
+   procA.addStackId(2);
+   procStore.update(procA);
+
+   final ProcedureStore.ProcedureLoader checker = new ProcedureStore.ProcedureLoader() {
+     @Override
+     public void setMaxProcId(long maxProcId) {
+       assertEquals(2, maxProcId);
+     }
+
+     @Override
+     public void load(ProcedureIterator procIter) throws IOException {
+       // expected load order: A then B
+       for (long expectedProcId : new long[] { 1, 2 }) {
+         assertTrue(procIter.hasNext());
+         assertEquals(expectedProcId, procIter.next().getProcId());
+       }
+       assertFalse(procIter.hasNext());
+     }
+
+     @Override
+     public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+       assertFalse(procIter.hasNext());
+     }
+   };
+   storeRestart(checker);
+ }
+
+ @Test(timeout=60000)
+ public void testWalReplayOrder_ABC_BAD() throws Exception {
+   /*
+    * | A B C | -> | B A D |
+    * A (procId 1) is the parent of B (2), B is the parent of C (3),
+    * D (4) has no parent.
+    */
+   final TestProcedure procA = new TestProcedure(1, 0);
+   final TestProcedure procB = new TestProcedure(2, 1);
+   final TestProcedure procC = new TestProcedure(3, 2);
+   final TestProcedure procD = new TestProcedure(4, 0);
+
+   // writes before the roll
+   procStore.insert(procA, null);
+   procA.addStackId(0);
+   procStore.update(procA);
+
+   procStore.insert(procA, new Procedure[] { procB });
+   procB.addStackId(1);
+   procStore.update(procB);
+
+   procStore.insert(procB, new Procedure[] { procC });
+   procB.addStackId(2);
+   procStore.update(procB);
+
+   procStore.rollWriter();
+
+   // writes after the roll
+   procB.addStackId(3);
+   procStore.update(procB);
+
+   procA.addStackId(4);
+   procStore.update(procA);
+
+   procStore.insert(procD, null);
+   procD.addStackId(0);
+   procStore.update(procD);
+
+   final ProcedureStore.ProcedureLoader checker = new ProcedureStore.ProcedureLoader() {
+     @Override
+     public void setMaxProcId(long maxProcId) {
+       assertEquals(4, maxProcId);
+     }
+
+     @Override
+     public void load(ProcedureIterator procIter) throws IOException {
+       // D comes first
+       assertTrue(procIter.hasNext());
+       assertEquals(4, procIter.next().getProcId());
+       // TODO: This will be multiple call once we do fast-start
+       //assertFalse(procIter.hasNext());
+
+       // then A, B and C
+       for (long expectedProcId : new long[] { 1, 2, 3 }) {
+         assertTrue(procIter.hasNext());
+         assertEquals(expectedProcId, procIter.next().getProcId());
+       }
+       assertFalse(procIter.hasNext());
+     }
+
+     @Override
+     public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+       assertFalse(procIter.hasNext());
+     }
+   };
+   storeRestart(checker);
+ }
+
+ /*
+  * Minimal procedure used to write entries to the store: execution,
+  * rollback and state serialization do nothing.
+  */
+ public static class TestProcedure extends Procedure<Void> {
+   public TestProcedure() {
+   }
+
+   /* Sets the procId and, only when parentId is greater than zero, the parent id. */
+   public TestProcedure(long procId, long parentId) {
+     setProcId(procId);
+     final boolean hasParent = parentId > 0;
+     if (hasParent) {
+       setParentProcId(parentId);
+     }
+   }
+
+   /* Forwards the index to addStackIndex(). */
+   public void addStackId(final int index) {
+     addStackIndex(index);
+   }
+
+   @Override
+   protected Procedure[] execute(Void env) {
+     // nothing to run, no children
+     return null;
+   }
+
+   @Override
+   protected void rollback(Void env) {
+     // nothing to undo
+   }
+
+   @Override
+   protected boolean abort(Void env) {
+     return false;
+   }
+
+   @Override
+   protected void serializeStateData(final OutputStream stream) throws IOException {
+     // no state to write
+   }
+
+   @Override
+   protected void deserializeStateData(final InputStream stream) throws IOException {
+     // no state to read
+   }
}
private void corruptLog(final FileStatus logFile, final long dropBytes)
@@ -191,29 +397,11 @@ public class TestWALProcedureStore {
}
+ /**
+ * Calls storeRestart() with a LoadCounter and verifies that exactly the
+ * expected procedures were loaded and that none was reported as corrupted.
+ *
+ * @param procIds ids of the procedures expected to be loaded
+ * @throws Exception if storeRestart() fails
+ */
 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
- int count = 0;
- Iterator<Procedure> loader = storeRestart();
- while (loader.hasNext()) {
- Procedure proc = loader.next();
- LOG.debug("loading procId=" + proc.getProcId());
- assertTrue("procId=" + proc.getProcId() + " unexpected", procIds.contains(proc.getProcId()));
- count++;
- }
- assertEquals(procIds.size(), count);
- }
-
- private void assertIsEmpty(Iterator<Procedure> iterator) {
- assertEquals(0, countProcedures(iterator));
- }
-
- private int countProcedures(Iterator<Procedure> iterator) {
- int count = 0;
- while (iterator.hasNext()) {
- Procedure proc = iterator.next();
- LOG.trace("loading procId=" + proc.getProcId());
- count++;
- }
- return count;
+ LOG.debug("expected: " + procIds);
+ // Hand the expected ids to the loader so that load() rejects any procedure
+ // that is not in the set; comparing sizes alone would not notice a wrong id.
+ LoadCounter loader = new LoadCounter(procIds);
+ storeRestart(loader);
+ assertEquals(procIds.size(), loader.getLoadedCount());
+ assertEquals(0, loader.getCorruptedCount());
 }
private void assertEmptyLogDir() {
@@ -263,4 +451,78 @@ public class TestWALProcedureStore {
}
}
}
+
+ /**
+ * ProcedureLoader that records what the store hands to it: the procedures
+ * passed to load(), the procedures passed to handleCorrupted() and the value
+ * passed to setMaxProcId(). When a set of expected ids is supplied, load()
+ * also asserts that every loaded procedure id belongs to that set.
+ */
+ private class LoadCounter implements ProcedureStore.ProcedureLoader {
+ private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
+ private final ArrayList<Procedure> loaded = new ArrayList<Procedure>();
+
+ // Expected proc ids; null disables the membership check in load().
+ private Set<Long> procIds;
+ private long maxProcId = 0;
+
+ /** Creates a counter that accepts any procedure id. */
+ public LoadCounter() {
+ this(null);
+ }
+
+ /**
+ * @param procIds ids that load() is allowed to see, or null to accept any id
+ */
+ public LoadCounter(final Set<Long> procIds) {
+ this.procIds = procIds;
+ }
+
+ /** Clears the recorded state and accepts any procedure id afterwards. */
+ public void reset() {
+ reset(null);
+ }
+
+ /**
+ * Clears both recorded lists, zeroes the max proc id and replaces the set
+ * of expected ids.
+ * @param procIds ids that load() is allowed to see, or null to accept any id
+ */
+ public void reset(final Set<Long> procIds) {
+ corrupted.clear();
+ loaded.clear();
+ this.procIds = procIds;
+ this.maxProcId = 0;
+ }
+
+ /** @return the last value passed to setMaxProcId(), or 0 if never set */
+ public long getMaxProcId() {
+ return maxProcId;
+ }
+
+ /** @return the procedures received through load(), in arrival order */
+ public ArrayList<Procedure> getLoaded() {
+ return loaded;
+ }
+
+ /** @return the number of procedures received through load() */
+ public int getLoadedCount() {
+ return loaded.size();
+ }
+
+ /** @return the procedures received through handleCorrupted(), in arrival order */
+ public ArrayList<Procedure> getCorrupted() {
+ return corrupted;
+ }
+
+ /** @return the number of procedures received through handleCorrupted() */
+ public int getCorruptedCount() {
+ return corrupted.size();
+ }
+
+ @Override
+ public void setMaxProcId(long maxProcId) {
+ // The parameter shadows the field: without "this." the statement assigns
+ // the parameter to itself and getMaxProcId() would always return 0.
+ this.maxProcId = maxProcId;
+ }
+
+ @Override
+ public void load(ProcedureIterator procIter) throws IOException {
+ while (procIter.hasNext()) {
+ Procedure proc = procIter.next();
+ LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
+ if (procIds != null) {
+ assertTrue("procId=" + proc.getProcId() + " unexpected",
+ procIds.contains(proc.getProcId()));
+ }
+ loaded.add(proc);
+ }
+ }
+
+ @Override
+ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+ while (procIter.hasNext()) {
+ Procedure proc = procIter.next();
+ LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
+ corrupted.add(proc);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2b180ad..7c99abb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1131,8 +1131,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
Math.max(Runtime.getRuntime().availableProcessors(),
MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
+ final boolean abortOnCorruption = conf.getBoolean(
+ MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
+ MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
procedureStore.start(numThreads);
- procedureExecutor.start(numThreads);
+ procedureExecutor.start(numThreads, abortOnCorruption);
}
private void stopProcedureExecutor() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
index 90ed4ee..c21137d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
@@ -24,8 +24,21 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public final class MasterProcedureConstants {
 private MasterProcedureConstants() {}
+ /** Used to construct the name of the log directory for master procedures */
 public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
+ /**
+ * Configuration key for the number of threads used by the procedure executor.
+ * When the key is not set, HMaster uses the larger of the number of available
+ * processors and DEFAULT_MIN_MASTER_PROCEDURE_THREADS.
+ */
 public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
 public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4;
+
+ /**
+ * Configuration key for the procedure replay sanity check. In case a WAL is
+ * missing or unreadable we may lose information about pending/running
+ * procedures.
+ * Set this to true if you want the Master to fail on load when a corrupted
+ * procedure is encountered.
+ * (Default is off, because we prefer having the Master up and running and
+ * fix the "in transition" state "by hand")
+ */
+ public static final String EXECUTOR_ABORT_ON_CORRUPTION = "hbase.procedure.abort.on.corruption";
+ public static final boolean DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION = false;
 }
http://git-wip-us.apache.org/repos/asf/hbase/blob/24ef755f/hbase-server/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml
index 8c8312c..34a1b20 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -147,4 +147,11 @@
<description>Skip sanity checks in tests
</description>
</property>
+ <!--
+ The name below must match MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION
+ ("hbase.procedure.abort.on.corruption"), the key HMaster reads before starting
+ the procedure executor; a different name leaves the check at its default (off).
+ -->
+ <property>
+ <name>hbase.procedure.abort.on.corruption</name>
+ <value>true</value>
+ <description>
+ Enable replay sanity checks on procedure tests.
+ </description>
+ </property>
</configuration>