You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/06/29 19:33:11 UTC
hbase git commit: HBASE-16130 Add comments to ProcedureStoreTracker.
Change-Id: I09d7c2375fd18a96aea48eaa161799496f491b4f
Repository: hbase
Updated Branches:
refs/heads/master 9b1ecb31f -> a3546a375
HBASE-16130 Add comments to ProcedureStoreTracker.
Change-Id: I09d7c2375fd18a96aea48eaa161799496f491b4f
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a3546a37
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a3546a37
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a3546a37
Branch: refs/heads/master
Commit: a3546a37521e5171b675ab05342c3fc96ca12bac
Parents: 9b1ecb3
Author: Apekshit <ap...@gmail.com>
Authored: Tue Apr 19 16:53:28 2016 -0700
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Wed Jun 29 12:29:48 2016 -0700
----------------------------------------------------------------------
.../procedure2/store/ProcedureStoreTracker.java | 107 +++++++++++++++++--
.../store/wal/ProcedureWALFormat.java | 3 +-
.../procedure2/store/wal/WALProcedureStore.java | 10 +-
3 files changed, 102 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3546a37/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index fe2904b..d64da10 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -33,14 +33,27 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
* Keeps track of live procedures.
*
* It can be used by the ProcedureStore to identify which procedures are already
- * deleted/completed to avoid the deserialization step on restart.
+ * deleted/completed to avoid the deserialization step on restart
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureStoreTracker {
+ // Key is procedure id corresponding to first bit of the bitmap.
private final TreeMap<Long, BitSetNode> map = new TreeMap<Long, BitSetNode>();
+ /**
+ * If true, do not remove bits corresponding to deleted procedures. Note that this can result
+ * in huge bitmaps overtime.
+ * Currently, it's set to true only when building tracker state from logs during recovery. During
+ * recovery, if we are sure that a procedure has been deleted, reading its old update entries
+ * can be skipped.
+ */
private boolean keepDeletes = false;
+ /**
+ * If true, it means tracker has incomplete information about the active/deleted procedures.
+ * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
+ * understand it's real use.
+ */
private boolean partial = false;
private long minUpdatedProcId = Long.MAX_VALUE;
@@ -48,20 +61,39 @@ public class ProcedureStoreTracker {
public enum DeleteState { YES, NO, MAYBE }
+ /**
+ * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met).
+ * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the
+ * range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K
+ * is BITS_PER_WORD.
+ */
public static class BitSetNode {
private final static long WORD_MASK = 0xffffffffffffffffL;
private final static int ADDRESS_BITS_PER_WORD = 6;
private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
+ /**
+ * Mimics {@link ProcedureStoreTracker#partial}.
+ */
private final boolean partial;
+ /**
+ * Set of procedures which have been updated since last {@link #resetUpdates()}.
+ * Useful to track procedures which have been updated since last WAL write.
+ */
private long[] updated;
+ /**
+ * Keeps track of procedure ids which belong to this bitmap's range and have been deleted.
+ */
private long[] deleted;
+ /**
+ * Offset of bitmap i.e. procedure id corresponding to first bit.
+ */
private long start;
public void dump() {
System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
- getMinProcId(), getMaxProcId());
+ getActiveMinProcId(), getActiveMaxProcId());
System.out.println("Update:");
for (int i = 0; i < updated.length; ++i) {
for (int j = 0; j < BITS_PER_WORD; ++j) {
@@ -150,6 +182,9 @@ public class ProcedureStoreTracker {
return true;
}
+ /**
+ * @return true, if there are no active procedures in this BitSetNode, else false.
+ */
public boolean isEmpty() {
// TODO: cache the value
for (int i = 0; i < deleted.length; ++i) {
@@ -166,6 +201,9 @@ public class ProcedureStoreTracker {
}
}
+ /**
+ * Clears the {@link #deleted} bitmaps.
+ */
public void undeleteAll() {
for (int i = 0; i < updated.length; ++i) {
deleted[i] = 0;
@@ -182,6 +220,10 @@ public class ProcedureStoreTracker {
}
}
+ // ========================================================================
+ // Convert to/from Protocol Buffer.
+ // ========================================================================
+
public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
@@ -213,7 +255,8 @@ public class ProcedureStoreTracker {
}
public boolean canMerge(final BitSetNode rightNode) {
- assert start < rightNode.getEnd();
+ // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD.
+ assert start < rightNode.start;
return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
}
@@ -283,7 +326,7 @@ public class ProcedureStoreTracker {
// ========================================================================
// Min/Max Helpers
// ========================================================================
- public long getMinProcId() {
+ public long getActiveMinProcId() {
long minProcId = start;
for (int i = 0; i < deleted.length; ++i) {
if (deleted[i] == 0) {
@@ -303,7 +346,7 @@ public class ProcedureStoreTracker {
return minProcId;
}
- public long getMaxProcId() {
+ public long getActiveMaxProcId() {
long maxProcId = getEnd();
for (int i = deleted.length - 1; i >= 0; --i) {
if (deleted[i] == 0) {
@@ -346,10 +389,16 @@ public class ProcedureStoreTracker {
// ========================================================================
// Helpers
// ========================================================================
+ /**
+ * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
+ */
private static long alignUp(final long x) {
return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
}
+ /**
+ * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
+ */
private static long alignDown(final long x) {
return x & -BITS_PER_WORD;
}
@@ -421,6 +470,13 @@ public class ProcedureStoreTracker {
resetUpdates();
}
+ /**
+ * If {@link #partial} is false, returns state from the bitmap. If no state is found for
+ * {@code procId}, returns YES.
+ * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE
+ * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise,
+ * returns state from the bitmap.
+ */
public DeleteState isDeleted(long procId) {
Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
if (entry != null && entry.getValue().contains(procId)) {
@@ -431,14 +487,16 @@ public class ProcedureStoreTracker {
return partial ? DeleteState.MAYBE : DeleteState.YES;
}
- public long getMinProcId() {
+ public long getActiveMinProcId() {
// TODO: Cache?
Map.Entry<Long, BitSetNode> entry = map.firstEntry();
- return entry == null ? 0 : entry.getValue().getMinProcId();
+ return entry == null ? 0 : entry.getValue().getActiveMinProcId();
}
public void setKeepDeletes(boolean keepDeletes) {
this.keepDeletes = keepDeletes;
+ // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
+ // procedures).
if (!keepDeletes) {
Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
while (it.hasNext()) {
@@ -459,6 +517,9 @@ public class ProcedureStoreTracker {
this.partial = isPartial;
}
+ /**
+ * @return true, if no procedure is active, else false.
+ */
public boolean isEmpty() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
if (entry.getValue().isEmpty() == false) {
@@ -468,6 +529,9 @@ public class ProcedureStoreTracker {
return true;
}
+ /**
+ * @return true if any procedure was updated since last call to {@link #resetUpdates()}.
+ */
public boolean isUpdated() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
if (entry.getValue().isUpdated() == false) {
@@ -482,6 +546,10 @@ public class ProcedureStoreTracker {
return map.floorEntry(minId) != null || map.floorEntry(maxId) != null;
}
+ /**
+ * Clears the list of updated procedure ids. This doesn't affect global list of active
+ * procedure ids.
+ */
public void resetUpdates() {
for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
entry.getValue().resetUpdates();
@@ -497,7 +565,7 @@ public class ProcedureStoreTracker {
}
private BitSetNode getOrCreateNode(final long procId) {
- // can procId fit in the left node?
+ // If procId can fit in left node (directly or by growing it)
BitSetNode leftNode = null;
boolean leftCanGrow = false;
Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
@@ -509,6 +577,7 @@ public class ProcedureStoreTracker {
leftCanGrow = leftNode.canGrow(procId);
}
+ // If procId can fit in right node (directly or by growing it)
BitSetNode rightNode = null;
boolean rightCanGrow = false;
Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
@@ -521,12 +590,11 @@ public class ProcedureStoreTracker {
return mergeNodes(leftNode, rightNode);
}
+ // If left and right nodes can not merge, decide which one to grow.
if (leftCanGrow && rightCanGrow) {
if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
- // grow the left node
return growNode(leftNode, procId);
}
- // grow the right node
return growNode(rightNode, procId);
}
}
@@ -542,12 +610,16 @@ public class ProcedureStoreTracker {
return growNode(rightNode, procId);
}
- // add new node
+ // add new node if there are no left/right nodes which can be used.
BitSetNode node = new BitSetNode(procId, partial);
map.put(node.getStart(), node);
return node;
}
+ /**
+ * Grows {@code node} to contain {@code procId} and updates the map.
+ * @return {@link BitSetNode} instance which contains {@code procId}.
+ */
private BitSetNode growNode(BitSetNode node, long procId) {
map.remove(node.getStart());
node.grow(procId);
@@ -555,6 +627,9 @@ public class ProcedureStoreTracker {
return node;
}
+ /**
+ * Merges {@code leftNode} & {@code rightNode} and updates the map.
+ */
private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
assert leftNode.getStart() < rightNode.getStart();
leftNode.merge(rightNode);
@@ -571,6 +646,11 @@ public class ProcedureStoreTracker {
}
}
+ /**
+ * Builds
+ * {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker}
+ * protocol buffer from current state, serializes it and writes to the {@code stream}.
+ */
public void writeTo(final OutputStream stream) throws IOException {
ProcedureProtos.ProcedureStoreTracker.Builder builder =
ProcedureProtos.ProcedureStoreTracker.newBuilder();
@@ -580,6 +660,11 @@ public class ProcedureStoreTracker {
builder.build().writeDelimitedTo(stream);
}
+ /**
+ * Reads serialized
+ * {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker}
+ * protocol buffer from the {@code stream}, and use it to build the state.
+ */
public void readFrom(final InputStream stream) throws IOException {
reset();
final ProcedureProtos.ProcedureStoreTracker data =
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3546a37/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index add7d03..0643eed 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -158,7 +158,8 @@ public final class ProcedureWALFormat {
public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size)
throws IOException {
- long trailerPos = size - 17; // Beginning of the Trailer Jump
+ // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset
+ long trailerPos = size - 17;
if (trailerPos < startPos) {
throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos);
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3546a37/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 560072f..e0a6856 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -800,7 +800,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
.setVersion(ProcedureWALFormat.HEADER_VERSION)
.setType(ProcedureWALFormat.LOG_TYPE_STREAM)
- .setMinProcId(storeTracker.getMinProcId())
+ .setMinProcId(storeTracker.getActiveMinProcId())
.setLogId(logId)
.build();
@@ -876,6 +876,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
+ /**
+ * Remove all logs with logId <= {@code lastLogId}.
+ */
private void removeAllLogs(long lastLogId) {
if (logs.size() <= 1) return;
@@ -927,11 +930,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
private static long getLogIdFromName(final String name) {
int end = name.lastIndexOf(".log");
int start = name.lastIndexOf('-') + 1;
- while (start < end) {
- if (name.charAt(start) != '0')
- break;
- start++;
- }
return Long.parseLong(name.substring(start, end));
}