You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2014/12/01 20:17:35 UTC
hbase git commit: Revert "Remove Replay Tag"
Repository: hbase
Updated Branches:
refs/heads/master f723ffde7 -> 073345047
Revert "Remove Replay Tag"
This reverts commit f723ffde7ffa93ddd4c40456c917911817f314b1.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/07334504
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/07334504
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/07334504
Branch: refs/heads/master
Commit: 0733450473c422c3857f21c57651c9124bd8e25b
Parents: f723ffd
Author: Jeffrey Zhong <je...@apache.org>
Authored: Mon Dec 1 11:17:24 2014 -0800
Committer: Jeffrey Zhong <je...@apache.org>
Committed: Mon Dec 1 11:17:24 2014 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/CellComparator.java | 29 +++++++++++++++
.../java/org/apache/hadoop/hbase/KeyValue.java | 9 +++--
.../java/org/apache/hadoop/hbase/TagType.java | 2 +-
.../ZKSplitLogManagerCoordination.java | 10 +++---
.../hadoop/hbase/regionserver/HRegion.java | 12 ++-----
.../hbase/regionserver/RSRpcServices.java | 4 ++-
.../compactions/DefaultCompactor.java | 3 +-
.../compactions/StripeCompactor.java | 4 +--
.../apache/hadoop/hbase/wal/WALSplitter.java | 37 ++++++++++++++++++--
.../master/TestDistributedLogSplitting.java | 4 ++-
10 files changed, 87 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index d760aa2..9b7107b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -68,6 +68,19 @@ public class CellComparator implements Comparator<Cell>, Serializable {
if (!ignoreSequenceid) {
// Negate following comparisons so later edits show up first
+
+ // compare log replay tag value if there is any
+ // when either keyvalue is tagged with a log replay sequence number, we need to compare them:
+ // 1) when both keyvalues have the tag, then use the tag values for comparison
+ // 2) when one has the tag and the other doesn't, the one without the log
+ // replay tag wins because
+ // it means the edit isn't from recovery but a new one coming from clients during recovery
+ // 3) when neither has the tag, then skip to the next mvcc comparison
+ long leftChangeSeqNum = getReplaySeqNum(a);
+ long RightChangeSeqNum = getReplaySeqNum(b);
+ if (leftChangeSeqNum != Long.MAX_VALUE || RightChangeSeqNum != Long.MAX_VALUE) {
+ return Longs.compare(RightChangeSeqNum, leftChangeSeqNum);
+ }
// mvccVersion: later sorts first
return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
} else {
@@ -75,6 +88,22 @@ public class CellComparator implements Comparator<Cell>, Serializable {
}
}
+ /**
+ * Return the replay log sequence number carried in the cell's LOG_REPLAY_TAG, if any.
+ *
+ * @param c cell whose tags are searched for a tag of type TagType.LOG_REPLAY_TAG_TYPE
+ * @return the tag value decoded as a long, or Long.MAX_VALUE if there is no LOG_REPLAY_TAG
+ */
+ private static long getReplaySeqNum(final Cell c) {
+ Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
+ TagType.LOG_REPLAY_TAG_TYPE);
+
+ if (tag != null) {
+ return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
+ }
+ return Long.MAX_VALUE;
+ }
+
public static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
return findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
- rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 8566a88..516fd81 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -746,7 +746,6 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
- this.seqId = c.getSequenceId();
}
/**
@@ -962,8 +961,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
checkForTagsLength(tagsLength);
// Allocate right-sized byte array.
int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
- byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
- tagsLength)];
+ byte [] bytes =
+ new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
// Write key, value and key row length.
int pos = 0;
pos = Bytes.putInt(bytes, pos, keyLength);
@@ -2493,8 +2492,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* Compare two keys assuming that the first n bytes are the same.
* @param commonPrefix How many bytes are the same.
*/
- int compareIgnoringPrefix(int commonPrefix, byte[] left, int loffset, int llength,
- byte[] right, int roffset, int rlength
+ int compareIgnoringPrefix(
+ int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index f088dcf..b113516 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -26,7 +26,7 @@ public final class TagType {
// Please declare new Tag Types here to avoid step on pre-existing tag types.
public static final byte ACL_TAG_TYPE = (byte) 1;
public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
- // public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; // deprecated
+ public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
// String based tag type used in replication
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index 694ccff..4f511f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -773,8 +773,8 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
public void setRecoveryMode(boolean isForInitialization) throws IOException {
synchronized(this) {
if (this.isDrainingDone) {
- // when there is no outstanding splitlogtask after master start up, we already have up to
- // date recovery mode
+ // when there is no outstanding splitlogtask after master start up, we already have up to date
+ // recovery mode
return;
}
}
@@ -866,10 +866,12 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
boolean dlr =
conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+ int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
if (LOG.isDebugEnabled()) {
- LOG.debug("Distributed log replay=" + dlr);
+ LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
}
- return dlr;
+ // For distributed log replay, hfile version must be 3 at least; we need tag support.
+ return dlr && (version >= 3);
}
private boolean resubmit(ServerName serverName, String path, int version) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d5c07f0..4a4d004 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2684,7 +2684,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
continue;
}
doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
- addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
+ addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
}
// ------------------------------------
@@ -3189,13 +3189,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
* If null, then this method internally creates a mvcc transaction.
* @param output newly added KVs into memstore
- * @param isInReplay true when adding replayed KVs into memstore
* @return the additional memory usage of the memstore caused by the
* new entries.
* @throws IOException
*/
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
- long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
+ long mvccNum, List<Cell> memstoreCells) throws IOException {
long size = 0;
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3210,10 +3209,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
Pair<Long, Cell> ret = store.add(cell);
size += ret.getFirst();
memstoreCells.add(ret.getSecond());
- if(isInReplay) {
- // set memstore newly added cells with replay mvcc number
- CellUtil.setSequenceId(ret.getSecond(), mvccNum);
- }
}
}
@@ -3412,8 +3407,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
try {
- // replay the edits. Replay can return -1 if everything is skipped, only update
- // if seqId is greater
+ // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
} catch (IOException e) {
boolean skipErrors = conf.getBoolean(
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 8821803..dd895e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1430,6 +1430,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
? region.getCoprocessorHost()
: null; // do not invoke coprocessors if this is a secondary region replica
List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
+ // when tags are enabled, we need to tag replay edits with the log sequence number
+ boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
// Skip adding the edits to WAL if this is a secondary region replica
boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
@@ -1450,7 +1452,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<WALKey, WALEdit>();
List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
- cells, walEntry, durability);
+ cells, walEntry, needAddReplayTag, durability);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index df76073..cc03e09 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -92,9 +92,8 @@ public class DefaultCompactor extends Compactor {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
-
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
- true, fd.maxTagsLength > 0);
+ fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
if (!finished) {
writer.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 40b4af0..a74bd3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -112,13 +112,13 @@ public class StripeCompactor extends Compactor {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
-
+ final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
@Override
public Writer createWriter() throws IOException {
return store.createWriterInTmp(
- fd.maxKeyCount, compression, true, true, fd.maxTagsLength > 0);
+ fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
}
};
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 3f381cc..ef7b6ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -2135,6 +2135,34 @@ public class WALSplitter {
public final long nonce;
}
+ /**
+ * Tag original sequence number for each edit to be replayed; a cell that already has a LOG_REPLAY_TAG is returned unchanged
+ * @param seqId sequence number stored as the value of the new LOG_REPLAY_TAG
+ * @param cell cell to tag; its existing tags are kept after the new tag
+ */
+ private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
+ // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
+ boolean needAddRecoveryTag = true;
+ if (cell.getTagsLength() > 0) {
+ Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+ TagType.LOG_REPLAY_TAG_TYPE);
+ if (tmpTag != null) {
+ // found an existing log replay tag so reuse it
+ needAddRecoveryTag = false;
+ }
+ }
+ if (needAddRecoveryTag) {
+ List<Tag> newTags = new ArrayList<Tag>();
+ Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
+ newTags.add(replayTag);
+ if (cell.getTagsLength() > 0) {
+ newTags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
+ }
+ return new TagRewriteCell(cell, Tag.fromList(newTags));
+ }
+ return cell;
+ }
+
/**
* This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &
* WALEdit from the passed in WALEntry
@@ -2142,11 +2170,12 @@ public class WALSplitter {
* @param cells
* @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
* extracted from the passed in WALEntry.
+ * @param addLogReplayTag if true, non-delete cells are tagged with the replay sequence id before being added to the Put
* @return list of Pair<MutationType, Mutation> to be replayed
* @throws IOException
*/
public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
- Pair<WALKey, WALEdit> logEntry, Durability durability)
+ Pair<WALKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability)
throws IOException {
if (entry == null) {
@@ -2194,7 +2223,11 @@ public class WALSplitter {
if (CellUtil.isDelete(cell)) {
((Delete) m).addDeleteMarker(cell);
} else {
- ((Put) m).add(cell);
+ Cell tmpNewCell = cell;
+ if (addLogReplayTag) {
+ tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
+ }
+ ((Put) m).add(tmpNewCell);
}
m.setDurability(durability);
previousCell = cell;
http://git-wip-us.apache.org/repos/asf/hbase/blob/07334504/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index f37c1eb..19050d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1188,6 +1188,7 @@ public class TestDistributedLogSplitting {
LOG.info("testSameVersionUpdatesRecovery");
conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+ conf.setInt("hfile.format.version", 3);
startCluster(NUM_RS);
final AtomicLong sequenceId = new AtomicLong(100);
final int NUM_REGIONS_TO_CREATE = 40;
@@ -1281,10 +1282,11 @@ public class TestDistributedLogSplitting {
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
conf.setInt("hbase.hstore.compactionThreshold", 3);
+ conf.setInt("hfile.format.version", 3);
startCluster(NUM_RS);
final AtomicLong sequenceId = new AtomicLong(100);
final int NUM_REGIONS_TO_CREATE = 40;
- final int NUM_LOG_LINES = 2000;
+ final int NUM_LOG_LINES = 1000;
// turn off load balancing to prevent regions from moving around otherwise
// they will consume recovered.edits
master.balanceSwitch(false);