You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/10/22 01:33:33 UTC
hbase git commit: Revert "HBASE-21336 Simplify the implementation of
WALProcedureMap"
Repository: hbase
Updated Branches:
refs/heads/master dd474ef19 -> 7d7293049
Revert "HBASE-21336 Simplify the implementation of WALProcedureMap"
This reverts commit 7adf590106826b9e4432cfeee06acdc0ccff8c6e.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7d729304
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7d729304
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7d729304
Branch: refs/heads/master
Commit: 7d7293049ac5268d40eb2de4d6564c6bc9d18a39
Parents: dd474ef
Author: Duo Zhang <zh...@apache.org>
Authored: Mon Oct 22 09:32:55 2018 +0800
Committer: Duo Zhang <zh...@apache.org>
Committed: Mon Oct 22 09:32:55 2018 +0800
----------------------------------------------------------------------
.../hbase/procedure2/ProcedureExecutor.java | 102 ++--
.../hbase/procedure2/store/ProcedureStore.java | 6 -
.../store/wal/ProcedureWALFormatReader.java | 93 ++-
.../procedure2/store/wal/WALProcedureMap.java | 591 +++++++++++++++++--
.../procedure2/store/wal/WALProcedureTree.java | 299 ----------
.../store/wal/TestStressWALProcedureStore.java | 2 +
.../store/wal/TestWALProcedureStore.java | 164 ++++-
.../store/wal/TestWALProcedureTree.java | 173 ------
.../hadoop/hbase/HBaseTestingUtility.java | 1 -
9 files changed, 804 insertions(+), 627 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7d729304/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 91a305b..0e2d9b8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -505,10 +505,8 @@ public class ProcedureExecutor<TEnvironment> {
private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
throws IOException {
// 1. Build the rollback stack
- int runnableCount = 0;
+ int runnablesCount = 0;
int failedCount = 0;
- int waitingCount = 0;
- int waitingTimeoutCount = 0;
while (procIter.hasNext()) {
boolean finished = procIter.isNextFinished();
@SuppressWarnings("unchecked")
@@ -528,21 +526,11 @@ public class ProcedureExecutor<TEnvironment> {
// add the procedure to the map
proc.beforeReplay(getEnvironment());
procedures.put(proc.getProcId(), proc);
- switch (proc.getState()) {
- case RUNNABLE:
- runnableCount++;
- break;
- case FAILED:
- failedCount++;
- break;
- case WAITING:
- waitingCount++;
- break;
- case WAITING_TIMEOUT:
- waitingTimeoutCount++;
- break;
- default:
- break;
+
+ if (proc.getState() == ProcedureState.RUNNABLE) {
+ runnablesCount++;
+ } else if (proc.getState() == ProcedureState.FAILED) {
+ failedCount++;
}
}
@@ -563,10 +551,9 @@ public class ProcedureExecutor<TEnvironment> {
// have been polled out already, so when loading we can not add the procedure to scheduler first
// and then call acquireLock, since the procedure is still in the queue, and since we will
// remove the queue from runQueue, then no one can poll it out, then there is a dead lock
- List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
+ List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnablesCount);
List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
- List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
- List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
+ Set<Procedure<TEnvironment>> waitingSet = null;
procIter.reset();
while (procIter.hasNext()) {
if (procIter.isNextFinished()) {
@@ -604,10 +591,26 @@ public class ProcedureExecutor<TEnvironment> {
runnableList.add(proc);
break;
case WAITING:
- waitingList.add(proc);
+ if (!proc.hasChildren()) {
+ // Normally, WAITING procedures should be waken by its children.
+ // But, there is a case that, all the children are successful and before
+ // they can wake up their parent procedure, the master was killed.
+ // So, during recovering the procedures from ProcedureWal, its children
+ // are not loaded because of their SUCCESS state.
+ // So we need to continue to run this WAITING procedure. But before
+ // executing, we need to set its state to RUNNABLE, otherwise an exception
+ // will be thrown:
+ // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
+ // "NOT RUNNABLE! " + procedure.toString());
+ proc.setState(ProcedureState.RUNNABLE);
+ runnableList.add(proc);
+ }
break;
case WAITING_TIMEOUT:
- waitingTimeoutList.add(proc);
+ if (waitingSet == null) {
+ waitingSet = new HashSet<>();
+ }
+ waitingSet.add(proc);
break;
case FAILED:
failedList.add(proc);
@@ -622,32 +625,39 @@ public class ProcedureExecutor<TEnvironment> {
}
}
- // 4. Check the waiting procedures to see if some of them can be added to runnable.
- waitingList.forEach(proc -> {
- if (!proc.hasChildren()) {
- // Normally, WAITING procedures should be waken by its children.
- // But, there is a case that, all the children are successful and before
- // they can wake up their parent procedure, the master was killed.
- // So, during recovering the procedures from ProcedureWal, its children
- // are not loaded because of their SUCCESS state.
- // So we need to continue to run this WAITING procedure. But before
- // executing, we need to set its state to RUNNABLE, otherwise, a exception
- // will throw:
- // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
- // "NOT RUNNABLE! " + procedure.toString());
- proc.setState(ProcedureState.RUNNABLE);
- runnableList.add(proc);
+ // 3. Validate the stacks
+ int corruptedCount = 0;
+ Iterator<Map.Entry<Long, RootProcedureState<TEnvironment>>> itStack =
+ rollbackStack.entrySet().iterator();
+ while (itStack.hasNext()) {
+ Map.Entry<Long, RootProcedureState<TEnvironment>> entry = itStack.next();
+ RootProcedureState<TEnvironment> procStack = entry.getValue();
+ if (procStack.isValid()) continue;
+
+ for (Procedure<TEnvironment> proc : procStack.getSubproceduresStack()) {
+ LOG.error("Corrupted " + proc);
+ procedures.remove(proc.getProcId());
+ runnableList.remove(proc);
+ if (waitingSet != null) waitingSet.remove(proc);
+ corruptedCount++;
}
- });
+ itStack.remove();
+ }
- // 5. Push the procedures to the timeout executor
- waitingTimeoutList.forEach(proc -> {
- proc.afterReplay(getEnvironment());
- timeoutExecutor.add(proc);
- });
- // 6. restore locks
+ if (abortOnCorruption && corruptedCount > 0) {
+ throw new IOException("found " + corruptedCount + " procedures on replay");
+ }
+
+ // 4. Push the procedures to the timeout executor
+ if (waitingSet != null && !waitingSet.isEmpty()) {
+ for (Procedure<TEnvironment> proc: waitingSet) {
+ proc.afterReplay(getEnvironment());
+ timeoutExecutor.add(proc);
+ }
+ }
+ // 5. restore locks
restoreLocks();
- // 7. Push the procedure to the scheduler
+ // 6. Push the procedure to the scheduler
failedList.forEach(scheduler::addBack);
runnableList.forEach(p -> {
p.afterReplay(getEnvironment());
http://git-wip-us.apache.org/repos/asf/hbase/blob/7d729304/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index d737a7a..0599acf 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -85,18 +85,12 @@ public interface ProcedureStore {
boolean hasNext();
/**
- * Calling this method does not need to converting the protobuf message to the Procedure class,
- * so if it returns true we can call {@link #skipNext()} to skip the procedure without
- * deserializing. This could increase the performance.
* @return true if the iterator next element is a completed procedure.
*/
boolean isNextFinished();
/**
* Skip the next procedure
- * <p/>
- * This method is used to skip the deserializing of the procedure to increase performance, as
- * when calling next we need to convert the protobuf message to the Procedure class.
*/
void skipNext();
http://git-wip-us.apache.org/repos/asf/hbase/blob/7d729304/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 2e1e06c..1ac8e01 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -30,25 +31,70 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
/**
- * Helper class that loads the procedures stored in a WAL.
+ * Helper class that loads the procedures stored in a WAL
*/
@InterfaceAudience.Private
public class ProcedureWALFormatReader {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
- /**
- * We will use the localProcedureMap to track the active procedures for the current proc wal file,
- * and when we finished reading one proc wal file, we will merge the localProcedureMap to the
- * procedureMap, which tracks the global active procedures.
- * <p/>
- * See the comments of {@link WALProcedureMap} for more details.
- * <p/>
- * After reading all the proc wal files, we will use the procedures in the procedureMap to build a
- * {@link WALProcedureTree}, and then give the result to the upper layer. See the comments of
- * {@link WALProcedureTree} and the code in {@link #finish()} for more details.
- */
- private final WALProcedureMap localProcedureMap = new WALProcedureMap();
- private final WALProcedureMap procedureMap = new WALProcedureMap();
+ // ==============================================================================================
+ // We read the WALs in reverse order from the newest to the oldest.
+ // We have different entry types:
+ // - INIT: Procedure submitted by the user (also known as 'root procedure')
+ // - INSERT: Children added to the procedure <parentId>:[<childId>, ...]
+ // - UPDATE: The specified procedure was updated
+ // - DELETE: The procedure was removed (finished/rolledback and result TTL expired)
+ //
+ // In the WAL we can find multiple times the same procedure as UPDATE or INSERT.
+ // We read the WAL from top to bottom, so every time we find an entry of the
+ // same procedure, that will be the "latest" update (Caveat: with multiple threads writing
+ // the store, this assumption does not hold).
+ //
+ // We keep two in-memory maps:
+ // - localProcedureMap: is the map containing the entries in the WAL we are processing
+ // - procedureMap: is the map containing all the procedures we found up to the WAL in process.
+ // localProcedureMap is merged with the procedureMap once we reach the WAL EOF.
+ //
+ // Since we are reading the WALs in reverse order (newest to oldest),
+ // if we find an entry related to a procedure we already have in 'procedureMap' we can discard it.
+ //
+ // The WAL is append-only so the last procedure in the WAL is the one that
+ // was in execution at the time we crashed/closed the server.
+ // Given that, the procedure replay order can be inferred by the WAL order.
+ //
+ // Example:
+ // WAL-2: [A, B, A, C, D]
+ // WAL-1: [F, G, A, F, B]
+ // Replay-Order: [D, C, A, B, F, G]
+ //
+ // The "localProcedureMap" keeps a "replayOrder" list. Every time we add the
+ // record to the map that record is moved to the head of the "replayOrder" list.
+ // Using the example above:
+ // WAL-2 localProcedureMap.replayOrder is [D, C, A, B]
+ // WAL-1 localProcedureMap.replayOrder is [F, G]
+ //
+ // Each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap'
+ // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order.
+ //
+ // Fast Start: INIT/INSERT record and StackIDs
+ // ---------------------------------------------
+ // We have two special records, INIT and INSERT, that track the first time
+ // the procedure was added to the WAL. We can use this information to be able
+ // to start procedures before reaching the end of the WAL, or before reading all WALs.
+ // But in some cases, the WAL with that record can be already gone.
+ // As an alternative, we can use the stackIds on each procedure,
+ // to identify when a procedure is ready to start.
+ // If there are gaps in the sum of the stackIds we need to read more WALs.
+ //
+ // Example (all procs child of A):
+ // WAL-2: [A, B] A stackIds = [0, 4], B stackIds = [1, 5]
+ // WAL-1: [A, B, C, D]
+ //
+ // In the case above we need to read one more WAL to be able to consider
+ // the root procedure A and all children as ready.
+ // ==============================================================================================
+ private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024);
+ private final WALProcedureMap procedureMap = new WALProcedureMap(1024);
private final ProcedureWALFormat.Loader loader;
@@ -132,7 +178,7 @@ public class ProcedureWALFormatReader {
localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
localProcedureMap.getMaxModifiedProcId());
}
- procedureMap.merge(localProcedureMap);
+ procedureMap.mergeTail(localProcedureMap);
}
if (localTracker.isPartial()) {
localTracker.setPartialFlag(false);
@@ -143,11 +189,18 @@ public class ProcedureWALFormatReader {
// notify the loader about the max proc ID
loader.setMaxProcId(maxProcId);
- // build the procedure execution tree. When building we will verify that whether a procedure is
- // valid.
- WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures());
- loader.load(tree.getValidProcs());
- loader.handleCorrupted(tree.getCorruptedProcs());
+ // fetch the procedure ready to run.
+ ProcedureIterator procIter = procedureMap.fetchReady();
+ if (procIter != null) {
+ loader.load(procIter);
+ }
+
+ // remaining procedures have missing link or dependencies
+ // consider them as corrupted, manual fix is probably required.
+ procIter = procedureMap.fetchAll();
+ if (procIter != null) {
+ loader.handleCorrupted(procIter);
+ }
}
private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7d729304/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
index 9cda1bc..18d7823 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
@@ -17,50 +17,193 @@
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.yetus.audience.InterfaceAudience;
+import java.io.IOException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
- * This class is used to track the active procedures when loading procedures from proc wal file.
- * <p/>
- * We will read proc wal files from new to old, but when reading a proc wal file, we will still read
- * from top to bottom, so there are two groups of methods for this class.
- * <p/>
- * The first group is {@link #add(ProcedureProtos.Procedure)} and {@link #remove(long)}. It is used
- * when reading a proc wal file. In these methods, for the same procedure, typically the one comes
- * later should win, please see the comment for
- * {@link #isIncreasing(ProcedureProtos.Procedure, ProcedureProtos.Procedure)} to see the
- * exceptions.
- * <p/>
- * The second group is {@link #merge(WALProcedureMap)}. We will have a global
- * {@link WALProcedureMap} to hold global the active procedures, and a local {@link WALProcedureMap}
- * to hold the active procedures for the current proc wal file. And when we finish reading a proc
- * wal file, we will merge the local one into the global one, by calling the
- * {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this
- * method, for the same procedure, the one comes earlier will win, as we read the proc wal files
- * from new to old(the reverse order).
+ * We keep an in-memory map of the procedures sorted by replay order. (see the details in the
+ * beginning of {@link ProcedureWALFormatReader}).
+ *
+ * <pre>
+ * procedureMap = | A | | E | | C | | | | | G | | |
+ * D B
+ * replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
+ *
+ * We also have a lazy grouping by "root procedure", and a list of
+ * unlinked procedures. If after reading all the WALs we have unlinked
+ * procedures it means that we had a missing WAL or a corruption.
+ * rootHead = A <-> D <-> G
+ * B E
+ * C
+ * unlinkFromLinkList = None
+ * </pre>
*/
-@InterfaceAudience.Private
class WALProcedureMap {
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class);
- private final Map<Long, ProcedureProtos.Procedure> procMap = new HashMap<>();
+ private static class Entry {
+ // For bucketed linked lists in hash-table.
+ private Entry hashNext;
+ // child head
+ private Entry childHead;
+ // double-link for rootHead or childHead
+ private Entry linkNext;
+ private Entry linkPrev;
+ // replay double-linked-list
+ private Entry replayNext;
+ private Entry replayPrev;
+ // procedure-infos
+ private Procedure<?> procedure;
+ private ProcedureProtos.Procedure proto;
+ private boolean ready = false;
- private long minModifiedProcId = Long.MAX_VALUE;
+ public Entry(Entry hashNext) {
+ this.hashNext = hashNext;
+ }
+
+ public long getProcId() {
+ return proto.getProcId();
+ }
+
+ public long getParentId() {
+ return proto.getParentId();
+ }
+
+ public boolean hasParent() {
+ return proto.hasParentId();
+ }
+
+ public boolean isReady() {
+ return ready;
+ }
+
+ public boolean isFinished() {
+ if (!hasParent()) {
+ // we only consider 'root' procedures. because for the user 'finished'
+ // means when everything up to the 'root' is finished.
+ switch (proto.getState()) {
+ case ROLLEDBACK:
+ case SUCCESS:
+ return true;
+ default:
+ break;
+ }
+ }
+ return false;
+ }
+
+ public Procedure<?> convert() throws IOException {
+ if (procedure == null) {
+ procedure = ProcedureUtil.convertToProcedure(proto);
+ }
+ return procedure;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("Entry(");
+ sb.append(getProcId());
+ sb.append(", parentId=");
+ sb.append(getParentId());
+ sb.append(", class=");
+ sb.append(proto.getClassName());
+ sb.append(")");
+ return sb.toString();
+ }
+ }
+
+ private static class EntryIterator implements ProcedureIterator {
+ private final Entry replayHead;
+ private Entry current;
+
+ public EntryIterator(Entry replayHead) {
+ this.replayHead = replayHead;
+ this.current = replayHead;
+ }
+
+ @Override
+ public void reset() {
+ this.current = replayHead;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ @Override
+ public boolean isNextFinished() {
+ return current != null && current.isFinished();
+ }
+
+ @Override
+ public void skipNext() {
+ current = current.replayNext;
+ }
+
+ @Override
+ public Procedure<?> next() throws IOException {
+ try {
+ return current.convert();
+ } finally {
+ current = current.replayNext;
+ }
+ }
+ }
+
+ // procedure hash table
+ private Entry[] procedureMap;
+ // replay-order double-linked-list
+ private Entry replayOrderHead;
+ private Entry replayOrderTail;
+
+ // root linked-list
+ private Entry rootHead;
+
+ // pending unlinked children (root not present yet)
+ private Entry childUnlinkedHead;
+
+ // Track ProcId range
+ private long minModifiedProcId = Long.MAX_VALUE;
private long maxModifiedProcId = Long.MIN_VALUE;
- private void trackProcId(long procId) {
- minModifiedProcId = Math.min(minModifiedProcId, procId);
- maxModifiedProcId = Math.max(maxModifiedProcId, procId);
+ public WALProcedureMap(int size) {
+ procedureMap = new Entry[size];
+ replayOrderHead = null;
+ replayOrderTail = null;
+ rootHead = null;
+ childUnlinkedHead = null;
+ }
+
+ public void add(ProcedureProtos.Procedure procProto) {
+ trackProcIds(procProto.getProcId());
+ Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
+ boolean newEntry = entry.proto == null;
+ // We have seen procedure WALs where the entries are out of order; see HBASE-18152.
+ // To compensate, only replace the Entry procedure if for sure this new procedure
+ // is indeed an entry that came later.
+ // TODO: Fix the writing of procedure info so it does not violate basic expectation, that WALs
+ // contain procedure changes going from start to finish in sequence.
+ if (newEntry || isIncreasing(entry.proto, procProto)) {
+ entry.proto = procProto;
+ }
+ addToReplayList(entry);
+ if (newEntry) {
+ if (procProto.hasParentId()) {
+ childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
+ } else {
+ rootHead = addToLinkList(entry, rootHead);
+ }
+ }
}
/**
@@ -82,51 +225,383 @@ class WALProcedureMap {
return increasing;
}
- public void add(ProcedureProtos.Procedure proc) {
- procMap.compute(proc.getProcId(), (procId, existingProc) -> {
- if (existingProc == null || isIncreasing(existingProc, proc)) {
- return proc;
- } else {
- return existingProc;
- }
- });
- trackProcId(proc.getProcId());
+ public boolean remove(long procId) {
+ trackProcIds(procId);
+ Entry entry = removeFromMap(procId);
+ if (entry != null) {
+ unlinkFromReplayList(entry);
+ unlinkFromLinkList(entry);
+ return true;
+ }
+ return false;
}
- public void remove(long procId) {
- procMap.remove(procId);
+ private void trackProcIds(long procId) {
+ minModifiedProcId = Math.min(minModifiedProcId, procId);
+ maxModifiedProcId = Math.max(maxModifiedProcId, procId);
}
- public boolean isEmpty() {
- return procMap.isEmpty();
+ public long getMinModifiedProcId() {
+ return minModifiedProcId;
+ }
+
+ public long getMaxModifiedProcId() {
+ return maxModifiedProcId;
}
public boolean contains(long procId) {
- return procMap.containsKey(procId);
+ return getProcedure(procId) != null;
}
- /**
- * Merge the given {@link WALProcedureMap} into this one. The {@link WALProcedureMap} passed in
- * will be cleared after merging.
+ public boolean isEmpty() {
+ return replayOrderHead == null;
+ }
+
+ public void clear() {
+ for (int i = 0; i < procedureMap.length; ++i) {
+ procedureMap[i] = null;
+ }
+ replayOrderHead = null;
+ replayOrderTail = null;
+ rootHead = null;
+ childUnlinkedHead = null;
+ minModifiedProcId = Long.MAX_VALUE;
+ maxModifiedProcId = Long.MIN_VALUE;
+ }
+
+ /*
+ * Merges two WalProcedureMap, the target is the "global" map, the source is the "local" map. -
+ * The entries in the hashtables are guaranteed to be unique. On replay we don't load procedures
+ * that already exist in the "global" map (the one we are merging the "local" in to). - The
+ * replayOrderList of the "local" map will be appended to the "global" map replay list. - The
+ * "local" map will be cleared at the end of the operation.
*/
- public void merge(WALProcedureMap other) {
- other.procMap.forEach(procMap::putIfAbsent);
+ public void mergeTail(WALProcedureMap other) {
+ for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
+ int slotIndex = getMapSlot(p.getProcId());
+ p.hashNext = procedureMap[slotIndex];
+ procedureMap[slotIndex] = p;
+ }
+
+ if (replayOrderHead == null) {
+ replayOrderHead = other.replayOrderHead;
+ replayOrderTail = other.replayOrderTail;
+ rootHead = other.rootHead;
+ childUnlinkedHead = other.childUnlinkedHead;
+ } else {
+ // append replay list
+ assert replayOrderTail.replayNext == null;
+ assert other.replayOrderHead.replayPrev == null;
+ replayOrderTail.replayNext = other.replayOrderHead;
+ other.replayOrderHead.replayPrev = replayOrderTail;
+ replayOrderTail = other.replayOrderTail;
+
+ // merge rootHead
+ if (rootHead == null) {
+ rootHead = other.rootHead;
+ } else if (other.rootHead != null) {
+ Entry otherTail = findLinkListTail(other.rootHead);
+ otherTail.linkNext = rootHead;
+ rootHead.linkPrev = otherTail;
+ rootHead = other.rootHead;
+ }
+
+ // merge childUnlinkedHead
+ if (childUnlinkedHead == null) {
+ childUnlinkedHead = other.childUnlinkedHead;
+ } else if (other.childUnlinkedHead != null) {
+ Entry otherTail = findLinkListTail(other.childUnlinkedHead);
+ otherTail.linkNext = childUnlinkedHead;
+ childUnlinkedHead.linkPrev = otherTail;
+ childUnlinkedHead = other.childUnlinkedHead;
+ }
+ }
maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId);
minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId);
- other.procMap.clear();
- other.maxModifiedProcId = Long.MIN_VALUE;
- other.minModifiedProcId = Long.MAX_VALUE;
+
+ other.clear();
}
- public Collection<ProcedureProtos.Procedure> getProcedures() {
- return Collections.unmodifiableCollection(procMap.values());
+ /**
+ * Returns an EntryIterator with the list of procedures ready to be added to the executor. A
+ * Procedure is ready if its children and parent are ready.
+ */
+ public ProcedureIterator fetchReady() {
+ buildGraph();
+
+ Entry readyHead = null;
+ Entry readyTail = null;
+ Entry p = replayOrderHead;
+ while (p != null) {
+ Entry next = p.replayNext;
+ if (p.isReady()) {
+ unlinkFromReplayList(p);
+ if (readyTail != null) {
+ readyTail.replayNext = p;
+ p.replayPrev = readyTail;
+ } else {
+ p.replayPrev = null;
+ readyHead = p;
+ }
+ readyTail = p;
+ p.replayNext = null;
+ }
+ p = next;
+ }
+ // we need the hash-table lookups for parents, so this must be done
+ // out of the loop where we check isReadyToRun()
+ for (p = readyHead; p != null; p = p.replayNext) {
+ removeFromMap(p.getProcId());
+ unlinkFromLinkList(p);
+ }
+ return readyHead != null ? new EntryIterator(readyHead) : null;
}
- public long getMinModifiedProcId() {
- return minModifiedProcId;
+ /**
+ * Drain this map and return all procedures in it.
+ */
+ public ProcedureIterator fetchAll() {
+ Entry head = replayOrderHead;
+ for (Entry p = head; p != null; p = p.replayNext) {
+ removeFromMap(p.getProcId());
+ }
+ for (int i = 0; i < procedureMap.length; ++i) {
+ assert procedureMap[i] == null : "map not empty i=" + i;
+ }
+ replayOrderHead = null;
+ replayOrderTail = null;
+ childUnlinkedHead = null;
+ rootHead = null;
+ return head != null ? new EntryIterator(head) : null;
}
- public long getMaxModifiedProcId() {
- return maxModifiedProcId;
+ private void buildGraph() {
+ Entry p = childUnlinkedHead;
+ while (p != null) {
+ Entry next = p.linkNext;
+ Entry rootProc = getRootProcedure(p);
+ if (rootProc != null) {
+ rootProc.childHead = addToLinkList(p, rootProc.childHead);
+ }
+ p = next;
+ }
+
+ for (p = rootHead; p != null; p = p.linkNext) {
+ checkReadyToRun(p);
+ }
+ }
+
+ private Entry getRootProcedure(Entry entry) {
+ while (entry != null && entry.hasParent()) {
+ entry = getProcedure(entry.getParentId());
+ }
+ return entry;
+ }
+
+ /**
+ * (see the comprehensive explanation in the beginning of {@link ProcedureWALFormatReader}). A
+ * Procedure is ready when parent and children are ready. "ready" means that we have all the
+ * information that we need in-memory.
+ * <p/>
+ * Example-1:<br/>
+ * We have two WALs, we start reading from the newest (wal-2)
+ *
+ * <pre>
+ * wal-2 | C B |
+ * wal-1 | A B C |
+ * </pre>
+ *
+ * If C and B don't depend on A (A is not the parent), we can start them before reading wal-1. If
+ * B is the only one with parent A we can start C. We have to read one more WAL before being able
+ * to start B.
+ * <p/>
+ * How do we know, with only the information in B, that we are not ready?
+ * <ul>
+ * <li>easy case, the parent is missing from the global map</li>
+ * <li>more complex case we look at the Stack IDs.</li>
+ * </ul>
+ * The Stack-IDs are added to the procedure order as an incremental index tracking how many times
+ * that procedure was executed, which is equivalent to the number of times we wrote the procedure
+ * to the WAL. <br/>
+ * In the example above:
+ *
+ * <pre>
+ * wal-2: B has stackId = [1, 2]
+ * wal-1: B has stackId = [1]
+ * wal-1: A has stackId = [0]
+ * </pre>
+ *
+ * Since we know that the Stack-IDs are incremental for a Procedure, we notice that there is a gap
+ * in the stackIds of B, so something was executed before.
+ * <p/>
+ * To identify when a Procedure is ready we do the sum of the stackIds of the procedure and the
+ * parent. if the stackIdSum is equal to the sum of {1..maxStackId} then everything we need is
+ * available.
+ * <p/>
+ * Example-2
+ *
+ * <pre>
+ * wal-2 | A | A stackIds = [0, 2]
+ * wal-1 | A B | B stackIds = [1]
+ * </pre>
+ *
+ * There is a gap between A stackIds so something was executed in between.
+ */
+ private boolean checkReadyToRun(Entry rootEntry) {
+ assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
+
+ if (rootEntry.isFinished()) {
+ // If the root procedure is finished, sub-procedures should be gone
+ if (rootEntry.childHead != null) {
+ LOG.error("unexpected active children for root-procedure: {}", rootEntry);
+ for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
+ LOG.error("unexpected active children: {}", p);
+ }
+ }
+
+ assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry;
+ rootEntry.ready = true;
+ return true;
+ }
+
+ int stackIdSum = 0;
+ int maxStackId = 0;
+ for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
+ int stackId = 1 + rootEntry.proto.getStackId(i);
+ maxStackId = Math.max(maxStackId, stackId);
+ stackIdSum += stackId;
+ LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId,
+ rootEntry);
+ }
+
+ for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
+ for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
+ int stackId = 1 + p.proto.getStackId(i);
+ maxStackId = Math.max(maxStackId, stackId);
+ stackIdSum += stackId;
+ LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, p);
+ }
+ }
+ // The cmpStackIdSum is this formula for finding the sum of a series of numbers:
+ // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg
+ final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
+ if (cmpStackIdSum == stackIdSum) {
+ rootEntry.ready = true;
+ for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
+ p.ready = true;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private void unlinkFromReplayList(Entry entry) {
+ if (replayOrderHead == entry) {
+ replayOrderHead = entry.replayNext;
+ }
+ if (replayOrderTail == entry) {
+ replayOrderTail = entry.replayPrev;
+ }
+ if (entry.replayPrev != null) {
+ entry.replayPrev.replayNext = entry.replayNext;
+ }
+ if (entry.replayNext != null) {
+ entry.replayNext.replayPrev = entry.replayPrev;
+ }
+ }
+
+ private void addToReplayList(final Entry entry) {
+ unlinkFromReplayList(entry);
+ entry.replayNext = replayOrderHead;
+ entry.replayPrev = null;
+ if (replayOrderHead != null) {
+ replayOrderHead.replayPrev = entry;
+ } else {
+ replayOrderTail = entry;
+ }
+ replayOrderHead = entry;
+ }
+
+ private void unlinkFromLinkList(Entry entry) {
+ if (entry == rootHead) {
+ rootHead = entry.linkNext;
+ } else if (entry == childUnlinkedHead) {
+ childUnlinkedHead = entry.linkNext;
+ }
+ if (entry.linkPrev != null) {
+ entry.linkPrev.linkNext = entry.linkNext;
+ }
+ if (entry.linkNext != null) {
+ entry.linkNext.linkPrev = entry.linkPrev;
+ }
+ }
+
+ private Entry addToLinkList(Entry entry, Entry linkHead) {
+ unlinkFromLinkList(entry);
+ entry.linkNext = linkHead;
+ entry.linkPrev = null;
+ if (linkHead != null) {
+ linkHead.linkPrev = entry;
+ }
+ return entry;
+ }
+
+ private Entry findLinkListTail(Entry linkHead) {
+ Entry tail = linkHead;
+ while (tail.linkNext != null) {
+ tail = tail.linkNext;
+ }
+ return tail;
+ }
+
+ private Entry addToMap(long procId, boolean hasParent) {
+ int slotIndex = getMapSlot(procId);
+ Entry entry = getProcedure(slotIndex, procId);
+ if (entry != null) {
+ return entry;
+ }
+
+ entry = new Entry(procedureMap[slotIndex]);
+ procedureMap[slotIndex] = entry;
+ return entry;
+ }
+
+ private Entry removeFromMap(final long procId) {
+ int slotIndex = getMapSlot(procId);
+ Entry prev = null;
+ Entry entry = procedureMap[slotIndex];
+ while (entry != null) {
+ if (procId == entry.getProcId()) {
+ if (prev != null) {
+ prev.hashNext = entry.hashNext;
+ } else {
+ procedureMap[slotIndex] = entry.hashNext;
+ }
+ entry.hashNext = null;
+ return entry;
+ }
+ prev = entry;
+ entry = entry.hashNext;
+ }
+ return null;
+ }
+
+ private Entry getProcedure(long procId) {
+ return getProcedure(getMapSlot(procId), procId);
+ }
+
+ private Entry getProcedure(int slotIndex, long procId) {
+ Entry entry = procedureMap[slotIndex];
+ while (entry != null) {
+ if (procId == entry.getProcId()) {
+ return entry;
+ }
+ entry = entry.hashNext;
+ }
+ return null;
+ }
+
+ private int getMapSlot(long procId) {
+ return (int) (Procedure.getProcIdHashCode(procId) % procedureMap.length);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/7d729304/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
deleted file mode 100644
index c32bd7f..0000000
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.procedure2.store.wal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
-
-/**
- * Used to build the tree for procedures.
- * <p/>
- * We will group the procedures with the root procedure, and then validate each group. For each
- * group of procedures(with the same root procedure), we will collect all the stack ids, if the max
- * stack id is n, then all the stack ids should be from 0 to n, non-repetition and non-omission. If
- * not, we will consider all the procedures in this group as corrupted. Please see the code in
- * {@link #checkReady(Entry, Map)} method.
- * <p/>
- * For the procedures not in any group, i.e, can not find the root procedure for these procedures,
- * we will also consider them as corrupted. Please see the code in {@link #checkOrphan(Map)} method.
- */
-@InterfaceAudience.Private
-public final class WALProcedureTree {
-
- private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class);
-
- private static final class Entry {
-
- private final ProcedureProtos.Procedure proc;
-
- private final List<Entry> subProcs = new ArrayList<>();
-
- public Entry(ProcedureProtos.Procedure proc) {
- this.proc = proc;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Procedure(pid=");
- sb.append(proc.getProcId());
- sb.append(", ppid=");
- sb.append(proc.hasParentId() ? proc.getParentId() : Procedure.NO_PROC_ID);
- sb.append(", class=");
- sb.append(proc.getClassName());
- sb.append(")");
- return sb.toString();
- }
- }
-
- // when loading we will iterator the procedures twice, so use this class to cache the deserialized
- // result to prevent deserializing multiple times.
- private static final class ProtoAndProc {
- private final ProcedureProtos.Procedure proto;
-
- private Procedure<?> proc;
-
- public ProtoAndProc(ProcedureProtos.Procedure proto) {
- this.proto = proto;
- }
-
- public Procedure<?> getProc() throws IOException {
- if (proc == null) {
- proc = ProcedureUtil.convertToProcedure(proto);
- }
- return proc;
- }
- }
-
- private final List<ProtoAndProc> validProcs = new ArrayList<>();
-
- private final List<ProtoAndProc> corruptedProcs = new ArrayList<>();
-
- private static boolean isFinished(ProcedureProtos.Procedure proc) {
- if (!proc.hasParentId()) {
- switch (proc.getState()) {
- case ROLLEDBACK:
- case SUCCESS:
- return true;
- default:
- break;
- }
- }
- return false;
- }
-
- private WALProcedureTree(Map<Long, Entry> procMap) {
- List<Entry> rootEntries = buildTree(procMap);
- for (Entry rootEntry : rootEntries) {
- checkReady(rootEntry, procMap);
- }
- checkOrphan(procMap);
- Comparator<ProtoAndProc> cmp =
- (p1, p2) -> Long.compare(p1.proto.getProcId(), p2.proto.getProcId());
- Collections.sort(validProcs, cmp);
- Collections.sort(corruptedProcs, cmp);
- }
-
- private List<Entry> buildTree(Map<Long, Entry> procMap) {
- List<Entry> rootEntries = new ArrayList<>();
- procMap.values().forEach(entry -> {
- if (!entry.proc.hasParentId()) {
- rootEntries.add(entry);
- } else {
- Entry parentEntry = procMap.get(entry.proc.getParentId());
- // For a valid procedure this should not be null. We will log the error later if it is null,
- // as it will not be referenced by any root procedures.
- if (parentEntry != null) {
- parentEntry.subProcs.add(entry);
- }
- }
- });
- return rootEntries;
- }
-
- private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
- MutableInt maxStackId) {
- for (int i = 0, n = entry.proc.getStackIdCount(); i < n; i++) {
- int stackId = entry.proc.getStackId(i);
- if (stackId > maxStackId.intValue()) {
- maxStackId.setValue(stackId);
- }
- stackId2Proc.computeIfAbsent(stackId, k -> new ArrayList<>()).add(entry);
- }
- entry.subProcs.forEach(e -> collectStackId(e, stackId2Proc, maxStackId));
- }
-
- private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
- Map<Long, Entry> remainingProcMap) {
- corruptedProcs.add(new ProtoAndProc(entry.proc));
- remainingProcMap.remove(entry.proc.getProcId());
- for (Entry e : entry.subProcs) {
- addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap);
- }
- }
-
- private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
- validProcs.add(new ProtoAndProc(entry.proc));
- remainingProcMap.remove(entry.proc.getProcId());
- for (Entry e : entry.subProcs) {
- addAllToValidAndRemoveFromProcMap(e, remainingProcMap);
- }
- }
-
- // In this method first we will check whether the given root procedure and all its sub procedures
- // are valid, through the procedure stack. And we will also remove all these procedures from the
- // remainingProcMap, so at last, if there are still procedures in the map, we know that there are
- // orphan procedures.
- private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
- if (isFinished(rootEntry.proc)) {
- if (!rootEntry.subProcs.isEmpty()) {
- LOG.error("unexpected active children for root-procedure: {}", rootEntry);
- rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e));
- addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
- } else {
- addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
- }
- return;
- }
- Map<Integer, List<Entry>> stackId2Proc = new HashMap<>();
- MutableInt maxStackId = new MutableInt(Integer.MIN_VALUE);
- collectStackId(rootEntry, stackId2Proc, maxStackId);
- // the stack ids should start from 0 and increase by one every time
- boolean valid = true;
- for (int i = 0; i <= maxStackId.intValue(); i++) {
- List<Entry> entries = stackId2Proc.get(i);
- if (entries == null) {
- LOG.error("Missing stack id {}, max stack id is {}, root procedure is {}", i, maxStackId,
- rootEntry);
- valid = false;
- } else if (entries.size() > 1) {
- LOG.error("Multiple procedures {} have the same stack id {}, max stack id is {}," +
- " root procedure is {}", entries, i, maxStackId, rootEntry);
- valid = false;
- }
- }
- if (valid) {
- addAllToValidAndRemoveFromProcMap(rootEntry, remainingProcMap);
- } else {
- addAllToCorruptedAndRemoveFromProcMap(rootEntry, remainingProcMap);
- }
- }
-
- private void checkOrphan(Map<Long, Entry> procMap) {
- procMap.values().forEach(entry -> {
- LOG.error("Orphan procedure: {}", entry);
- corruptedProcs.add(new ProtoAndProc(entry.proc));
- });
- }
-
- private static final class Iter implements ProcedureIterator {
-
- private final List<ProtoAndProc> procs;
-
- private Iterator<ProtoAndProc> iter;
-
- private ProtoAndProc current;
-
- public Iter(List<ProtoAndProc> procs) {
- this.procs = procs;
- reset();
- }
-
- @Override
- public void reset() {
- iter = procs.iterator();
- if (iter.hasNext()) {
- current = iter.next();
- } else {
- current = null;
- }
- }
-
- @Override
- public boolean hasNext() {
- return current != null;
- }
-
- private void checkNext() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public boolean isNextFinished() {
- checkNext();
- return isFinished(current.proto);
- }
-
- private void moveToNext() {
- if (iter.hasNext()) {
- current = iter.next();
- } else {
- current = null;
- }
- }
-
- @Override
- public void skipNext() {
- checkNext();
- moveToNext();
- }
-
- @Override
- public Procedure<?> next() throws IOException {
- checkNext();
- Procedure<?> proc = current.getProc();
- moveToNext();
- return proc;
- }
- }
-
- public ProcedureIterator getValidProcs() {
- return new Iter(validProcs);
- }
-
- public ProcedureIterator getCorruptedProcs() {
- return new Iter(corruptedProcs);
- }
-
- public static WALProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
- Map<Long, Entry> procMap = new HashMap<>();
- for (ProcedureProtos.Procedure proc : procedures) {
- procMap.put(proc.getProcId(), new Entry(proc));
- }
- return new WALProcedureTree(procMap);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7d729304/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
index da53fa5..443386d 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.procedure2.store.wal;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Random;
http://git-wip-us.apache.org/repos/asf/hbase/blob/7d729304/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 0f598b0..d682481 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -70,6 +72,7 @@ public class TestWALProcedureStore {
private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStore.class);
private static final int PROCEDURE_STORE_SLOTS = 1;
+ private static final Procedure NULL_PROC = null;
private WALProcedureStore procStore;
@@ -150,7 +153,7 @@ public class TestWALProcedureStore {
@Test
public void testWalCleanerSequentialClean() throws Exception {
- final Procedure<?>[] procs = new Procedure[5];
+ final Procedure[] procs = new Procedure[5];
ArrayList<ProcedureWALFile> logs = null;
// Insert procedures and roll wal after every insert.
@@ -179,7 +182,7 @@ public class TestWALProcedureStore {
// they are in the starting of the list.
@Test
public void testWalCleanerNoHoles() throws Exception {
- final Procedure<?>[] procs = new Procedure[5];
+ final Procedure[] procs = new Procedure[5];
ArrayList<ProcedureWALFile> logs = null;
// Insert procedures and roll wal after every insert.
for (int i = 0; i < procs.length; i++) {
@@ -239,7 +242,7 @@ public class TestWALProcedureStore {
@Test
public void testWalCleanerWithEmptyRolls() throws Exception {
- final Procedure<?>[] procs = new Procedure[3];
+ final Procedure[] procs = new Procedure[3];
for (int i = 0; i < procs.length; ++i) {
procs[i] = new TestSequentialProcedure();
procStore.insert(procs[i], null);
@@ -281,12 +284,12 @@ public class TestWALProcedureStore {
Set<Long> procIds = new HashSet<>();
// Insert something in the log
- Procedure<?> proc1 = new TestSequentialProcedure();
+ Procedure proc1 = new TestSequentialProcedure();
procIds.add(proc1.getProcId());
procStore.insert(proc1, null);
- Procedure<?> proc2 = new TestSequentialProcedure();
- Procedure<?>[] child2 = new Procedure[2];
+ Procedure proc2 = new TestSequentialProcedure();
+ Procedure[] child2 = new Procedure[2];
child2[0] = new TestSequentialProcedure();
child2[1] = new TestSequentialProcedure();
@@ -320,11 +323,11 @@ public class TestWALProcedureStore {
@Test
public void testNoTrailerDoubleRestart() throws Exception {
// log-0001: proc 0, 1 and 2 are inserted
- Procedure<?> proc0 = new TestSequentialProcedure();
+ Procedure proc0 = new TestSequentialProcedure();
procStore.insert(proc0, null);
- Procedure<?> proc1 = new TestSequentialProcedure();
+ Procedure proc1 = new TestSequentialProcedure();
procStore.insert(proc1, null);
- Procedure<?> proc2 = new TestSequentialProcedure();
+ Procedure proc2 = new TestSequentialProcedure();
procStore.insert(proc2, null);
procStore.rollWriterForTesting();
@@ -417,7 +420,7 @@ public class TestWALProcedureStore {
}
private static void assertUpdated(final ProcedureStoreTracker tracker,
- final Procedure<?>[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
+ final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) {
for (int index : updatedProcs) {
long procId = procs[index].getProcId();
assertTrue("Procedure id : " + procId, tracker.isModified(procId));
@@ -429,7 +432,7 @@ public class TestWALProcedureStore {
}
private static void assertDeleted(final ProcedureStoreTracker tracker,
- final Procedure<?>[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
+ final Procedure[] procs, final int[] deletedProcs, final int[] nonDeletedProcs) {
for (int index : deletedProcs) {
long procId = procs[index].getProcId();
assertEquals("Procedure id : " + procId,
@@ -444,7 +447,7 @@ public class TestWALProcedureStore {
@Test
public void testCorruptedTrailersRebuild() throws Exception {
- final Procedure<?>[] procs = new Procedure[6];
+ final Procedure[] procs = new Procedure[6];
for (int i = 0; i < procs.length; ++i) {
procs[i] = new TestSequentialProcedure();
}
@@ -572,20 +575,127 @@ public class TestWALProcedureStore {
storeRestart(loader);
assertEquals(0, loader.getLoadedCount());
assertEquals(rootProcs.length, loader.getCorruptedCount());
- for (Procedure<?> proc : loader.getCorrupted()) {
+ for (Procedure proc: loader.getCorrupted()) {
assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
assertTrue(proc.toString(),
- proc.getProcId() > rootProcs.length && proc.getProcId() <= (rootProcs.length * 2));
+ proc.getProcId() > rootProcs.length &&
+ proc.getProcId() <= (rootProcs.length * 2));
}
}
@Test
+ public void testWalReplayOrder_AB_A() throws Exception {
+ /*
+ * | A B | -> | A |
+ *
+ * Each |...| group lists the procedures written between calls to rollWriterForTesting():
+ * A and B are written before the roll, and only A is updated after it. After the store is
+ * restarted the loader must be handed A (proc id 1) followed by B (proc id 2), and the
+ * corrupted iterator must be empty.
+ */
+ TestProcedure a = new TestProcedure(1, 0); // NOTE(review): args look like (procId, parentId) - confirm
+ TestProcedure b = new TestProcedure(2, 1);
+
+ procStore.insert(a, null);
+ a.addStackId(0);
+ procStore.update(a);
+
+ procStore.insert(a, new Procedure[] { b }); // re-insert a, this time together with b as its sub-procedure array
+ b.addStackId(1);
+ procStore.update(b);
+
+ procStore.rollWriterForTesting();
+
+ a.addStackId(2); // the only write after the roll touches a
+ procStore.update(a);
+
+ storeRestart(new ProcedureStore.ProcedureLoader() {
+ @Override
+ public void setMaxProcId(long maxProcId) {
+ assertEquals(2, maxProcId);
+ }
+
+ @Override
+ public void load(ProcedureIterator procIter) throws IOException {
+ assertTrue(procIter.hasNext());
+ assertEquals(1, procIter.next().getProcId());
+ assertTrue(procIter.hasNext());
+ assertEquals(2, procIter.next().getProcId());
+ assertFalse(procIter.hasNext());
+ }
+
+ @Override
+ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+ assertFalse(procIter.hasNext()); // nothing may be reported as corrupted
+ }
+ });
+ }
+
+ @Test
+ public void testWalReplayOrder_ABC_BAD() throws Exception {
+ /*
+ * | A B C | -> | B A D |
+ *
+ * Each |...| group lists the procedures written between calls to rollWriterForTesting():
+ * A, B and C are inserted before the roll; afterwards B is updated, then A, and finally a
+ * new procedure D is inserted. After the store is restarted the loader must be handed D
+ * (proc id 4) first, then A, B and C (proc ids 1, 2, 3), and the corrupted iterator must
+ * be empty.
+ */
+ TestProcedure a = new TestProcedure(1, 0); // NOTE(review): args look like (procId, parentId) - confirm
+ TestProcedure b = new TestProcedure(2, 1);
+ TestProcedure c = new TestProcedure(3, 2);
+ TestProcedure d = new TestProcedure(4, 0);
+
+ procStore.insert(a, null);
+ a.addStackId(0);
+ procStore.update(a);
+
+ procStore.insert(a, new Procedure[] { b });
+ b.addStackId(1);
+ procStore.update(b);
+
+ procStore.insert(b, new Procedure[] { c });
+ b.addStackId(2); // stack id 2 goes to b; c is never given a stack id or updated in this test
+ procStore.update(b);
+
+ procStore.rollWriterForTesting();
+
+ b.addStackId(3);
+ procStore.update(b);
+
+ a.addStackId(4);
+ procStore.update(a);
+
+ procStore.insert(d, null);
+ d.addStackId(0);
+ procStore.update(d);
+
+ storeRestart(new ProcedureStore.ProcedureLoader() {
+ @Override
+ public void setMaxProcId(long maxProcId) {
+ assertEquals(4, maxProcId);
+ }
+
+ @Override
+ public void load(ProcedureIterator procIter) throws IOException {
+ assertTrue(procIter.hasNext());
+ assertEquals(4, procIter.next().getProcId()); // d is expected ahead of the a/b/c group
+ // TODO: This will be multiple call once we do fast-start
+ //assertFalse(procIter.hasNext());
+
+ assertTrue(procIter.hasNext());
+ assertEquals(1, procIter.next().getProcId());
+ assertTrue(procIter.hasNext());
+ assertEquals(2, procIter.next().getProcId());
+ assertTrue(procIter.hasNext());
+ assertEquals(3, procIter.next().getProcId());
+ assertFalse(procIter.hasNext());
+ }
+
+ @Override
+ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+ assertFalse(procIter.hasNext()); // nothing may be reported as corrupted
+ }
+ });
+ }
+
+ @Test
public void testRollAndRemove() throws IOException {
// Insert something in the log
- Procedure<?> proc1 = new TestSequentialProcedure();
+ Procedure proc1 = new TestSequentialProcedure();
procStore.insert(proc1, null);
- Procedure<?> proc2 = new TestSequentialProcedure();
+ Procedure proc2 = new TestSequentialProcedure();
procStore.insert(proc2, null);
// roll the log, now we have 2
@@ -832,6 +942,17 @@ public class TestWALProcedureStore {
assertEquals(0, loader.getCorruptedCount());
}
+ /**
+  * Asserts that listing {@code logDir} through {@code fs} succeeds and yields no entries.
+  * A missing directory, or any other I/O error raised while listing it, fails the test.
+  */
+ private void assertEmptyLogDir() {
+   try {
+     FileStatus[] status = fs.listStatus(logDir);
+     assertTrue("expected empty state-log dir", status == null || status.length == 0);
+   } catch (FileNotFoundException e) {
+     fail("expected the state-log dir to be present: " + logDir);
+   } catch (IOException e) {
+     // Attach the caught exception as the cause so the failure report keeps the I/O stack
+     // trace instead of only its message.
+     throw new AssertionError("got an exception on state-log dir list: " + e.getMessage(), e);
+   }
+ }
+
public static class TestSequentialProcedure extends SequentialProcedure<Void> {
private static long seqid = 0;
@@ -840,18 +961,13 @@ public class TestWALProcedureStore {
}
@Override
- protected Procedure<Void>[] execute(Void env) {
- return null;
- }
+ protected Procedure[] execute(Void env) { return null; }
@Override
- protected void rollback(Void env) {
- }
+ protected void rollback(Void env) { }
@Override
- protected boolean abort(Void env) {
- return false;
- }
+ protected boolean abort(Void env) { return false; }
@Override
protected void serializeStateData(ProcedureStateSerializer serializer)
http://git-wip-us.apache.org/repos/asf/hbase/blob/7d729304/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
deleted file mode 100644
index 890d0e3..0000000
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.procedure2.store.wal;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
-
-@Category({ MasterTests.class, SmallTests.class })
-public class TestWALProcedureTree {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestWALProcedureTree.class);
-
- public static final class TestProcedure extends Procedure<Void> {
-
- @Override
- public void setProcId(long procId) {
- super.setProcId(procId);
- }
-
- @Override
- public void setParentProcId(long parentProcId) {
- super.setParentProcId(parentProcId);
- }
-
- @Override
- public synchronized void addStackIndex(int index) {
- super.addStackIndex(index);
- }
-
- @Override
- protected Procedure<Void>[] execute(Void env)
- throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- return null;
- }
-
- @Override
- protected void rollback(Void env) throws IOException, InterruptedException {
- }
-
- @Override
- protected boolean abort(Void env) {
- return false;
- }
-
- @Override
- protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- }
-
- @Override
- protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
- }
- }
-
- private TestProcedure createProc(long procId, long parentProcId) {
- TestProcedure proc = new TestProcedure();
- proc.setProcId(procId);
- if (parentProcId != Procedure.NO_PROC_ID) {
- proc.setParentProcId(parentProcId);
- }
- return proc;
- }
-
- private List<ProcedureProtos.Procedure> toProtos(TestProcedure... procs) {
- return Arrays.stream(procs).map(p -> {
- try {
- return ProcedureUtil.convertToProtoProcedure(p);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }).collect(Collectors.toList());
- }
-
- private List<TestProcedure> getProcs(ProcedureIterator iter) throws IOException {
- List<TestProcedure> procs = new ArrayList<>();
- while (iter.hasNext()) {
- procs.add((TestProcedure) iter.next());
- }
- return procs;
- }
-
- @Test
- public void testMissingStackId() throws IOException {
- TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID);
- proc0.addStackIndex(0);
- TestProcedure proc1 = createProc(2, 1);
- proc1.addStackIndex(1);
- TestProcedure proc2 = createProc(3, 2);
- proc2.addStackIndex(3);
- WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
- List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
- assertEquals(0, validProcs.size());
- List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
- assertEquals(3, corruptedProcs.size());
- assertEquals(1, corruptedProcs.get(0).getProcId());
- assertEquals(2, corruptedProcs.get(1).getProcId());
- assertEquals(3, corruptedProcs.get(2).getProcId());
- }
-
- @Test
- public void testDuplicatedStackId() throws IOException {
- TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID);
- proc0.addStackIndex(0);
- TestProcedure proc1 = createProc(2, 1);
- proc1.addStackIndex(1);
- TestProcedure proc2 = createProc(3, 2);
- proc2.addStackIndex(1);
- WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
- List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
- assertEquals(0, validProcs.size());
- List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
- assertEquals(3, corruptedProcs.size());
- assertEquals(1, corruptedProcs.get(0).getProcId());
- assertEquals(2, corruptedProcs.get(1).getProcId());
- assertEquals(3, corruptedProcs.get(2).getProcId());
- }
-
- @Test
- public void testOrphan() throws IOException {
- TestProcedure proc0 = createProc(1, Procedure.NO_PROC_ID);
- proc0.addStackIndex(0);
- TestProcedure proc1 = createProc(2, 1);
- proc1.addStackIndex(1);
- TestProcedure proc2 = createProc(3, Procedure.NO_PROC_ID);
- proc2.addStackIndex(0);
- TestProcedure proc3 = createProc(5, 4);
- proc3.addStackIndex(1);
- WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2, proc3));
- List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
- assertEquals(3, validProcs.size());
- List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
- assertEquals(1, corruptedProcs.size());
- assertEquals(5, corruptedProcs.get(0).getProcId());
- assertEquals(4, corruptedProcs.get(0).getParentProcId());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7d729304/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 695daea..7f5e11a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -171,7 +171,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
* avoiding port contention if another local HBase instance is already running).
* <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
* setting it to true.
- * Trigger pre commit.
*/
@InterfaceAudience.Public
@SuppressWarnings("deprecation")