You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2014/07/07 03:42:20 UTC
git commit: HBase-11315: Keeping MVCC for configurable longer time
Repository: hbase
Updated Branches:
refs/heads/master a04e0b703 -> d07bc87cd
HBase-11315: Keeping MVCC for configurable longer time
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d07bc87c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d07bc87c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d07bc87c
Branch: refs/heads/master
Commit: d07bc87cd656b510d7ac610a41bc4195282a006b
Parents: a04e0b7
Author: Jeffrey Zhong <je...@apache.org>
Authored: Sun Jul 6 18:25:18 2014 -0700
Committer: Jeffrey Zhong <je...@apache.org>
Committed: Sun Jul 6 18:25:18 2014 -0700
----------------------------------------------------------------------
.../ipc/TestPayloadCarryingRpcController.java | 6 +
.../main/java/org/apache/hadoop/hbase/Cell.java | 11 ++
.../java/org/apache/hadoop/hbase/CellUtil.java | 4 +-
.../org/apache/hadoop/hbase/HConstants.java | 5 +
.../java/org/apache/hadoop/hbase/KeyValue.java | 24 ++--
.../org/apache/hadoop/hbase/KeyValueUtil.java | 4 +-
.../io/encoding/BufferedDataBlockEncoder.java | 10 ++
.../hbase/io/encoding/EncodedDataBlock.java | 4 +-
.../hadoop/hbase/codec/TestCellCodec.java | 2 +-
.../util/TestByteRangeWithKVSerialization.java | 4 +-
.../codec/prefixtree/decode/PrefixTreeCell.java | 5 +
.../data/TestRowDataDifferentTimestamps.java | 10 +-
.../hbase/protobuf/generated/WALProtos.java | 116 ++++++++++++++++---
hbase-protocol/src/main/protobuf/WAL.proto | 1 +
.../hadoop/hbase/io/hfile/HFileReaderV2.java | 2 +-
.../hbase/protobuf/ReplicationProtbufUtil.java | 3 +
.../hbase/regionserver/DefaultMemStore.java | 2 +-
.../hadoop/hbase/regionserver/HRegion.java | 57 +++++++--
.../MultiVersionConsistencyControl.java | 2 +-
.../hbase/regionserver/RSRpcServices.java | 28 +++--
.../hbase/regionserver/StoreFileScanner.java | 9 --
.../regionserver/compactions/Compactor.java | 31 +++--
.../compactions/DefaultCompactor.java | 7 +-
.../compactions/StripeCompactor.java | 7 +-
.../hbase/regionserver/wal/FSWALEntry.java | 4 +-
.../hadoop/hbase/regionserver/wal/HLogKey.java | 23 ++++
.../hbase/regionserver/wal/HLogSplitter.java | 20 ++--
.../hbase/security/access/AccessController.java | 2 +-
.../visibility/VisibilityController.java | 2 +-
.../hadoop/hbase/HBaseTestingUtility.java | 2 +-
.../hadoop/hbase/io/hfile/TestHFileBlock.java | 2 +-
.../hbase/regionserver/TestDefaultMemStore.java | 26 ++---
.../hadoop/hbase/regionserver/TestHRegion.java | 2 +
.../regionserver/TestReversibleScanners.java | 2 +-
.../regionserver/TestSeekOptimizations.java | 28 ++++-
35 files changed, 359 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
index 249cc42..088e609 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
@@ -144,6 +144,12 @@ public class TestPayloadCarryingRpcController {
}
@Override
+ public long getSequenceId() {
+ // unused
+ return 0;
+ }
+
+ @Override
public byte[] getValueArray() {
return Bytes.toBytes(this.i);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java
index 27b9345..f7e9272 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Cell.java
@@ -142,14 +142,25 @@ public interface Cell {
//6) MvccVersion
/**
+ * @deprecated as of 1.0, use {@link Cell#getSequenceId()}
+ *
* Internal use only. A region-specific sequence ID given to each operation. It always exists for
* cells in the memstore but is not retained forever. It may survive several flushes, but
* generally becomes irrelevant after the cell's row is no longer involved in any operations that
* require strict consistency.
* @return mvccVersion (always >= 0 if exists), or 0 if it no longer exists
*/
+ @Deprecated
long getMvccVersion();
+ /**
+ * A region-specific unique monotonically increasing sequence ID given to each Cell. It always
+ * exists for cells in the memstore but is not retained forever. It will be kept for
+ * {@link HConstants#KEEP_SEQID_PERIOD} days, but generally becomes irrelevant after the cell's
+ * row is no longer involved in any operations that require strict consistency.
+ * @return seqId (always > 0 if exists), or 0 if it no longer exists
+ */
+ long getSequenceId();
//7) Value
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 376e073..d6564c2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -167,7 +167,7 @@ public final class CellUtil {
final long timestamp, final byte type, final byte[] value, final long memstoreTS) {
KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp,
KeyValue.Type.codeToType(type), value);
- keyValue.setMvccVersion(memstoreTS);
+ keyValue.setSequenceId(memstoreTS);
return keyValue;
}
@@ -175,7 +175,7 @@ public final class CellUtil {
final long timestamp, final byte type, final byte[] value, byte[] tags, final long memstoreTS) {
KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp,
KeyValue.Type.codeToType(type), value, tags);
- keyValue.setMvccVersion(memstoreTS);
+ keyValue.setSequenceId(memstoreTS);
return keyValue;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index c2709f5..93209fd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -353,6 +353,11 @@ public final class HConstants {
/** Default value for cluster ID */
public static final String CLUSTER_ID_DEFAULT = "default-cluster";
+
+ /** Parameter name for # days to keep MVCC values during a major compaction */
+ public static final String KEEP_SEQID_PERIOD = "hbase.hstore.compaction.keep.seqId.period";
+ /** At least to keep MVCC values in hfiles for 5 days */
+ public static final int MIN_KEEP_SEQID_PERIOD = 5;
// Always store the location of the root table's HRegion.
// This HRegion is never split.
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 887946e..002642a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -284,15 +284,23 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
// used to achieve atomic operations in the memstore.
@Override
public long getMvccVersion() {
- return mvcc;
+ return this.getSequenceId();
}
- public void setMvccVersion(long mvccVersion){
- this.mvcc = mvccVersion;
+ /**
+ * used to achieve atomic operations in the memstore.
+ */
+ @Override
+ public long getSequenceId() {
+ return seqId;
+ }
+
+ public void setSequenceId(long seqId) {
+ this.seqId = seqId;
}
// multi-version concurrency control version. default value is 0, aka do not care.
- private long mvcc = 0; // this value is not part of a serialized KeyValue (not in HFiles)
+ private long seqId = 0;
/** Dragon time over, return to normal business */
@@ -1083,7 +1091,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
// Important to clone the memstoreTS as well - otherwise memstore's
// update-in-place methods (eg increment) will end up creating
// new entries
- ret.setMvccVersion(mvcc);
+ ret.setSequenceId(seqId);
return ret;
}
@@ -1094,7 +1102,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
*/
public KeyValue shallowCopy() {
KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
- shallowCopy.setMvccVersion(this.mvcc);
+ shallowCopy.setSequenceId(this.seqId);
return shallowCopy;
}
@@ -1108,8 +1116,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
if (this.bytes == null || this.bytes.length == 0) {
return "empty";
}
- return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
- "/vlen=" + getValueLength() + "/mvcc=" + mvcc;
+ return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen="
+ + getValueLength() + "/seqid=" + seqId;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index 4c1f345..c2a8826 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -75,7 +75,7 @@ public class KeyValueUtil {
public static KeyValue copyToNewKeyValue(final Cell cell) {
byte[] bytes = copyToNewByteArray(cell);
KeyValue kvCell = new KeyValue(bytes, 0, bytes.length);
- kvCell.setMvccVersion(cell.getMvccVersion());
+ kvCell.setSequenceId(cell.getMvccVersion());
return kvCell;
}
@@ -175,7 +175,7 @@ public class KeyValueUtil {
keyValue = new KeyValue(bb.array(), underlyingArrayOffset, kvLength);
if (includesMvccVersion) {
long mvccVersion = ByteBufferUtils.readVLong(bb);
- keyValue.setMvccVersion(mvccVersion);
+ keyValue.setSequenceId(mvccVersion);
}
return keyValue;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index fe019d1..b1b384a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -232,6 +232,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
@Override
+ public long getSequenceId() {
+ return memstoreTS;
+ }
+
+ @Override
public byte[] getValueArray() {
return currentBuffer.array();
}
@@ -422,6 +427,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
@Override
+ public long getSequenceId() {
+ return memstoreTS;
+ }
+
+ @Override
public byte[] getValueArray() {
return currentBuffer.array();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
index ce7356c..d71d1a4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/EncodedDataBlock.java
@@ -125,7 +125,7 @@ public class EncodedDataBlock {
(int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
if (meta.isIncludesMvcc()) {
long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
- kv.setMvccVersion(mvccVersion);
+ kv.setSequenceId(mvccVersion);
}
return kv;
}
@@ -244,7 +244,7 @@ public class EncodedDataBlock {
}
kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
klength, vlength, tagsLength));
- kv.setMvccVersion(memstoreTS);
+ kv.setSequenceId(memstoreTS);
this.dataBlockEncoder.encode(kv, encodingCtx, out);
}
BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java
index c27b91e..bca57d9 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodec.java
@@ -70,7 +70,7 @@ public class TestCellCodec {
Codec.Encoder encoder = codec.getEncoder(dos);
final KeyValue kv =
new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
- kv.setMvccVersion(Long.MAX_VALUE);
+ kv.setSequenceId(Long.MAX_VALUE);
encoder.write(kv);
encoder.flush();
dos.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java
index e2af966..d60aba9 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteRangeWithKVSerialization.java
@@ -50,7 +50,7 @@ public class TestByteRangeWithKVSerialization {
long mvcc = pbr.getVLong();
KeyValue kv = new KeyValue(pbr.getBytes(), kvStartPos,
(int) KeyValue.getKeyValueDataStructureSize(keyLen, valLen, tagsLen));
- kv.setMvccVersion(mvcc);
+ kv.setSequenceId(mvcc);
return kv;
}
@@ -65,7 +65,7 @@ public class TestByteRangeWithKVSerialization {
Tag[] tags = new Tag[] { new Tag((byte) 1, "tag1") };
for (int i = 0; i < kvCount; i++) {
KeyValue kv = new KeyValue(Bytes.toBytes(i), FAMILY, QUALIFIER, i, VALUE, tags);
- kv.setMvccVersion(i);
+ kv.setSequenceId(i);
kvs.add(kv);
totalSize += kv.getLength() + Bytes.SIZEOF_LONG;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java
index 740a08e..7763c6a 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/decode/PrefixTreeCell.java
@@ -123,6 +123,11 @@ public class PrefixTreeCell implements Cell, Comparable<Cell> {
}
@Override
+ public long getSequenceId() {
+ return getMvccVersion();
+ }
+
+ @Override
public int getValueLength() {
return valueLength;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java
----------------------------------------------------------------------
diff --git a/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java b/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java
index 8b729bc..2668f2a 100644
--- a/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java
+++ b/hbase-prefix-tree/src/test/java/org/apache/hadoop/hbase/codec/prefixtree/row/data/TestRowDataDifferentTimestamps.java
@@ -44,21 +44,21 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{
static List<KeyValue> d = Lists.newArrayList();
static{
KeyValue kv0 = new KeyValue(Arow, cf, cq0, 0L, v0);
- kv0.setMvccVersion(123456789L);
+ kv0.setSequenceId(123456789L);
d.add(kv0);
KeyValue kv1 = new KeyValue(Arow, cf, cq1, 1L, v0);
- kv1.setMvccVersion(3L);
+ kv1.setSequenceId(3L);
d.add(kv1);
KeyValue kv2 = new KeyValue(Brow, cf, cq0, 12345678L, v0);
- kv2.setMvccVersion(65537L);
+ kv2.setSequenceId(65537L);
d.add(kv2);
//watch out... Long.MAX_VALUE comes back as 1332221664203, even with other encoders
// d.add(new KeyValue(Brow, cf, cq1, Long.MAX_VALUE, v0));
KeyValue kv3 = new KeyValue(Brow, cf, cq1, Long.MAX_VALUE-1, v0);
- kv3.setMvccVersion(1L);
+ kv3.setSequenceId(1L);
d.add(kv3);
KeyValue kv4 = new KeyValue(Brow, cf, cq1, 999999999, v0);
@@ -66,7 +66,7 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{
d.add(kv4);
KeyValue kv5 = new KeyValue(Brow, cf, cq1, 12345, v0);
- kv5.setMvccVersion(0L);
+ kv5.setSequenceId(0L);
d.add(kv5);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index 19afcb2..efea2ba 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -897,6 +897,16 @@ public final class WALProtos {
* <code>optional uint64 nonce = 10;</code>
*/
long getNonce();
+
+ // optional uint64 orig_sequence_number = 11;
+ /**
+ * <code>optional uint64 orig_sequence_number = 11;</code>
+ */
+ boolean hasOrigSequenceNumber();
+ /**
+ * <code>optional uint64 orig_sequence_number = 11;</code>
+ */
+ long getOrigSequenceNumber();
}
/**
* Protobuf type {@code WALKey}
@@ -1017,6 +1027,11 @@ public final class WALProtos {
nonce_ = input.readUInt64();
break;
}
+ case 88: {
+ bitField0_ |= 0x00000100;
+ origSequenceNumber_ = input.readUInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1323,6 +1338,22 @@ public final class WALProtos {
return nonce_;
}
+ // optional uint64 orig_sequence_number = 11;
+ public static final int ORIG_SEQUENCE_NUMBER_FIELD_NUMBER = 11;
+ private long origSequenceNumber_;
+ /**
+ * <code>optional uint64 orig_sequence_number = 11;</code>
+ */
+ public boolean hasOrigSequenceNumber() {
+ return ((bitField0_ & 0x00000100) == 0x00000100);
+ }
+ /**
+ * <code>optional uint64 orig_sequence_number = 11;</code>
+ */
+ public long getOrigSequenceNumber() {
+ return origSequenceNumber_;
+ }
+
private void initFields() {
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
tableName_ = com.google.protobuf.ByteString.EMPTY;
@@ -1334,6 +1365,7 @@ public final class WALProtos {
clusterIds_ = java.util.Collections.emptyList();
nonceGroup_ = 0L;
nonce_ = 0L;
+ origSequenceNumber_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -1411,6 +1443,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000080) == 0x00000080)) {
output.writeUInt64(10, nonce_);
}
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ output.writeUInt64(11, origSequenceNumber_);
+ }
getUnknownFields().writeTo(output);
}
@@ -1460,6 +1495,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(10, nonce_);
}
+ if (((bitField0_ & 0x00000100) == 0x00000100)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt64Size(11, origSequenceNumber_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1527,6 +1566,11 @@ public final class WALProtos {
result = result && (getNonce()
== other.getNonce());
}
+ result = result && (hasOrigSequenceNumber() == other.hasOrigSequenceNumber());
+ if (hasOrigSequenceNumber()) {
+ result = result && (getOrigSequenceNumber()
+ == other.getOrigSequenceNumber());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -1580,6 +1624,10 @@ public final class WALProtos {
hash = (37 * hash) + NONCE_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getNonce());
}
+ if (hasOrigSequenceNumber()) {
+ hash = (37 * hash) + ORIG_SEQUENCE_NUMBER_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getOrigSequenceNumber());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -1728,6 +1776,8 @@ public final class WALProtos {
bitField0_ = (bitField0_ & ~0x00000100);
nonce_ = 0L;
bitField0_ = (bitField0_ & ~0x00000200);
+ origSequenceNumber_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
@@ -1810,6 +1860,10 @@ public final class WALProtos {
to_bitField0_ |= 0x00000080;
}
result.nonce_ = nonce_;
+ if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+ to_bitField0_ |= 0x00000100;
+ }
+ result.origSequenceNumber_ = origSequenceNumber_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1902,6 +1956,9 @@ public final class WALProtos {
if (other.hasNonce()) {
setNonce(other.getNonce());
}
+ if (other.hasOrigSequenceNumber()) {
+ setOrigSequenceNumber(other.getOrigSequenceNumber());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -2977,6 +3034,39 @@ public final class WALProtos {
return this;
}
+ // optional uint64 orig_sequence_number = 11;
+ private long origSequenceNumber_ ;
+ /**
+ * <code>optional uint64 orig_sequence_number = 11;</code>
+ */
+ public boolean hasOrigSequenceNumber() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional uint64 orig_sequence_number = 11;</code>
+ */
+ public long getOrigSequenceNumber() {
+ return origSequenceNumber_;
+ }
+ /**
+ * <code>optional uint64 orig_sequence_number = 11;</code>
+ */
+ public Builder setOrigSequenceNumber(long value) {
+ bitField0_ |= 0x00000400;
+ origSequenceNumber_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional uint64 orig_sequence_number = 11;</code>
+ */
+ public Builder clearOrigSequenceNumber() {
+ bitField0_ = (bitField0_ & ~0x00000400);
+ origSequenceNumber_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:WALKey)
}
@@ -5176,24 +5266,24 @@ public final class WALProtos {
java.lang.String[] descriptorData = {
"\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" +
"\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
- "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\202\002\n\006" +
+ "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\240\002\n\006" +
"WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" +
"able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" +
" \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" +
" \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" +
"Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" +
"ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
- "\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil",
- "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
- "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
- " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" +
- "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
- "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
- "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
- "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
- "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
- "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
- "\001\001"
+ "\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
+ "\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
+ "\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" +
+ "tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
+ "coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
+ " \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
+ "tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
+ "\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" +
+ "ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" +
+ "LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" +
+ "buf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5211,7 +5301,7 @@ public final class WALProtos {
internal_static_WALKey_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_WALKey_descriptor,
- new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", });
+ new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", "OrigSequenceNumber", });
internal_static_FamilyScope_descriptor =
getDescriptor().getMessageTypes().get(2);
internal_static_FamilyScope_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 8c7b84b..0ae65ec 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -54,6 +54,7 @@ message WALKey {
optional uint64 nonceGroup = 9;
optional uint64 nonce = 10;
+ optional uint64 orig_sequence_number = 11;
/*
optional CustomEntryType custom_entry_type = 9;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 0bfefaa..2f6ea39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -765,7 +765,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
if (this.reader.shouldIncludeMemstoreTS()) {
- ret.setMvccVersion(currMemstoreTS);
+ ret.setSequenceId(currMemstoreTS);
}
return ret;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 2dc2388..348b6ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -106,6 +106,9 @@ public class ReplicationProtbufUtil {
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
keyBuilder.addClusterIds(uuidBuilder.build());
}
+ if(key.getOrigLogSeqNum() > 0) {
+ keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
+ }
WALEdit edit = entry.getEdit();
NavigableMap<byte[], Integer> scopes = key.getScopes();
if (scopes != null && !scopes.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index ad084a5..d90357b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -271,7 +271,7 @@ public class DefaultMemStore implements MemStore {
assert alloc.getBytes() != null;
alloc.put(0, kv.getBuffer(), kv.getOffset(), len);
KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
- newKv.setMvccVersion(kv.getMvccVersion());
+ newKv.setSequenceId(kv.getMvccVersion());
return newKv;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2429ed5..93eada8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2155,6 +2155,7 @@ public class HRegion implements HeapSize { // , Writable{
/** This method is potentially expensive and should only be used for non-replay CP path. */
public abstract Mutation[] getMutationsForCoprocs();
public abstract boolean isInReplay();
+ public abstract long getReplaySequenceId();
public boolean isDone() {
return nextIndexToProcess == operations.length;
@@ -2194,11 +2195,18 @@ public class HRegion implements HeapSize { // , Writable{
public boolean isInReplay() {
return false;
}
+
+ @Override
+ public long getReplaySequenceId() {
+ return 0;
+ }
}
private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
- public ReplayBatch(MutationReplay[] operations) {
+ private long replaySeqId = 0;
+ public ReplayBatch(MutationReplay[] operations, long seqId) {
super(operations);
+ this.replaySeqId = seqId;
}
@Override
@@ -2226,6 +2234,11 @@ public class HRegion implements HeapSize { // , Writable{
public boolean isInReplay() {
return true;
}
+
+ @Override
+ public long getReplaySequenceId() {
+ return this.replaySeqId;
+ }
}
/**
@@ -2252,13 +2265,14 @@ public class HRegion implements HeapSize { // , Writable{
/**
* Replay a batch of mutations.
* @param mutations mutations to replay.
+ * @param replaySeqId SeqId for current mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
- public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
+ public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
throws IOException {
- return batchMutate(new ReplayBatch(mutations));
+ return batchMutate(new ReplayBatch(mutations, replaySeqId));
}
/**
@@ -2475,7 +2489,7 @@ public class HRegion implements HeapSize { // , Writable{
// ------------------------------------
// STEP 2. Update any LATEST_TIMESTAMP timestamps
// ----------------------------------
- for (int i = firstIndex; i < lastIndexExclusive; i++) {
+ for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
// skip invalid
if (batchOp.retCodeDetails[i].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) continue;
@@ -2485,16 +2499,18 @@ public class HRegion implements HeapSize { // , Writable{
updateKVTimestamps(familyMaps[i].values(), byteNow);
noOfPuts++;
} else {
- if (!isInReplay) {
- prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
- }
+ prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
noOfDeletes++;
}
}
lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true;
- mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+ if(isInReplay) {
+ mvccNum = batchOp.getReplaySequenceId();
+ } else {
+ mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
+ }
//
// ------------------------------------
// Acquire the latest mvcc number
@@ -2591,6 +2607,9 @@ public class HRegion implements HeapSize { // , Writable{
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
+ if(isInReplay) {
+ walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
+ }
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells);
}
@@ -2952,7 +2971,7 @@ public class HRegion implements HeapSize { // , Writable{
Store store = getStore(family);
for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- kv.setMvccVersion(mvccNum);
+ kv.setSequenceId(mvccNum);
Pair<Long, Cell> ret = store.add(kv);
size += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
@@ -3213,6 +3232,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
reader = HLogFactory.createReader(fs, edits, conf);
long currentEditSeqId = -1;
+ long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
@@ -3275,6 +3295,8 @@ public class HRegion implements HeapSize { // , Writable{
firstSeqIdInLog = key.getLogSeqNum();
}
currentEditSeqId = key.getLogSeqNum();
+ currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
+ key.getOrigLogSeqNum() : currentEditSeqId;
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
@@ -3309,6 +3331,7 @@ public class HRegion implements HeapSize { // , Writable{
skippedEdits++;
continue;
}
+ kv.setSequenceId(currentReplaySeqId);
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
@@ -4922,7 +4945,7 @@ public class HRegion implements HeapSize { // , Writable{
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// 6. Apply to memstore
for (KeyValue kv : mutations) {
- kv.setMvccVersion(mvccNum);
+ kv.setSequenceId(mvccNum);
Store store = getStore(kv);
if (store == null) {
checkFamily(CellUtil.cloneFamily(kv));
@@ -5168,7 +5191,7 @@ public class HRegion implements HeapSize { // , Writable{
// so only need to update the timestamp to 'now'
newKV.updateLatestStamp(Bytes.toBytes(now));
}
- newKV.setMvccVersion(mvccNum);
+ newKV.setSequenceId(mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@@ -5382,7 +5405,7 @@ public class HRegion implements HeapSize { // , Writable{
System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
}
- newKV.setMvccVersion(mvccNum);
+ newKV.setSequenceId(mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@@ -6220,4 +6243,14 @@ public class HRegion implements HeapSize { // , Writable{
WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
return key;
}
+
+ /**
+ * Explictly sync wal
+ * @throws IOException
+ */
+ public void syncWal() throws IOException {
+ if(this.log != null) {
+ this.log.sync();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
index 2d247e9..0b5f5d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
@@ -246,7 +246,7 @@ public class MultiVersionConsistencyControl {
public static class WriteEntry {
private long writeNumber;
- private boolean completed = false;
+ private volatile boolean completed = false;
WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9a0e8a4..b84f9a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -633,12 +633,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
* @param region
* @param mutations
+ * @param replaySeqId
* @return an array of OperationStatus which internally contains the OperationStatusCode and the
* exceptionMessage if any
* @throws IOException
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
- final List<HLogSplitter.MutationReplay> mutations) throws IOException {
+ final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
@@ -657,7 +658,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!region.getRegionInfo().isMetaTable()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
- return region.batchReplay(mArray);
+ return region.batchReplay(mArray, replaySeqId);
} finally {
if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -1330,7 +1331,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
- List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
// when tag is enabled, we need tag replay edits with log sequence number
boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
for (WALEntry entry : entries) {
@@ -1354,18 +1354,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
walEntries.add(walEntry);
}
- mutations.addAll(edits);
- }
-
- if (!mutations.isEmpty()) {
- OperationStatus[] result = doReplayBatchOp(region, mutations);
- // check if it's a partial success
- for (int i = 0; result != null && i < result.length; i++) {
- if (result[i] != OperationStatus.SUCCESS) {
- throw new IOException(result[i].getExceptionMsg());
+ if(edits!=null && !edits.isEmpty()) {
+ long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
+ entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
+ OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
+ // check if it's a partial success
+ for (int i = 0; result != null && i < result.length; i++) {
+ if (result[i] != OperationStatus.SUCCESS) {
+ throw new IOException(result[i].getExceptionMsg());
+ }
}
}
}
+
+ //sync wal at the end because ASYNC_WAL is used above
+ region.syncWal();
+
if (coprocessorHost != null) {
for (Pair<HLogKey, WALEdit> wal : walEntries) {
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index f0f92ae..1b07594 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -211,15 +211,6 @@ public class StoreFileScanner implements KeyValueScanner {
return false;
}
- // For the optimisation in HBASE-4346, we set the KV's memstoreTS to
- // 0, if it is older than all the scanners' read points. It is possible
- // that a newer KV's memstoreTS was reset to 0. But, there is an
- // older KV which was not reset to 0 (because it was
- // not old enough during flush). Make sure that we set it correctly now,
- // so that the comparision order does not change.
- if (cur.getMvccVersion() <= readPt) {
- KeyValueUtil.ensureKeyValue(cur).setMvccVersion(0);
- }
return true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 9e792c4..a1d629a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -58,6 +58,9 @@ public abstract class Compactor {
private int compactionKVMax;
protected Compression.Algorithm compactionCompression;
+
+ /** specify how many days to keep MVCC values during major compaction **/
+ protected int keepSeqIdPeriod;
//TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(final Configuration conf, final Store store) {
@@ -67,6 +70,8 @@ public abstract class Compactor {
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
this.compactionCompression = (this.store.getFamily() == null) ?
Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
+ this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
+ HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
}
/**
@@ -92,19 +97,30 @@ public abstract class Compactor {
public long maxMVCCReadpoint = 0;
/** Max tags length**/
public int maxTagsLength = 0;
+ /** Min SeqId to keep during a major compaction **/
+ public long minSeqIdToKeep = 0;
}
/**
* Extracts some details about the files to compact that are commonly needed by compactors.
* @param filesToCompact Files.
- * @param calculatePutTs Whether earliest put TS is needed.
+ * @param allFiles Whether all files are included for compaction
* @return The result.
*/
protected FileDetails getFileDetails(
- Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
+ Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
FileDetails fd = new FileDetails();
+ long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
+ (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
for (StoreFile file : filesToCompact) {
+ if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
+ // when isAllFiles is true, all files are compacted so we can calculate the smallest
+ // MVCC value to keep
+ if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
+ fd.minSeqIdToKeep = file.getMaxMemstoreTS();
+ }
+ }
long seqNum = file.getMaxSequenceId();
fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
StoreFile.Reader r = file.getReader();
@@ -130,7 +146,7 @@ public abstract class Compactor {
// If required, calculate the earliest put timestamp of all involved storefiles.
// This is used to remove family delete marker during compaction.
long earliestPutTs = 0;
- if (calculatePutTs) {
+ if (allFiles) {
tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
if (tmp == null) {
// There's a file with no information, must be an old one
@@ -148,7 +164,7 @@ public abstract class Compactor {
", size=" + StringUtils.humanReadableInt(r.length()) +
", encoding=" + r.getHFileReader().getDataBlockEncoding() +
", seqNum=" + seqNum +
- (calculatePutTs ? ", earliestPutTs=" + earliestPutTs: ""));
+ (allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
}
}
return fd;
@@ -202,10 +218,11 @@ public abstract class Compactor {
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
+ * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(InternalScanner scanner,
- CellSink writer, long smallestReadPoint) throws IOException {
+ CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
@@ -218,8 +235,8 @@ public abstract class Compactor {
// output to writer:
for (Cell c : kvs) {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
- if (kv.getMvccVersion() <= smallestReadPoint) {
- kv.setMvccVersion(0);
+ if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
+ kv.setSequenceId(0);
}
writer.append(kv);
++progress.currentCompactedKVs;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 3e8523d..d5b2b63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -54,6 +54,7 @@ public class DefaultCompactor extends Compactor {
StoreFile.Writer writer = null;
List<Path> newFiles = new ArrayList<Path>();
+ boolean cleanSeqId = false;
try {
InternalScanner scanner = null;
try {
@@ -71,9 +72,13 @@ public class DefaultCompactor extends Compactor {
}
// Create the writer even if no kv(Empty store file is also ok),
// because we need record the max seq id for the store file, see HBASE-6059
+ if(fd.minSeqIdToKeep > 0) {
+ smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
+ cleanSeqId = true;
+ }
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
- boolean finished = performCompaction(scanner, writer, smallestReadPoint);
+ boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 11556e5..487ff46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -90,6 +90,7 @@ public class StripeCompactor extends Compactor {
boolean finished = false;
InternalScanner scanner = null;
+ boolean cleanSeqId = false;
try {
// Get scanner to use.
ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
@@ -108,6 +109,10 @@ public class StripeCompactor extends Compactor {
}
// Create the writer factory for compactions.
+ if(fd.minSeqIdToKeep > 0) {
+ smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
+ cleanSeqId = true;
+ }
final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
@@ -122,7 +127,7 @@ public class StripeCompactor extends Compactor {
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
- finished = performCompaction(scanner, mw, smallestReadPoint);
+ finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId);
if (!finished) {
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 1e9472a..a9c2055 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -91,9 +91,9 @@ class FSWALEntry extends HLog.Entry {
*/
long stampRegionSequenceId() {
long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
- if(memstoreKVs != null && !memstoreKVs.isEmpty()) {
+ if (!this.getEdit().isReplay() && memstoreKVs != null && !memstoreKVs.isEmpty()) {
for(KeyValue kv : this.memstoreKVs){
- kv.setMvccVersion(regionSequenceId);
+ kv.setSequenceId(regionSequenceId);
}
}
HLogKey key = getKey();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index ad1c001..5019ff5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -119,6 +119,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
private byte [] encodedRegionName;
private TableName tablename;
private long logSeqNum;
+ private long origLogSeqNum = 0;
private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
// Time at which this edit was written.
private long writeTime;
@@ -256,6 +257,22 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
}
/**
+ * Used to set original seq Id for HLogKey during wal replay
+ * @param seqId
+ */
+ public void setOrigLogSeqNum(final long seqId) {
+ this.origLogSeqNum = seqId;
+ }
+
+ /**
+ * Return a positive long if current HLogKey is created from a replay edit
+ * @return original sequence number of the WALEdit
+ */
+ public long getOrigLogSeqNum() {
+ return this.origLogSeqNum;
+ }
+
+ /**
* Wait for sequence number is assigned & return the assigned value
* @return long the new assigned sequence number
* @throws InterruptedException
@@ -536,6 +553,9 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
}
builder.setLogSequenceNumber(this.logSeqNum);
builder.setWriteTime(writeTime);
+ if(this.origLogSeqNum > 0) {
+ builder.setOrigSequenceNumber(this.origLogSeqNum);
+ }
if (this.nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
@@ -599,5 +619,8 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
}
this.logSeqNum = walKey.getLogSequenceNumber();
this.writeTime = walKey.getWriteTime();
+ if(walKey.hasOrigSequenceNumber()) {
+ this.origLogSeqNum = walKey.getOrigSequenceNumber();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 0ce4a64..873e863 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation;
@@ -1863,6 +1864,10 @@ public class HLogSplitter {
public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
this.type = type;
this.mutation = mutation;
+ if(this.mutation.getDurability() != Durability.SKIP_WAL) {
+ // using ASYNC_WAL for relay
+ this.mutation.setDurability(Durability.ASYNC_WAL);
+ }
this.nonceGroup = nonceGroup;
this.nonce = nonce;
}
@@ -1875,10 +1880,10 @@ public class HLogSplitter {
/**
* Tag original sequence number for each edit to be replayed
- * @param entry
+ * @param seqId
* @param cell
*/
- private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
+ private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
// Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
boolean needAddRecoveryTag = true;
if (cell.getTagsLength() > 0) {
@@ -1891,8 +1896,7 @@ public class HLogSplitter {
}
if (needAddRecoveryTag) {
List<Tag> newTags = new ArrayList<Tag>();
- Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
- .getLogSequenceNumber()));
+ Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
newTags.add(replayTag);
return KeyValue.cloneAndAddTags(cell, newTags);
}
@@ -1918,6 +1922,8 @@ public class HLogSplitter {
return new ArrayList<MutationReplay>();
}
+ long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
+ entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
int count = entry.getAssociatedCellCount();
List<MutationReplay> mutations = new ArrayList<MutationReplay>();
Cell previousCell = null;
@@ -1958,7 +1964,7 @@ public class HLogSplitter {
} else {
Cell tmpNewCell = cell;
if (addLogReplayTag) {
- tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
+ tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
}
((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
}
@@ -1973,8 +1979,8 @@ public class HLogSplitter {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
}
key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
- .getTableName().toByteArray()), walKey.getLogSequenceNumber(),
- walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce());
+ .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
+ walKey.getNonceGroup(), walKey.getNonce());
logEntry.setFirst(key);
logEntry.setSecond(val);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 77734a7..8f73431 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -1906,7 +1906,7 @@ public class AccessController extends BaseRegionObserver
newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
tags);
// Preserve mvcc data
- rewriteKv.setMvccVersion(newKv.getMvccVersion());
+ rewriteKv.setSequenceId(newKv.getMvccVersion());
return rewriteKv;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index 9cd21d8..39f65db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -1255,7 +1255,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags);
// Preserve mvcc data
- rewriteKv.setMvccVersion(newKv.getMvccVersion());
+ rewriteKv.setSequenceId(newKv.getMvccVersion());
return rewriteKv;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 86c30d1..fde40ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -3055,7 +3055,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
}
- private static <T> String safeGetAsStr(List<T> lst, int i) {
+ public static <T> String safeGetAsStr(List<T> lst, int i) {
if (0 <= i && i < lst.size()) {
return lst.get(i).toString();
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index 37456a8..09561cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -176,7 +176,7 @@ public class TestHFileBlock {
totalSize += kv.getLength();
if (includesMemstoreTS) {
long memstoreTS = randomizer.nextLong();
- kv.setMvccVersion(memstoreTS);
+ kv.setSequenceId(memstoreTS);
totalSize += WritableUtils.getVIntSize(memstoreTS);
}
hbw.write(kv);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index ebe95b1..3743fdd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -241,7 +241,7 @@ public class TestDefaultMemStore extends TestCase {
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv1 = new KeyValue(row, f, q1, v);
- kv1.setMvccVersion(w.getWriteNumber());
+ kv1.setSequenceId(w.getWriteNumber());
memstore.add(kv1);
KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
@@ -254,7 +254,7 @@ public class TestDefaultMemStore extends TestCase {
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv2 = new KeyValue(row, f, q2, v);
- kv2.setMvccVersion(w.getWriteNumber());
+ kv2.setSequenceId(w.getWriteNumber());
memstore.add(kv2);
s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
@@ -285,11 +285,11 @@ public class TestDefaultMemStore extends TestCase {
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
- kv11.setMvccVersion(w.getWriteNumber());
+ kv11.setSequenceId(w.getWriteNumber());
memstore.add(kv11);
KeyValue kv12 = new KeyValue(row, f, q2, v1);
- kv12.setMvccVersion(w.getWriteNumber());
+ kv12.setSequenceId(w.getWriteNumber());
memstore.add(kv12);
mvcc.completeMemstoreInsert(w);
@@ -300,11 +300,11 @@ public class TestDefaultMemStore extends TestCase {
// START INSERT 2: Write both columns val2
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv21 = new KeyValue(row, f, q1, v2);
- kv21.setMvccVersion(w.getWriteNumber());
+ kv21.setSequenceId(w.getWriteNumber());
memstore.add(kv21);
KeyValue kv22 = new KeyValue(row, f, q2, v2);
- kv22.setMvccVersion(w.getWriteNumber());
+ kv22.setSequenceId(w.getWriteNumber());
memstore.add(kv22);
// BEFORE COMPLETING INSERT 2, SEE FIRST KVS
@@ -337,11 +337,11 @@ public class TestDefaultMemStore extends TestCase {
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
- kv11.setMvccVersion(w.getWriteNumber());
+ kv11.setSequenceId(w.getWriteNumber());
memstore.add(kv11);
KeyValue kv12 = new KeyValue(row, f, q2, v1);
- kv12.setMvccVersion(w.getWriteNumber());
+ kv12.setSequenceId(w.getWriteNumber());
memstore.add(kv12);
mvcc.completeMemstoreInsert(w);
@@ -353,7 +353,7 @@ public class TestDefaultMemStore extends TestCase {
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
KeyValue.Type.DeleteColumn);
- kvDel.setMvccVersion(w.getWriteNumber());
+ kvDel.setSequenceId(w.getWriteNumber());
memstore.add(kvDel);
// BEFORE COMPLETING DELETE, SEE FIRST KVS
@@ -414,7 +414,7 @@ public class TestDefaultMemStore extends TestCase {
byte[] v = Bytes.toBytes(i);
KeyValue kv = new KeyValue(row, f, q1, i, v);
- kv.setMvccVersion(w.getWriteNumber());
+ kv.setSequenceId(w.getWriteNumber());
memstore.add(kv);
mvcc.completeMemstoreInsert(w);
@@ -827,7 +827,7 @@ public class TestDefaultMemStore extends TestCase {
KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
- kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1);
+ kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1);
l.add(kv1); l.add(kv2); l.add(kv3);
this.memstore.upsert(l, 2);// readpoint is 2
@@ -835,7 +835,7 @@ public class TestDefaultMemStore extends TestCase {
assert(newSize > oldSize);
KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
- kv4.setMvccVersion(1);
+ kv4.setSequenceId(1);
l.clear(); l.add(kv4);
this.memstore.upsert(l, 3);
assertEquals(newSize, this.memstore.size.get());
@@ -877,7 +877,7 @@ public class TestDefaultMemStore extends TestCase {
// test the case that the timeOfOldestEdit is updated after a KV upsert
List<Cell> l = new ArrayList<Cell>();
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
- kv1.setMvccVersion(100);
+ kv1.setSequenceId(100);
l.add(kv1);
memstore.upsert(l, 1000);
t = memstore.timeOfOldestEdit();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index fd944f9..8a588e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -528,6 +528,7 @@ public class TestHRegion {
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
+ region.getMVCC().initialize(seqId);
Get get = new Get(row);
Result result = region.get(get);
for (long i = minSeqId; i <= maxSeqId; i += 10) {
@@ -579,6 +580,7 @@ public class TestHRegion {
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
+ region.getMVCC().initialize(seqId);
Get get = new Get(row);
Result result = region.get(get);
for (long i = minSeqId; i <= maxSeqId; i += 10) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 2cd5e3a..c71f4f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -689,7 +689,7 @@ public class TestReversibleScanners {
private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
VALUES[rowNum % VALUESIZE]);
- kv.setMvccVersion(makeMVCC(rowNum, cqNum));
+ kv.setSequenceId(makeMVCC(rowNum, cqNum));
return kv;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d07bc87c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
index 988d82f..450dd82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.assertKVListsEqual;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -449,5 +448,32 @@ public class TestSeekOptimizations {
}
+ public void assertKVListsEqual(String additionalMsg,
+ final List<? extends Cell> expected,
+ final List<? extends Cell> actual) {
+ final int eLen = expected.size();
+ final int aLen = actual.size();
+ final int minLen = Math.min(eLen, aLen);
+
+ int i;
+ for (i = 0; i < minLen
+ && KeyValue.COMPARATOR.compareOnlyKeyPortion(expected.get(i), actual.get(i)) == 0;
+ ++i) {}
+
+ if (additionalMsg == null) {
+ additionalMsg = "";
+ }
+ if (!additionalMsg.isEmpty()) {
+ additionalMsg = ". " + additionalMsg;
+ }
+
+ if (eLen != aLen || i != minLen) {
+ throw new AssertionError(
+ "Expected and actual KV arrays differ at position " + i + ": " +
+ HBaseTestingUtility.safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
+ HBaseTestingUtility.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
+ }
+ }
+
}