You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/10/07 09:24:14 UTC
[2/2] hbase git commit: HBASE-21250 Refactor WALProcedureStore and
add more comments for better understanding the implementation
HBASE-21250 Refactor WALProcedureStore and add more comments for better understanding the implementation
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/118b0746
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/118b0746
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/118b0746
Branch: refs/heads/master
Commit: 118b0746849c886fc64b0a53014f3186e2db4d9d
Parents: 5da0c20
Author: zhangduo <zh...@apache.org>
Authored: Sat Oct 6 17:27:05 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Oct 7 17:09:09 2018 +0800
----------------------------------------------------------------------
.../hbase/procedure2/store/BitSetNode.java | 397 +++++++++++
.../procedure2/store/NoopProcedureStore.java | 9 +-
.../hbase/procedure2/store/ProcedureStore.java | 9 +-
.../procedure2/store/ProcedureStoreTracker.java | 502 +++----------
.../CorruptedWALProcedureStoreException.java | 6 +-
.../procedure2/store/wal/ProcedureWALFile.java | 7 +-
.../store/wal/ProcedureWALFormat.java | 38 +-
.../store/wal/ProcedureWALFormatReader.java | 701 ++-----------------
.../store/wal/ProcedureWALPrettyPrinter.java | 18 +-
.../procedure2/store/wal/WALProcedureMap.java | 607 ++++++++++++++++
.../procedure2/store/wal/WALProcedureStore.java | 206 +++---
.../store/TestProcedureStoreTracker.java | 31 +-
.../store/wal/TestWALProcedureStore.java | 4 +-
.../hadoop/hbase/HBaseTestingUtility.java | 1 +
14 files changed, 1319 insertions(+), 1217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/118b0746/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
new file mode 100644
index 0000000..b76c026
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.util.Arrays;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met).
+ * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the range
+ * of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K is
+ * BITS_PER_WORD.
+ * <p/>
+ * We have two main bit sets to describe the state of procedures, the meanings are:
+ *
+ * <pre>
+ * ----------------------
+ * | modified | deleted | meaning
+ * | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates().
+ * | 1 | 0 | proc was updated (but not deleted).
+ * | 1 | 1 | proc was deleted.
+ * | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past).
+ * ----------------------
+ * </pre>
+ *
+ * The meaning of modified is that, we have modified the state of the procedure, no matter insert,
+ * update, or delete. And if it is an insert or update, we will set the deleted to 0, if not we will
+ * set the delete to 1.
+ * <p/>
+ * For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the
+ * partial one, the initial modified value is 0 and the initial deleted value is also 0. In
+ * {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified.
+ */
+@InterfaceAudience.Private
+class BitSetNode {
+ private static final long WORD_MASK = 0xffffffffffffffffL;
+ private static final int ADDRESS_BITS_PER_WORD = 6;
+ private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
+ private static final int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
+
+ /**
+ * Mimics {@link ProcedureStoreTracker#partial}. It will effect how we fill the new deleted bits
+ * when growing.
+ */
+ private boolean partial;
+
+ /**
+ * Set of procedures which have been modified since last {@link #resetModified()}. Useful to track
+ * procedures which have been modified since last WAL write.
+ */
+ private long[] modified;
+
+ /**
+ * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. This
+ * represents global state since it's not reset on WAL rolls.
+ */
+ private long[] deleted;
+ /**
+ * Offset of bitmap i.e. procedure id corresponding to first bit.
+ */
+ private long start;
+
+ public void dump() {
+ System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), getActiveMinProcId(),
+ getActiveMaxProcId());
+ System.out.println("Modified:");
+ for (int i = 0; i < modified.length; ++i) {
+ for (int j = 0; j < BITS_PER_WORD; ++j) {
+ System.out.print((modified[i] & (1L << j)) != 0 ? "1" : "0");
+ }
+ System.out.println(" " + i);
+ }
+ System.out.println();
+ System.out.println("Delete:");
+ for (int i = 0; i < deleted.length; ++i) {
+ for (int j = 0; j < BITS_PER_WORD; ++j) {
+ System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
+ }
+ System.out.println(" " + i);
+ }
+ System.out.println();
+ }
+
+ public BitSetNode(long procId, boolean partial) {
+ start = alignDown(procId);
+
+ int count = 1;
+ modified = new long[count];
+ deleted = new long[count];
+ if (!partial) {
+ Arrays.fill(deleted, WORD_MASK);
+ }
+
+ this.partial = partial;
+ updateState(procId, false);
+ }
+
+ public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
+ start = data.getStartId();
+ int size = data.getUpdatedCount();
+ assert size == data.getDeletedCount();
+ modified = new long[size];
+ deleted = new long[size];
+ for (int i = 0; i < size; ++i) {
+ modified[i] = data.getUpdated(i);
+ deleted[i] = data.getDeleted(i);
+ }
+ partial = false;
+ }
+
+ public BitSetNode(BitSetNode other, boolean resetDelete) {
+ this.start = other.start;
+ this.partial = other.partial;
+ this.modified = other.modified.clone();
+ // The resetDelete will be set to true when building cleanup tracker.
+ // The intention here is that, if a procedure is not modified in this tracker, then we do not
+ // need to take care of it, so we will set deleted to true for these bits, i.e, if modified is
+ // 0, then we set deleted to 1, otherwise keep it as is. So here, the equation is
+ // deleted |= ~modified, i.e,
+ if (resetDelete) {
+ this.deleted = new long[other.deleted.length];
+ for (int i = 0; i < this.deleted.length; ++i) {
+ this.deleted[i] |= ~(other.modified[i]);
+ }
+ } else {
+ this.deleted = other.deleted.clone();
+ }
+ }
+
+ public void insertOrUpdate(final long procId) {
+ updateState(procId, false);
+ }
+
+ public void delete(final long procId) {
+ updateState(procId, true);
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public long getEnd() {
+ return start + (modified.length << ADDRESS_BITS_PER_WORD) - 1;
+ }
+
+ public boolean contains(final long procId) {
+ return start <= procId && procId <= getEnd();
+ }
+
+ public DeleteState isDeleted(final long procId) {
+ int bitmapIndex = getBitmapIndex(procId);
+ int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+ if (wordIndex >= deleted.length) {
+ return DeleteState.MAYBE;
+ }
+ return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
+ }
+
+ public boolean isModified(long procId) {
+ int bitmapIndex = getBitmapIndex(procId);
+ int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+ if (wordIndex >= modified.length) {
+ return false;
+ }
+ return (modified[wordIndex] & (1L << bitmapIndex)) != 0;
+ }
+
+ /**
+ * @return true, if all the procedures has been modified.
+ */
+ public boolean isAllModified() {
+ // TODO: cache the value
+ for (int i = 0; i < modified.length; ++i) {
+ if ((modified[i] | deleted[i]) != WORD_MASK) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @return true, if there are no active procedures in this BitSetNode, else false.
+ */
+ public boolean isEmpty() {
+ // TODO: cache the value
+ for (int i = 0; i < deleted.length; ++i) {
+ if (deleted[i] != WORD_MASK) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void resetModified() {
+ Arrays.fill(modified, 0);
+ }
+
+ public void unsetPartialFlag() {
+ partial = false;
+ for (int i = 0; i < modified.length; ++i) {
+ for (int j = 0; j < BITS_PER_WORD; ++j) {
+ if ((modified[i] & (1L << j)) == 0) {
+ deleted[i] |= (1L << j);
+ }
+ }
+ }
+ }
+
+ /**
+ * Convert to
+ * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode
+ * protobuf.
+ */
+ public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
+ ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
+ ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
+ builder.setStartId(start);
+ for (int i = 0; i < modified.length; ++i) {
+ builder.addUpdated(modified[i]);
+ builder.addDeleted(deleted[i]);
+ }
+ return builder.build();
+ }
+
+ // ========================================================================
+ // Grow/Merge Helpers
+ // ========================================================================
+ public boolean canGrow(final long procId) {
+ return Math.abs(procId - start) < MAX_NODE_SIZE;
+ }
+
+ public boolean canMerge(final BitSetNode rightNode) {
+ // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD.
+ assert start < rightNode.start;
+ return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
+ }
+
+ public void grow(final long procId) {
+ int delta, offset;
+
+ if (procId < start) {
+ // add to head
+ long newStart = alignDown(procId);
+ delta = (int) (start - newStart) >> ADDRESS_BITS_PER_WORD;
+ offset = delta;
+ start = newStart;
+ } else {
+ // Add to tail
+ long newEnd = alignUp(procId + 1);
+ delta = (int) (newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
+ offset = 0;
+ }
+
+ long[] newBitmap;
+ int oldSize = modified.length;
+
+ newBitmap = new long[oldSize + delta];
+ for (int i = 0; i < newBitmap.length; ++i) {
+ newBitmap[i] = 0;
+ }
+ System.arraycopy(modified, 0, newBitmap, offset, oldSize);
+ modified = newBitmap;
+
+ newBitmap = new long[deleted.length + delta];
+ for (int i = 0; i < newBitmap.length; ++i) {
+ newBitmap[i] = partial ? 0 : WORD_MASK;
+ }
+ System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
+ deleted = newBitmap;
+ }
+
+ public void merge(final BitSetNode rightNode) {
+ int delta = (int) (rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
+
+ long[] newBitmap;
+ int oldSize = modified.length;
+ int newSize = (delta - rightNode.modified.length);
+ int offset = oldSize + newSize;
+
+ newBitmap = new long[oldSize + delta];
+ System.arraycopy(modified, 0, newBitmap, 0, oldSize);
+ System.arraycopy(rightNode.modified, 0, newBitmap, offset, rightNode.modified.length);
+ modified = newBitmap;
+
+ newBitmap = new long[oldSize + delta];
+ System.arraycopy(deleted, 0, newBitmap, 0, oldSize);
+ System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length);
+ deleted = newBitmap;
+
+ for (int i = 0; i < newSize; ++i) {
+ modified[offset + i] = 0;
+ deleted[offset + i] = partial ? 0 : WORD_MASK;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "BitSetNode(" + getStart() + "-" + getEnd() + ")";
+ }
+
+ // ========================================================================
+ // Min/Max Helpers
+ // ========================================================================
+ public long getActiveMinProcId() {
+ long minProcId = start;
+ for (int i = 0; i < deleted.length; ++i) {
+ if (deleted[i] == 0) {
+ return (minProcId);
+ }
+
+ if (deleted[i] != WORD_MASK) {
+ for (int j = 0; j < BITS_PER_WORD; ++j) {
+ if ((deleted[i] & (1L << j)) != 0) {
+ return minProcId + j;
+ }
+ }
+ }
+
+ minProcId += BITS_PER_WORD;
+ }
+ return minProcId;
+ }
+
+ public long getActiveMaxProcId() {
+ long maxProcId = getEnd();
+ for (int i = deleted.length - 1; i >= 0; --i) {
+ if (deleted[i] == 0) {
+ return maxProcId;
+ }
+
+ if (deleted[i] != WORD_MASK) {
+ for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
+ if ((deleted[i] & (1L << j)) == 0) {
+ return maxProcId - (BITS_PER_WORD - 1 - j);
+ }
+ }
+ }
+ maxProcId -= BITS_PER_WORD;
+ }
+ return maxProcId;
+ }
+
+ // ========================================================================
+ // Bitmap Helpers
+ // ========================================================================
+ private int getBitmapIndex(final long procId) {
+ return (int) (procId - start);
+ }
+
+ void updateState(long procId, boolean isDeleted) {
+ int bitmapIndex = getBitmapIndex(procId);
+ int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
+ long value = (1L << bitmapIndex);
+
+ modified[wordIndex] |= value;
+ if (isDeleted) {
+ deleted[wordIndex] |= value;
+ } else {
+ deleted[wordIndex] &= ~value;
+ }
+ }
+
+ // ========================================================================
+ // Helpers
+ // ========================================================================
+ /**
+ * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
+ */
+ private static long alignUp(final long x) {
+ return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
+ }
+
+ /**
+ * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
+ */
+ private static long alignDown(final long x) {
+ return x & -BITS_PER_WORD;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/118b0746/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
index 9c6176d..8fbc147 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
@@ -64,17 +63,17 @@ public class NoopProcedureStore extends ProcedureStoreBase {
}
@Override
- public void insert(Procedure proc, Procedure[] subprocs) {
+ public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
// no-op
}
@Override
- public void insert(Procedure[] proc) {
+ public void insert(Procedure<?>[] proc) {
// no-op
}
@Override
- public void update(Procedure proc) {
+ public void update(Procedure<?> proc) {
// no-op
}
@@ -84,7 +83,7 @@ public class NoopProcedureStore extends ProcedureStoreBase {
}
@Override
- public void delete(Procedure proc, long[] subprocs) {
+ public void delete(Procedure<?> proc, long[] subprocs) {
// no-op
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/118b0746/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 7288340..8063b12 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -81,6 +81,7 @@ public interface ProcedureStore {
* @throws IOException if there was an error fetching/deserializing the procedure
* @return the next procedure in the iteration.
*/
+ @SuppressWarnings("rawtypes")
Procedure next() throws IOException;
}
@@ -173,7 +174,7 @@ public interface ProcedureStore {
* @param proc the procedure to serialize and write to the store.
* @param subprocs the newly created child of the proc.
*/
- void insert(Procedure proc, Procedure[] subprocs);
+ void insert(Procedure<?> proc, Procedure<?>[] subprocs);
/**
* Serialize a set of new procedures.
@@ -182,14 +183,14 @@ public interface ProcedureStore {
*
* @param procs the procedures to serialize and write to the store.
*/
- void insert(Procedure[] procs);
+ void insert(Procedure<?>[] procs);
/**
* The specified procedure was executed,
* and the new state should be written to the store.
* @param proc the procedure to serialize and write to the store.
*/
- void update(Procedure proc);
+ void update(Procedure<?> proc);
/**
* The specified procId was removed from the executor,
@@ -205,7 +206,7 @@ public interface ProcedureStore {
* @param parentProc the parent procedure to serialize and write to the store.
* @param subProcIds the IDs of the sub-procedure to remove.
*/
- void delete(Procedure parentProc, long[] subProcIds);
+ void delete(Procedure<?> parentProc, long[] subProcIds);
/**
* The specified procIds were removed from the executor,
http://git-wip-us.apache.org/repos/asf/hbase/blob/118b0746/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 2dad5ac..361419a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -53,383 +53,14 @@ public class ProcedureStoreTracker {
* It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
* understand it's real use.
*/
- private boolean partial = false;
+ boolean partial = false;
- private long minUpdatedProcId = Long.MAX_VALUE;
- private long maxUpdatedProcId = Long.MIN_VALUE;
+ private long minModifiedProcId = Long.MAX_VALUE;
+ private long maxModifiedProcId = Long.MIN_VALUE;
public enum DeleteState { YES, NO, MAYBE }
- /**
- * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met).
- * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the
- * range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K
- * is BITS_PER_WORD.
- */
- public static class BitSetNode {
- private final static long WORD_MASK = 0xffffffffffffffffL;
- private final static int ADDRESS_BITS_PER_WORD = 6;
- private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
- private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
-
- /**
- * Mimics {@link ProcedureStoreTracker#partial}.
- */
- private final boolean partial;
-
- /* ----------------------
- * | updated | deleted | meaning
- * | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates().
- * | 1 | 0 | proc was updated (but not deleted).
- * | 1 | 1 | proc was deleted.
- * | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past).
- /* ----------------------
- */
-
- /**
- * Set of procedures which have been updated since last {@link #resetUpdates()}.
- * Useful to track procedures which have been updated since last WAL write.
- */
- private long[] updated;
- /**
- * Keeps track of procedure ids which belong to this bitmap's range and have been deleted.
- * This represents global state since it's not reset on WAL rolls.
- */
- private long[] deleted;
- /**
- * Offset of bitmap i.e. procedure id corresponding to first bit.
- */
- private long start;
-
- public void dump() {
- System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
- getActiveMinProcId(), getActiveMaxProcId());
- System.out.println("Update:");
- for (int i = 0; i < updated.length; ++i) {
- for (int j = 0; j < BITS_PER_WORD; ++j) {
- System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0");
- }
- System.out.println(" " + i);
- }
- System.out.println();
- System.out.println("Delete:");
- for (int i = 0; i < deleted.length; ++i) {
- for (int j = 0; j < BITS_PER_WORD; ++j) {
- System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
- }
- System.out.println(" " + i);
- }
- System.out.println();
- }
-
- public BitSetNode(final long procId, final boolean partial) {
- start = alignDown(procId);
-
- int count = 1;
- updated = new long[count];
- deleted = new long[count];
- for (int i = 0; i < count; ++i) {
- updated[i] = 0;
- deleted[i] = partial ? 0 : WORD_MASK;
- }
-
- this.partial = partial;
- updateState(procId, false);
- }
-
- protected BitSetNode(final long start, final long[] updated, final long[] deleted) {
- this.start = start;
- this.updated = updated;
- this.deleted = deleted;
- this.partial = false;
- }
-
- public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
- start = data.getStartId();
- int size = data.getUpdatedCount();
- updated = new long[size];
- deleted = new long[size];
- for (int i = 0; i < size; ++i) {
- updated[i] = data.getUpdated(i);
- deleted[i] = data.getDeleted(i);
- }
- partial = false;
- }
-
- public BitSetNode(final BitSetNode other, final boolean resetDelete) {
- this.start = other.start;
- this.partial = other.partial;
- this.updated = other.updated.clone();
- if (resetDelete) {
- this.deleted = new long[other.deleted.length];
- for (int i = 0; i < this.deleted.length; ++i) {
- this.deleted[i] = ~(other.updated[i]);
- }
- } else {
- this.deleted = other.deleted.clone();
- }
- }
-
- public void update(final long procId) {
- updateState(procId, false);
- }
-
- public void delete(final long procId) {
- updateState(procId, true);
- }
-
- public long getStart() {
- return start;
- }
-
- public long getEnd() {
- return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
- }
-
- public boolean contains(final long procId) {
- return start <= procId && procId <= getEnd();
- }
-
- public DeleteState isDeleted(final long procId) {
- int bitmapIndex = getBitmapIndex(procId);
- int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
- if (wordIndex >= deleted.length) {
- return DeleteState.MAYBE;
- }
- return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
- }
-
- private boolean isUpdated(final long procId) {
- int bitmapIndex = getBitmapIndex(procId);
- int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
- if (wordIndex >= updated.length) {
- return false;
- }
- return (updated[wordIndex] & (1L << bitmapIndex)) != 0;
- }
-
- public boolean isUpdated() {
- // TODO: cache the value
- for (int i = 0; i < updated.length; ++i) {
- if ((updated[i] | deleted[i]) != WORD_MASK) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * @return true, if there are no active procedures in this BitSetNode, else false.
- */
- public boolean isEmpty() {
- // TODO: cache the value
- for (int i = 0; i < deleted.length; ++i) {
- if (deleted[i] != WORD_MASK) {
- return false;
- }
- }
- return true;
- }
-
- public void resetUpdates() {
- for (int i = 0; i < updated.length; ++i) {
- updated[i] = 0;
- }
- }
-
- /**
- * Clears the {@link #deleted} bitmaps.
- */
- public void undeleteAll() {
- for (int i = 0; i < updated.length; ++i) {
- deleted[i] = 0;
- }
- }
-
- public void unsetPartialFlag() {
- for (int i = 0; i < updated.length; ++i) {
- for (int j = 0; j < BITS_PER_WORD; ++j) {
- if ((updated[i] & (1L << j)) == 0) {
- deleted[i] |= (1L << j);
- }
- }
- }
- }
-
- /**
- * Convert to
- * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode
- * protobuf.
- */
- public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
- ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
- ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
- builder.setStartId(start);
- for (int i = 0; i < updated.length; ++i) {
- builder.addUpdated(updated[i]);
- builder.addDeleted(deleted[i]);
- }
- return builder.build();
- }
-
- // ========================================================================
- // Grow/Merge Helpers
- // ========================================================================
- public boolean canGrow(final long procId) {
- return Math.abs(procId - start) < MAX_NODE_SIZE;
- }
-
- public boolean canMerge(final BitSetNode rightNode) {
- // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD.
- assert start < rightNode.start;
- return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
- }
-
- public void grow(final long procId) {
- int delta, offset;
-
- if (procId < start) {
- // add to head
- long newStart = alignDown(procId);
- delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD;
- offset = delta;
- start = newStart;
- } else {
- // Add to tail
- long newEnd = alignUp(procId + 1);
- delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
- offset = 0;
- }
-
- long[] newBitmap;
- int oldSize = updated.length;
-
- newBitmap = new long[oldSize + delta];
- for (int i = 0; i < newBitmap.length; ++i) {
- newBitmap[i] = 0;
- }
- System.arraycopy(updated, 0, newBitmap, offset, oldSize);
- updated = newBitmap;
-
- newBitmap = new long[deleted.length + delta];
- for (int i = 0; i < newBitmap.length; ++i) {
- newBitmap[i] = partial ? 0 : WORD_MASK;
- }
- System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
- deleted = newBitmap;
- }
-
- public void merge(final BitSetNode rightNode) {
- int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
-
- long[] newBitmap;
- int oldSize = updated.length;
- int newSize = (delta - rightNode.updated.length);
- int offset = oldSize + newSize;
-
- newBitmap = new long[oldSize + delta];
- System.arraycopy(updated, 0, newBitmap, 0, oldSize);
- System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length);
- updated = newBitmap;
-
- newBitmap = new long[oldSize + delta];
- System.arraycopy(deleted, 0, newBitmap, 0, oldSize);
- System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length);
- deleted = newBitmap;
-
- for (int i = 0; i < newSize; ++i) {
- updated[offset + i] = 0;
- deleted[offset + i] = partial ? 0 : WORD_MASK;
- }
- }
-
- @Override
- public String toString() {
- return "BitSetNode(" + getStart() + "-" + getEnd() + ")";
- }
-
- // ========================================================================
- // Min/Max Helpers
- // ========================================================================
- public long getActiveMinProcId() {
- long minProcId = start;
- for (int i = 0; i < deleted.length; ++i) {
- if (deleted[i] == 0) {
- return(minProcId);
- }
-
- if (deleted[i] != WORD_MASK) {
- for (int j = 0; j < BITS_PER_WORD; ++j) {
- if ((deleted[i] & (1L << j)) != 0) {
- return minProcId + j;
- }
- }
- }
-
- minProcId += BITS_PER_WORD;
- }
- return minProcId;
- }
-
- public long getActiveMaxProcId() {
- long maxProcId = getEnd();
- for (int i = deleted.length - 1; i >= 0; --i) {
- if (deleted[i] == 0) {
- return maxProcId;
- }
-
- if (deleted[i] != WORD_MASK) {
- for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
- if ((deleted[i] & (1L << j)) == 0) {
- return maxProcId - (BITS_PER_WORD - 1 - j);
- }
- }
- }
- maxProcId -= BITS_PER_WORD;
- }
- return maxProcId;
- }
-
- // ========================================================================
- // Bitmap Helpers
- // ========================================================================
- private int getBitmapIndex(final long procId) {
- return (int)(procId - start);
- }
-
- private void updateState(final long procId, final boolean isDeleted) {
- int bitmapIndex = getBitmapIndex(procId);
- int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
- long value = (1L << bitmapIndex);
-
- updated[wordIndex] |= value;
- if (isDeleted) {
- deleted[wordIndex] |= value;
- } else {
- deleted[wordIndex] &= ~value;
- }
- }
-
-
- // ========================================================================
- // Helpers
- // ========================================================================
- /**
- * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
- */
- private static long alignUp(final long x) {
- return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
- }
-
- /**
- * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
- */
- private static long alignDown(final long x) {
- return x & -BITS_PER_WORD;
- }
- }
-
- public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
+ public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
reset();
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) {
final BitSetNode node = new BitSetNode(protoNode);
@@ -440,14 +71,23 @@ public class ProcedureStoreTracker {
/**
* Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
*/
- public void resetTo(final ProcedureStoreTracker tracker) {
+ public void resetTo(ProcedureStoreTracker tracker) {
resetTo(tracker, false);
}
- public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) {
+ /**
+ * Resets internal state to same as given {@code tracker}, and change the deleted flag according
+ * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
+ * <p/>
+ * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
+ * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
+ * deleted flag if {@code resetDelete} is true.
+ */
+ public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
+ reset();
this.partial = tracker.partial;
- this.minUpdatedProcId = tracker.minUpdatedProcId;
- this.maxUpdatedProcId = tracker.maxUpdatedProcId;
+ this.minModifiedProcId = tracker.minModifiedProcId;
+ this.maxModifiedProcId = tracker.maxModifiedProcId;
this.keepDeletes = tracker.keepDeletes;
for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
@@ -458,25 +98,24 @@ public class ProcedureStoreTracker {
insert(null, procId);
}
- public void insert(final long[] procIds) {
+ public void insert(long[] procIds) {
for (int i = 0; i < procIds.length; ++i) {
insert(procIds[i]);
}
}
- public void insert(final long procId, final long[] subProcIds) {
- BitSetNode node = null;
- node = update(node, procId);
+ public void insert(long procId, long[] subProcIds) {
+ BitSetNode node = update(null, procId);
for (int i = 0; i < subProcIds.length; ++i) {
node = insert(node, subProcIds[i]);
}
}
- private BitSetNode insert(BitSetNode node, final long procId) {
+ private BitSetNode insert(BitSetNode node, long procId) {
if (node == null || !node.contains(procId)) {
node = getOrCreateNode(procId);
}
- node.update(procId);
+ node.insertOrUpdate(procId);
trackProcIds(procId);
return node;
}
@@ -485,11 +124,11 @@ public class ProcedureStoreTracker {
update(null, procId);
}
- private BitSetNode update(BitSetNode node, final long procId) {
+ private BitSetNode update(BitSetNode node, long procId) {
node = lookupClosestNode(node, procId);
assert node != null : "expected node to update procId=" + procId;
assert node.contains(procId) : "expected procId=" + procId + " in the node";
- node.update(procId);
+ node.insertOrUpdate(procId);
trackProcIds(procId);
return node;
}
@@ -506,7 +145,7 @@ public class ProcedureStoreTracker {
}
}
- private BitSetNode delete(BitSetNode node, final long procId) {
+ private BitSetNode delete(BitSetNode node, long procId) {
node = lookupClosestNode(node, procId);
assert node != null : "expected node to delete procId=" + procId;
assert node.contains(procId) : "expected procId=" + procId + " in the node";
@@ -520,35 +159,62 @@ public class ProcedureStoreTracker {
return node;
}
- @InterfaceAudience.Private
- public void setDeleted(final long procId, final boolean isDeleted) {
+ /**
+ * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
+ */
+ public void setMinMaxModifiedProcIds(long min, long max) {
+ this.minModifiedProcId = min;
+ this.maxModifiedProcId = max;
+ }
+ /**
+ * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
+ * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart
+ * this is not true, as we will read the wal files in reverse order so a delete may come first.
+ */
+ public void setDeleted(long procId, boolean isDeleted) {
BitSetNode node = getOrCreateNode(procId);
assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
node.updateState(procId, isDeleted);
trackProcIds(procId);
}
- public void setDeletedIfSet(final long... procId) {
+ /**
+ * Set the given bit for the procId to delete if it was modified before.
+ * <p/>
+ * This method is used to test whether a procedure wal file can be safely deleted, as if all the
+ * procedures in the given procedure wal file has been modified in the new procedure wal files,
+ * then we can delete it.
+ */
+ public void setDeletedIfModified(long... procId) {
BitSetNode node = null;
for (int i = 0; i < procId.length; ++i) {
node = lookupClosestNode(node, procId[i]);
- if (node != null && node.isUpdated(procId[i])) {
+ if (node != null && node.isModified(procId[i])) {
node.delete(procId[i]);
}
}
}
- public void setDeletedIfSet(final ProcedureStoreTracker tracker) {
+ /**
+ * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
+ * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
+ * then we mark it as deleted.
+ * @see #setDeletedIfModified(long...)
+ */
+ public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
BitSetNode trackerNode = null;
- for (BitSetNode node: map.values()) {
+ for (BitSetNode node : map.values()) {
final long minProcId = node.getStart();
final long maxProcId = node.getEnd();
for (long procId = minProcId; procId <= maxProcId; ++procId) {
- if (!node.isUpdated(procId)) continue;
+ if (!node.isModified(procId)) {
+ continue;
+ }
trackerNode = tracker.lookupClosestNode(trackerNode, procId);
- if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) {
- // the procedure was removed or updated
+ if (trackerNode == null || !trackerNode.contains(procId) ||
+ trackerNode.isModified(procId)) {
+ // the procedure was removed or modified
node.delete(procId);
}
}
@@ -568,28 +234,29 @@ public class ProcedureStoreTracker {
}
private void trackProcIds(long procId) {
- minUpdatedProcId = Math.min(minUpdatedProcId, procId);
- maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
+ minModifiedProcId = Math.min(minModifiedProcId, procId);
+ maxModifiedProcId = Math.max(maxModifiedProcId, procId);
}
- public long getUpdatedMinProcId() {
- return minUpdatedProcId;
+ public long getModifiedMinProcId() {
+ return minModifiedProcId;
}
- public long getUpdatedMaxProcId() {
- return maxUpdatedProcId;
+ public long getModifiedMaxProcId() {
+ return maxModifiedProcId;
}
public void reset() {
this.keepDeletes = false;
this.partial = false;
this.map.clear();
- resetUpdates();
+ resetModified();
}
- public boolean isUpdated(long procId) {
+ public boolean isModified(long procId) {
final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
- return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId);
+ return entry != null && entry.getValue().contains(procId) &&
+ entry.getValue().isModified(procId);
}
/**
@@ -604,7 +271,7 @@ public class ProcedureStoreTracker {
if (entry != null && entry.getValue().contains(procId)) {
BitSetNode node = entry.getValue();
DeleteState state = node.isDeleted(procId);
- return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
+ return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
}
return partial ? DeleteState.MAYBE : DeleteState.YES;
}
@@ -656,11 +323,12 @@ public class ProcedureStoreTracker {
}
/**
- * @return true if any procedure was updated since last call to {@link #resetUpdates()}.
+ * @return true if all procedure was modified or deleted since last call to
+ * {@link #resetModified()}.
*/
- public boolean isUpdated() {
+ public boolean isAllModified() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
- if (!entry.getValue().isUpdated()) {
+ if (!entry.getValue().isAllModified()) {
return false;
}
}
@@ -671,21 +339,15 @@ public class ProcedureStoreTracker {
* Clears the list of updated procedure ids. This doesn't affect global list of active
* procedure ids.
*/
- public void resetUpdates() {
- for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
- entry.getValue().resetUpdates();
- }
- minUpdatedProcId = Long.MAX_VALUE;
- maxUpdatedProcId = Long.MIN_VALUE;
- }
-
- public void undeleteAll() {
+ public void resetModified() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
- entry.getValue().undeleteAll();
+ entry.getValue().resetModified();
}
+ minModifiedProcId = Long.MAX_VALUE;
+ maxModifiedProcId = Long.MIN_VALUE;
}
- private BitSetNode getOrCreateNode(final long procId) {
+ private BitSetNode getOrCreateNode(long procId) {
// If procId can fit in left node (directly or by growing it)
BitSetNode leftNode = null;
boolean leftCanGrow = false;
@@ -760,7 +422,7 @@ public class ProcedureStoreTracker {
public void dump() {
System.out.println("map " + map.size());
- System.out.println("isUpdated " + isUpdated());
+ System.out.println("isAllModified " + isAllModified());
System.out.println("isEmpty " + isEmpty());
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
entry.getValue().dump();
http://git-wip-us.apache.org/repos/asf/hbase/blob/118b0746/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
index dd34896..ba4480f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
@@ -15,19 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.procedure2.store.wal;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
/**
* Thrown when a procedure WAL is corrupted
*/
@InterfaceAudience.Private
-@InterfaceStability.Stable
public class CorruptedWALProcedureStoreException extends HBaseIOException {
+
+ private static final long serialVersionUID = -3407300445435898074L;
+
/** default constructor */
public CorruptedWALProcedureStoreException() {
super();
http://git-wip-us.apache.org/repos/asf/hbase/blob/118b0746/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index 6226350..1676744 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -15,20 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
-
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
@@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* Describes a WAL File
*/
@InterfaceAudience.Private
-@InterfaceStability.Evolving
public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class);
http://git-wip-us.apache.org/repos/asf/hbase/blob/118b0746/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index ac3a529..c9986ed 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -18,25 +18,22 @@
package org.apache.hadoop.hbase.procedure2.store.wal;
-import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
-
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer;
@@ -45,9 +42,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* Helper class that contains the WAL serialization utils.
*/
@InterfaceAudience.Private
-@InterfaceStability.Evolving
public final class ProcedureWALFormat {
- private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormat.class);
static final byte LOG_TYPE_STREAM = 0;
static final byte LOG_TYPE_COMPACTED = 1;
@@ -60,6 +55,9 @@ public final class ProcedureWALFormat {
@InterfaceAudience.Private
public static class InvalidWALDataException extends IOException {
+
+ private static final long serialVersionUID = 5471733223070202196L;
+
public InvalidWALDataException(String s) {
super(s);
}
@@ -75,9 +73,9 @@ public final class ProcedureWALFormat {
private ProcedureWALFormat() {}
- public static void load(final Iterator<ProcedureWALFile> logs,
- final ProcedureStoreTracker tracker, final Loader loader) throws IOException {
- final ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
+ public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker,
+ Loader loader) throws IOException {
+ ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
tracker.setKeepDeletes(true);
try {
// Ignore the last log which is current active log.
@@ -93,8 +91,10 @@ public final class ProcedureWALFormat {
reader.finish();
// The tracker is now updated with all the procedures read from the logs
- tracker.setPartialFlag(false);
- tracker.resetUpdates();
+ if (tracker.isPartial()) {
+ tracker.setPartialFlag(false);
+ }
+ tracker.resetModified();
} finally {
tracker.setKeepDeletes(false);
}
@@ -205,7 +205,7 @@ public final class ProcedureWALFormat {
}
public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
- Procedure proc, Procedure[] subprocs) throws IOException {
+ Procedure<?> proc, Procedure<?>[] subprocs) throws IOException {
final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
builder.setType(type);
builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
@@ -217,17 +217,17 @@ public final class ProcedureWALFormat {
builder.build().writeDelimitedTo(slot);
}
- public static void writeInsert(ByteSlot slot, Procedure proc)
+ public static void writeInsert(ByteSlot slot, Procedure<?> proc)
throws IOException {
writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null);
}
- public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs)
+ public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs)
throws IOException {
writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs);
}
- public static void writeUpdate(ByteSlot slot, Procedure proc)
+ public static void writeUpdate(ByteSlot slot, Procedure<?> proc)
throws IOException {
writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null);
}
@@ -240,7 +240,7 @@ public final class ProcedureWALFormat {
builder.build().writeDelimitedTo(slot);
}
- public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs)
+ public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs)
throws IOException {
final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
http://git-wip-us.apache.org/repos/asf/hbase/blob/118b0746/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 4ab70f1..1ac8e01 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -15,22 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.procedure2.store.wal;
-import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-
import java.io.IOException;
-
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
@@ -38,7 +34,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* Helper class that loads the procedures stored in a WAL
*/
@InterfaceAudience.Private
-@InterfaceStability.Evolving
public class ProcedureWALFormatReader {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
@@ -98,8 +93,8 @@ public class ProcedureWALFormatReader {
// In the case above we need to read one more WAL to be able to consider
// the root procedure A and all children as ready.
// ==============================================================================================
- private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
- private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
+ private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024);
+ private final WALProcedureMap procedureMap = new WALProcedureMap(1024);
private final ProcedureWALFormat.Loader loader;
@@ -111,33 +106,31 @@ public class ProcedureWALFormatReader {
* to rebuild the tracker.
*/
private final ProcedureStoreTracker tracker;
- // TODO: private final boolean hasFastStartSupport;
/**
- * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we
- * re-build the list of procedures updated in that WAL because we need it for log cleaning
- * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted.
- * (see {@link WALProcedureStore#removeInactiveLogs()}).
- * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother
- * re-building it.
+ * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build
+ * the list of procedures modified in that WAL because we need it for log cleaning purposes. If
+ * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see
+ * {@link WALProcedureStore#removeInactiveLogs()}).
+ * <p/>
+ * Notice that, the deleted part for this tracker will not be global valid as we can only count
+ * the deletes in the current file, but it is not big problem as finally, the above tracker will
+ * have the global state of deleted, and it will also be used to build the cleanup tracker.
*/
private ProcedureStoreTracker localTracker;
- // private long compactionLogId;
private long maxProcId = 0;
public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
ProcedureWALFormat.Loader loader) {
this.tracker = tracker;
this.loader = loader;
- // we support fast-start only if we have a clean shutdown.
- // this.hasFastStartSupport = !tracker.isEmpty();
}
- public void read(final ProcedureWALFile log) throws IOException {
- localTracker = log.getTracker().isPartial() ? log.getTracker() : null;
- if (localTracker != null) {
- LOG.info("Rebuilding tracker for " + log);
+ public void read(ProcedureWALFile log) throws IOException {
+ localTracker = log.getTracker();
+ if (localTracker.isPartial()) {
+ LOG.info("Rebuilding tracker for {}", log);
}
long count = 0;
@@ -147,7 +140,7 @@ public class ProcedureWALFormatReader {
while (hasMore) {
ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
if (entry == null) {
- LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log);
+ LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log);
break;
}
count++;
@@ -178,21 +171,17 @@ public class ProcedureWALFormatReader {
loader.markCorruptedWAL(log, e);
}
- if (localTracker != null) {
- localTracker.setPartialFlag(false);
- }
if (!localProcedureMap.isEmpty()) {
- log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
+ log.setProcIds(localProcedureMap.getMinModifiedProcId(),
+ localProcedureMap.getMaxModifiedProcId());
+ if (localTracker.isPartial()) {
+ localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
+ localProcedureMap.getMaxModifiedProcId());
+ }
procedureMap.mergeTail(localProcedureMap);
-
- //if (hasFastStartSupport) {
- // TODO: Some procedure may be already runnables (see readInitEntry())
- // (we can also check the "update map" in the log trackers)
- // --------------------------------------------------
- //EntryIterator iter = procedureMap.fetchReady();
- //if (iter != null) loader.load(iter);
- // --------------------------------------------------
- //}
+ }
+ if (localTracker.isPartial()) {
+ localTracker.setPartialFlag(false);
}
}
@@ -202,37 +191,46 @@ public class ProcedureWALFormatReader {
// fetch the procedure ready to run.
ProcedureIterator procIter = procedureMap.fetchReady();
- if (procIter != null) loader.load(procIter);
+ if (procIter != null) {
+ loader.load(procIter);
+ }
// remaining procedures have missing link or dependencies
// consider them as corrupted, manual fix is probably required.
procIter = procedureMap.fetchAll();
- if (procIter != null) loader.handleCorrupted(procIter);
+ if (procIter != null) {
+ loader.handleCorrupted(procIter);
+ }
+ }
+
+ private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {
+ if (tracker.isPartial()) {
+ tracker.setDeleted(procId, true);
+ }
}
- private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) {
+ private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) {
+ if (tracker.isPartial()) {
+ tracker.insert(proc.getProcId());
+ }
+ }
+
+ private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) {
maxProcId = Math.max(maxProcId, proc.getProcId());
if (isRequired(proc.getProcId())) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId());
- }
+ LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId());
localProcedureMap.add(proc);
- if (tracker.isPartial()) {
- tracker.insert(proc.getProcId());
- }
- }
- if (localTracker != null) {
- localTracker.insert(proc.getProcId());
+ insertIfPartial(tracker, proc);
}
+ insertIfPartial(localTracker, proc);
}
- private void readInitEntry(final ProcedureWALEntry entry)
- throws IOException {
+ private void readInitEntry(ProcedureWALEntry entry) {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
loadProcedure(entry, entry.getProcedure(0));
}
- private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
+ private void readInsertEntry(ProcedureWALEntry entry) {
assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
loadProcedure(entry, entry.getProcedure(0));
for (int i = 1; i < entry.getProcedureCount(); ++i) {
@@ -240,12 +238,12 @@ public class ProcedureWALFormatReader {
}
}
- private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
+ private void readUpdateEntry(ProcedureWALEntry entry) {
assert entry.getProcedureCount() == 1 : "Expected only one procedure";
loadProcedure(entry, entry.getProcedure(0));
}
- private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
+ private void readDeleteEntry(ProcedureWALEntry entry) {
assert entry.hasProcId() : "expected ProcID";
if (entry.getChildIdCount() > 0) {
@@ -267,598 +265,19 @@ public class ProcedureWALFormatReader {
}
private void deleteEntry(final long procId) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("delete entry " + procId);
- }
+ LOG.trace("delete entry {}", procId);
maxProcId = Math.max(maxProcId, procId);
localProcedureMap.remove(procId);
assert !procedureMap.contains(procId);
- if (tracker.isPartial()) {
- tracker.setDeleted(procId, true);
- }
- if (localTracker != null) {
- // In case there is only delete entry for this procedure in current log.
- localTracker.setDeleted(procId, true);
- }
+ setDeletedIfPartial(tracker, procId);
+ setDeletedIfPartial(localTracker, procId);
}
- private boolean isDeleted(final long procId) {
+ private boolean isDeleted(long procId) {
return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
}
- private boolean isRequired(final long procId) {
+ private boolean isRequired(long procId) {
return !isDeleted(procId) && !procedureMap.contains(procId);
}
-
- // ==========================================================================
- // We keep an in-memory map of the procedures sorted by replay order.
- // (see the details in the beginning of the file)
- // _______________________________________________
- // procedureMap = | A | | E | | C | | | | | G | | |
- // D B
- // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G
- //
- // We also have a lazy grouping by "root procedure", and a list of
- // unlinked procedures. If after reading all the WALs we have unlinked
- // procedures it means that we had a missing WAL or a corruption.
- // rootHead = A <-> D <-> G
- // B E
- // C
- // unlinkFromLinkList = None
- // ==========================================================================
- private static class Entry {
- // For bucketed linked lists in hash-table.
- protected Entry hashNext;
- // child head
- protected Entry childHead;
- // double-link for rootHead or childHead
- protected Entry linkNext;
- protected Entry linkPrev;
- // replay double-linked-list
- protected Entry replayNext;
- protected Entry replayPrev;
- // procedure-infos
- protected Procedure procedure;
- protected ProcedureProtos.Procedure proto;
- protected boolean ready = false;
-
- public Entry(Entry hashNext) {
- this.hashNext = hashNext;
- }
-
- public long getProcId() {
- return proto.getProcId();
- }
-
- public long getParentId() {
- return proto.getParentId();
- }
-
- public boolean hasParent() {
- return proto.hasParentId();
- }
-
- public boolean isReady() {
- return ready;
- }
-
- public boolean isFinished() {
- if (!hasParent()) {
- // we only consider 'root' procedures. because for the user 'finished'
- // means when everything up to the 'root' is finished.
- switch (proto.getState()) {
- case ROLLEDBACK:
- case SUCCESS:
- return true;
- default:
- break;
- }
- }
- return false;
- }
-
- public Procedure convert() throws IOException {
- if (procedure == null) {
- procedure = ProcedureUtil.convertToProcedure(proto);
- }
- return procedure;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("Entry(");
- sb.append(getProcId());
- sb.append(", parentId=");
- sb.append(getParentId());
- sb.append(", class=");
- sb.append(proto.getClassName());
- sb.append(")");
- return sb.toString();
- }
- }
-
- private static class EntryIterator implements ProcedureIterator {
- private final Entry replayHead;
- private Entry current;
-
- public EntryIterator(Entry replayHead) {
- this.replayHead = replayHead;
- this.current = replayHead;
- }
-
- @Override
- public void reset() {
- this.current = replayHead;
- }
-
- @Override
- public boolean hasNext() {
- return current != null;
- }
-
- @Override
- public boolean isNextFinished() {
- return current != null && current.isFinished();
- }
-
- @Override
- public void skipNext() {
- current = current.replayNext;
- }
-
- @Override
- public Procedure next() throws IOException {
- try {
- return current.convert();
- } finally {
- current = current.replayNext;
- }
- }
- }
-
- private static class WalProcedureMap {
- // procedure hash table
- private Entry[] procedureMap;
-
- // replay-order double-linked-list
- private Entry replayOrderHead;
- private Entry replayOrderTail;
-
- // root linked-list
- private Entry rootHead;
-
- // pending unlinked children (root not present yet)
- private Entry childUnlinkedHead;
-
- // Track ProcId range
- private long minProcId = Long.MAX_VALUE;
- private long maxProcId = Long.MIN_VALUE;
-
- public WalProcedureMap(int size) {
- procedureMap = new Entry[size];
- replayOrderHead = null;
- replayOrderTail = null;
- rootHead = null;
- childUnlinkedHead = null;
- }
-
- public void add(ProcedureProtos.Procedure procProto) {
- trackProcIds(procProto.getProcId());
- Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
- boolean newEntry = entry.proto == null;
- // We have seen procedure WALs where the entries are out of order; see HBASE-18152.
- // To compensate, only replace the Entry procedure if for sure this new procedure
- // is indeed an entry that came later. TODO: Fix the writing of procedure info so
- // it does not violate basic expectation, that WALs contain procedure changes going
- // from start to finish in sequence.
- if (newEntry || isIncreasing(entry.proto, procProto)) {
- entry.proto = procProto;
- }
- addToReplayList(entry);
- if(newEntry) {
- if (procProto.hasParentId()) {
- childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
- } else {
- rootHead = addToLinkList(entry, rootHead);
- }
- }
- }
-
- /**
- * @return True if this new procedure is 'richer' than the current one else
- * false and we log this incidence where it appears that the WAL has older entries
- * appended after newer ones. See HBASE-18152.
- */
- private static boolean isIncreasing(ProcedureProtos.Procedure current,
- ProcedureProtos.Procedure candidate) {
- // Check that the procedures we see are 'increasing'. We used to compare
- // procedure id first and then update time but it can legitimately go backwards if the
- // procedure is failed or rolled back so that was unreliable. Was going to compare
- // state but lets see if comparing update time enough (unfortunately this issue only
- // seen under load...)
- boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate();
- if (!increasing) {
- LOG.warn("NOT INCREASING! current=" + current + ", candidate=" + candidate);
- }
- return increasing;
- }
-
- public boolean remove(long procId) {
- trackProcIds(procId);
- Entry entry = removeFromMap(procId);
- if (entry != null) {
- unlinkFromReplayList(entry);
- unlinkFromLinkList(entry);
- return true;
- }
- return false;
- }
-
- private void trackProcIds(long procId) {
- minProcId = Math.min(minProcId, procId);
- maxProcId = Math.max(maxProcId, procId);
- }
-
- public long getMinProcId() {
- return minProcId;
- }
-
- public long getMaxProcId() {
- return maxProcId;
- }
-
- public boolean contains(long procId) {
- return getProcedure(procId) != null;
- }
-
- public boolean isEmpty() {
- return replayOrderHead == null;
- }
-
- public void clear() {
- for (int i = 0; i < procedureMap.length; ++i) {
- procedureMap[i] = null;
- }
- replayOrderHead = null;
- replayOrderTail = null;
- rootHead = null;
- childUnlinkedHead = null;
- minProcId = Long.MAX_VALUE;
- maxProcId = Long.MIN_VALUE;
- }
-
- /*
- * Merges two WalProcedureMap,
- * the target is the "global" map, the source is the "local" map.
- * - The entries in the hashtables are guaranteed to be unique.
- * On replay we don't load procedures that already exist in the "global"
- * map (the one we are merging the "local" in to).
- * - The replayOrderList of the "local" nao will be appended to the "global"
- * map replay list.
- * - The "local" map will be cleared at the end of the operation.
- */
- public void mergeTail(WalProcedureMap other) {
- for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
- int slotIndex = getMapSlot(p.getProcId());
- p.hashNext = procedureMap[slotIndex];
- procedureMap[slotIndex] = p;
- }
-
- if (replayOrderHead == null) {
- replayOrderHead = other.replayOrderHead;
- replayOrderTail = other.replayOrderTail;
- rootHead = other.rootHead;
- childUnlinkedHead = other.childUnlinkedHead;
- } else {
- // append replay list
- assert replayOrderTail.replayNext == null;
- assert other.replayOrderHead.replayPrev == null;
- replayOrderTail.replayNext = other.replayOrderHead;
- other.replayOrderHead.replayPrev = replayOrderTail;
- replayOrderTail = other.replayOrderTail;
-
- // merge rootHead
- if (rootHead == null) {
- rootHead = other.rootHead;
- } else if (other.rootHead != null) {
- Entry otherTail = findLinkListTail(other.rootHead);
- otherTail.linkNext = rootHead;
- rootHead.linkPrev = otherTail;
- rootHead = other.rootHead;
- }
-
- // merge childUnlinkedHead
- if (childUnlinkedHead == null) {
- childUnlinkedHead = other.childUnlinkedHead;
- } else if (other.childUnlinkedHead != null) {
- Entry otherTail = findLinkListTail(other.childUnlinkedHead);
- otherTail.linkNext = childUnlinkedHead;
- childUnlinkedHead.linkPrev = otherTail;
- childUnlinkedHead = other.childUnlinkedHead;
- }
- }
- maxProcId = Math.max(maxProcId, other.maxProcId);
- minProcId = Math.max(minProcId, other.minProcId);
-
- other.clear();
- }
-
- /*
- * Returns an EntryIterator with the list of procedures ready
- * to be added to the executor.
- * A Procedure is ready if its children and parent are ready.
- */
- public EntryIterator fetchReady() {
- buildGraph();
-
- Entry readyHead = null;
- Entry readyTail = null;
- Entry p = replayOrderHead;
- while (p != null) {
- Entry next = p.replayNext;
- if (p.isReady()) {
- unlinkFromReplayList(p);
- if (readyTail != null) {
- readyTail.replayNext = p;
- p.replayPrev = readyTail;
- } else {
- p.replayPrev = null;
- readyHead = p;
- }
- readyTail = p;
- p.replayNext = null;
- }
- p = next;
- }
- // we need the hash-table lookups for parents, so this must be done
- // out of the loop where we check isReadyToRun()
- for (p = readyHead; p != null; p = p.replayNext) {
- removeFromMap(p.getProcId());
- unlinkFromLinkList(p);
- }
- return readyHead != null ? new EntryIterator(readyHead) : null;
- }
-
- /*
- * Drain this map and return all procedures in it.
- */
- public EntryIterator fetchAll() {
- Entry head = replayOrderHead;
- for (Entry p = head; p != null; p = p.replayNext) {
- removeFromMap(p.getProcId());
- }
- for (int i = 0; i < procedureMap.length; ++i) {
- assert procedureMap[i] == null : "map not empty i=" + i;
- }
- replayOrderHead = null;
- replayOrderTail = null;
- childUnlinkedHead = null;
- rootHead = null;
- return head != null ? new EntryIterator(head) : null;
- }
-
- private void buildGraph() {
- Entry p = childUnlinkedHead;
- while (p != null) {
- Entry next = p.linkNext;
- Entry rootProc = getRootProcedure(p);
- if (rootProc != null) {
- rootProc.childHead = addToLinkList(p, rootProc.childHead);
- }
- p = next;
- }
-
- for (p = rootHead; p != null; p = p.linkNext) {
- checkReadyToRun(p);
- }
- }
-
- private Entry getRootProcedure(Entry entry) {
- while (entry != null && entry.hasParent()) {
- entry = getProcedure(entry.getParentId());
- }
- return entry;
- }
-
- /*
- * (see the comprehensive explanation in the beginning of the file)
- * A Procedure is ready when parent and children are ready.
- * "ready" means that we all the information that we need in-memory.
- *
- * Example-1:
- * We have two WALs, we start reading from the newest (wal-2)
- * wal-2 | C B |
- * wal-1 | A B C |
- *
- * If C and B don't depend on A (A is not the parent), we can start them
- * before reading wal-1. If B is the only one with parent A we can start C.
- * We have to read one more WAL before being able to start B.
- *
- * How do we know with the only information in B that we are not ready.
- * - easy case, the parent is missing from the global map
- * - more complex case we look at the Stack IDs.
- *
- * The Stack-IDs are added to the procedure order as an incremental index
- * tracking how many times that procedure was executed, which is equivalent
- * to the number of times we wrote the procedure to the WAL.
- * In the example above:
- * wal-2: B has stackId = [1, 2]
- * wal-1: B has stackId = [1]
- * wal-1: A has stackId = [0]
- *
- * Since we know that the Stack-IDs are incremental for a Procedure,
- * we notice that there is a gap in the stackIds of B, so something was
- * executed before.
- * To identify when a Procedure is ready we do the sum of the stackIds of
- * the procedure and the parent. if the stackIdSum is equal to the
- * sum of {1..maxStackId} then everything we need is available.
- *
- * Example-2
- * wal-2 | A | A stackIds = [0, 2]
- * wal-1 | A B | B stackIds = [1]
- *
- * There is a gap between A stackIds so something was executed in between.
- */
- private boolean checkReadyToRun(Entry rootEntry) {
- assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry;
-
- if (rootEntry.isFinished()) {
- // If the root procedure is finished, sub-procedures should be gone
- if (rootEntry.childHead != null) {
- LOG.error("unexpected active children for root-procedure: " + rootEntry);
- for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
- LOG.error("unexpected active children: " + p);
- }
- }
-
- assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry;
- rootEntry.ready = true;
- return true;
- }
-
- int stackIdSum = 0;
- int maxStackId = 0;
- for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
- int stackId = 1 + rootEntry.proto.getStackId(i);
- maxStackId = Math.max(maxStackId, stackId);
- stackIdSum += stackId;
- if (LOG.isTraceEnabled()) {
- LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum +
- " maxStackid=" + maxStackId + " " + rootEntry);
- }
- }
-
- for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
- for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
- int stackId = 1 + p.proto.getStackId(i);
- maxStackId = Math.max(maxStackId, stackId);
- stackIdSum += stackId;
- if (LOG.isTraceEnabled()) {
- LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum +
- " maxStackid=" + maxStackId + " " + p);
- }
- }
- }
- // The cmpStackIdSum is this formula for finding the sum of a series of numbers:
- // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg
- final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
- if (cmpStackIdSum == stackIdSum) {
- rootEntry.ready = true;
- for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
- p.ready = true;
- }
- return true;
- }
- return false;
- }
-
- private void unlinkFromReplayList(Entry entry) {
- if (replayOrderHead == entry) {
- replayOrderHead = entry.replayNext;
- }
- if (replayOrderTail == entry) {
- replayOrderTail = entry.replayPrev;
- }
- if (entry.replayPrev != null) {
- entry.replayPrev.replayNext = entry.replayNext;
- }
- if (entry.replayNext != null) {
- entry.replayNext.replayPrev = entry.replayPrev;
- }
- }
-
- private void addToReplayList(final Entry entry) {
- unlinkFromReplayList(entry);
- entry.replayNext = replayOrderHead;
- entry.replayPrev = null;
- if (replayOrderHead != null) {
- replayOrderHead.replayPrev = entry;
- } else {
- replayOrderTail = entry;
- }
- replayOrderHead = entry;
- }
-
- private void unlinkFromLinkList(Entry entry) {
- if (entry == rootHead) {
- rootHead = entry.linkNext;
- } else if (entry == childUnlinkedHead) {
- childUnlinkedHead = entry.linkNext;
- }
- if (entry.linkPrev != null) {
- entry.linkPrev.linkNext = entry.linkNext;
- }
- if (entry.linkNext != null) {
- entry.linkNext.linkPrev = entry.linkPrev;
- }
- }
-
- private Entry addToLinkList(Entry entry, Entry linkHead) {
- unlinkFromLinkList(entry);
- entry.linkNext = linkHead;
- entry.linkPrev = null;
- if (linkHead != null) {
- linkHead.linkPrev = entry;
- }
- return entry;
- }
-
- private Entry findLinkListTail(Entry linkHead) {
- Entry tail = linkHead;
- while (tail.linkNext != null) {
- tail = tail.linkNext;
- }
- return tail;
- }
-
- private Entry addToMap(final long procId, final boolean hasParent) {
- int slotIndex = getMapSlot(procId);
- Entry entry = getProcedure(slotIndex, procId);
- if (entry != null) return entry;
-
- entry = new Entry(procedureMap[slotIndex]);
- procedureMap[slotIndex] = entry;
- return entry;
- }
-
- private Entry removeFromMap(final long procId) {
- int slotIndex = getMapSlot(procId);
- Entry prev = null;
- Entry entry = procedureMap[slotIndex];
- while (entry != null) {
- if (procId == entry.getProcId()) {
- if (prev != null) {
- prev.hashNext = entry.hashNext;
- } else {
- procedureMap[slotIndex] = entry.hashNext;
- }
- entry.hashNext = null;
- return entry;
- }
- prev = entry;
- entry = entry.hashNext;
- }
- return null;
- }
-
- private Entry getProcedure(final long procId) {
- return getProcedure(getMapSlot(procId), procId);
- }
-
- private Entry getProcedure(final int slotIndex, final long procId) {
- Entry entry = procedureMap[slotIndex];
- while (entry != null) {
- if (procId == entry.getProcId()) {
- return entry;
- }
- entry = entry.hashNext;
- }
- return null;
- }
-
- private int getMapSlot(final long procId) {
- return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/118b0746/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
index 582db77..3afcd16 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,22 +29,23 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.PosixParser;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
/**
* ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file
@@ -164,7 +164,7 @@ public class ProcedureWALPrettyPrinter extends Configured implements Tool {
final List<Path> files = new ArrayList<>();
try {
- CommandLine cmd = new PosixParser().parse(options, args);
+ CommandLine cmd = new DefaultParser().parse(options, args);
if (cmd.hasOption("f")) {
files.add(new Path(cmd.getOptionValue("f")));