You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/02/13 20:08:36 UTC
[2/2] hbase git commit: HBASE-11569 Flush / Compaction handling from secondary region replicas
HBASE-11569 Flush / Compaction handling from secondary region replicas
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3e10e6e1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3e10e6e1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3e10e6e1
Branch: refs/heads/master
Commit: 3e10e6e1a61aa2a481a1450fecb94bce23809dfe
Parents: cfc131e
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Feb 13 11:08:24 2015 -0800
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Feb 13 11:08:24 2015 -0800
----------------------------------------------------------------------
.../hadoop/hbase/protobuf/ProtobufUtil.java | 2 +
.../java/org/apache/hadoop/hbase/KeyValue.java | 39 +-
.../hbase/protobuf/generated/WALProtos.java | 299 ++++-
hbase-protocol/src/main/protobuf/WAL.proto | 2 +
.../hbase/regionserver/DefaultMemStore.java | 10 +-
.../hadoop/hbase/regionserver/HRegion.java | 728 ++++++++++-
.../hbase/regionserver/HRegionFileSystem.java | 10 +-
.../hadoop/hbase/regionserver/HStore.java | 107 +-
.../hadoop/hbase/regionserver/MemStore.java | 6 +
.../hbase/regionserver/RSRpcServices.java | 19 +-
.../apache/hadoop/hbase/regionserver/Store.java | 28 +-
.../hbase/regionserver/StoreFlushContext.java | 16 +
.../hbase/regionserver/wal/ReplayHLogKey.java | 53 +
.../RegionReplicaReplicationEndpoint.java | 2 +-
.../hbase/util/ServerRegionReplicaUtil.java | 21 +-
.../apache/hadoop/hbase/wal/WALSplitter.java | 12 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 28 +-
.../regionserver/TestHRegionReplayEvents.java | 1162 ++++++++++++++++++
.../regionserver/TestPerColumnFamilyFlush.java | 12 +-
19 files changed, 2439 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 43e91d2..038b148 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2584,6 +2584,7 @@ public final class ProtobufUtil {
FlushDescriptor.Builder desc = FlushDescriptor.newBuilder()
.setAction(action)
.setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
+ .setRegionName(ByteStringer.wrap(hri.getRegionName()))
.setFlushSequenceNumber(flushSeqId)
.setTableName(ByteStringer.wrap(hri.getTable().getName()));
@@ -2609,6 +2610,7 @@ public final class ProtobufUtil {
.setEventType(eventType)
.setTableName(ByteStringer.wrap(hri.getTable().getName()))
.setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
+ .setRegionName(ByteStringer.wrap(hri.getRegionName()))
.setLogSequenceNumber(seqId)
.setServer(toServerName(server));
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 04551de..3ae324a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.io.RawComparator;
import com.google.common.annotations.VisibleForTesting;
/**
- * An HBase Key/Value. This is the fundamental HBase Type.
+ * An HBase Key/Value. This is the fundamental HBase Type.
* <p>
* HBase applications and users should use the Cell interface and avoid directly using KeyValue
* and member functions not defined in Cell.
@@ -297,6 +297,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return seqId;
}
+ @Override
public void setSequenceId(long seqId) {
this.seqId = seqId;
}
@@ -577,7 +578,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
qlength, timestamp, type, value, voffset, vlength, null);
}
-
+
/**
* Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
* data buffer.
@@ -742,9 +743,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
public KeyValue(Cell c) {
this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
- c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
- c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
- c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
+ c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
+ c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
+ c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
this.seqId = c.getSequenceId();
}
@@ -955,7 +956,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
final int rlength, final byte [] family, final int foffset, int flength,
final byte [] qualifier, final int qoffset, int qlength,
final long timestamp, final Type type,
- final byte [] value, final int voffset,
+ final byte [] value, final int voffset,
int vlength, byte[] tags, int tagsOffset, int tagsLength) {
checkParameters(row, rlength, family, flength, qlength, vlength);
@@ -1115,6 +1116,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
//
//---------------------------------------------------------------------------
+ @Override
public String toString() {
if (this.bytes == null || this.bytes.length == 0) {
return "empty";
@@ -1125,10 +1127,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
/**
* @param k Key portion of a KeyValue.
- * @return Key as a String, empty string if k is null.
+ * @return Key as a String, empty string if k is null.
*/
public static String keyToString(final byte [] k) {
- if (k == null) {
+ if (k == null) {
return "";
}
return keyToString(k, 0, k.length);
@@ -1464,6 +1466,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* save on allocations.
* @return Value in a new byte array.
*/
+ @Override
@Deprecated // use CellUtil.getValueArray()
public byte [] getValue() {
return CellUtil.cloneValue(this);
@@ -1477,6 +1480,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* lengths instead.
* @return Row in a new byte array.
*/
+ @Override
@Deprecated // use CellUtil.getRowArray()
public byte [] getRow() {
return CellUtil.cloneRow(this);
@@ -1534,6 +1538,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* lengths instead.
* @return Returns family. Makes a copy.
*/
+ @Override
@Deprecated // use CellUtil.getFamilyArray
public byte [] getFamily() {
return CellUtil.cloneFamily(this);
@@ -1548,6 +1553,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* Use {@link #getBuffer()} with appropriate offsets and lengths instead.
* @return Returns qualifier. Makes a copy.
*/
+ @Override
@Deprecated // use CellUtil.getQualifierArray
public byte [] getQualifier() {
return CellUtil.cloneQualifier(this);
@@ -1846,7 +1852,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return compareFlatKey(l,loff,llen, r,roff,rlen);
}
-
+
/**
* Compares the only the user specified portion of a Key. This is overridden by MetaComparator.
* @param left
@@ -2355,7 +2361,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
in.readFully(bytes);
return new KeyValue(bytes, 0, length);
}
-
+
/**
* Create a new KeyValue by copying existing cell and adding new tags
* @param c
@@ -2371,9 +2377,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
existingTags = newTags;
}
return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
- c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
- c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
- c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
+ c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
+ c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
+ c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), existingTags);
}
@@ -2478,6 +2484,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
this.comparator = c;
}
+ @Override
public int compare(KeyValue left, KeyValue right) {
return comparator.compareRows(left, right);
}
@@ -2486,7 +2493,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
/**
* Avoids redundant comparisons for better performance.
- *
+ *
* TODO get rid of this wart
*/
public interface SamePrefixComparator<T> {
@@ -2509,6 +2516,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* TODO: With V3 consider removing this.
* @return legacy class name for FileFileTrailer#comparatorClassName
*/
+ @Override
public String getLegacyKeyComparatorName() {
return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
}
@@ -2516,6 +2524,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
/**
* @deprecated Since 0.99.2.
*/
+ @Override
@Deprecated
public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
int roffset, int rlength) {
@@ -2527,6 +2536,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return compareOnlyKeyPortion(left, right);
}
+ @Override
@VisibleForTesting
public int compareOnlyKeyPortion(Cell left, Cell right) {
int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
@@ -2553,6 +2563,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
}
+ @Override
public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
return firstKeyInBlock;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index c9fa854..35192cc 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -5522,6 +5522,24 @@ public final class WALProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptorOrBuilder getStoreFlushesOrBuilder(
int index);
+
+ // optional bytes region_name = 6;
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ boolean hasRegionName();
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ com.google.protobuf.ByteString getRegionName();
}
/**
* Protobuf type {@code FlushDescriptor}
@@ -5613,6 +5631,11 @@ public final class WALProtos {
storeFlushes_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor.PARSER, extensionRegistry));
break;
}
+ case 50: {
+ bitField0_ |= 0x00000010;
+ regionName_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -6772,12 +6795,37 @@ public final class WALProtos {
return storeFlushes_.get(index);
}
+ // optional bytes region_name = 6;
+ public static final int REGION_NAME_FIELD_NUMBER = 6;
+ private com.google.protobuf.ByteString regionName_;
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+
private void initFields() {
action_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction.START_FLUSH;
tableName_ = com.google.protobuf.ByteString.EMPTY;
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
flushSequenceNumber_ = 0L;
storeFlushes_ = java.util.Collections.emptyList();
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -6824,6 +6872,9 @@ public final class WALProtos {
for (int i = 0; i < storeFlushes_.size(); i++) {
output.writeMessage(5, storeFlushes_.get(i));
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBytes(6, regionName_);
+ }
getUnknownFields().writeTo(output);
}
@@ -6853,6 +6904,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, storeFlushes_.get(i));
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(6, regionName_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -6898,6 +6953,11 @@ public final class WALProtos {
}
result = result && getStoreFlushesList()
.equals(other.getStoreFlushesList());
+ result = result && (hasRegionName() == other.hasRegionName());
+ if (hasRegionName()) {
+ result = result && getRegionName()
+ .equals(other.getRegionName());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -6931,6 +6991,10 @@ public final class WALProtos {
hash = (37 * hash) + STORE_FLUSHES_FIELD_NUMBER;
hash = (53 * hash) + getStoreFlushesList().hashCode();
}
+ if (hasRegionName()) {
+ hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+ hash = (53 * hash) + getRegionName().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -7060,6 +7124,8 @@ public final class WALProtos {
} else {
storeFlushesBuilder_.clear();
}
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -7113,6 +7179,10 @@ public final class WALProtos {
} else {
result.storeFlushes_ = storeFlushesBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.regionName_ = regionName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -7167,6 +7237,9 @@ public final class WALProtos {
}
}
}
+ if (other.hasRegionName()) {
+ setRegionName(other.getRegionName());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -7593,6 +7666,58 @@ public final class WALProtos {
return storeFlushesBuilder_;
}
+ // optional bytes region_name = 6;
+ private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder setRegionName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000020;
+ regionName_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes region_name = 6;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder clearRegionName() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ regionName_ = getDefaultInstance().getRegionName();
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:FlushDescriptor)
}
@@ -9772,6 +9897,24 @@ public final class WALProtos {
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerOrBuilder();
+
+ // optional bytes region_name = 7;
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ boolean hasRegionName();
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ com.google.protobuf.ByteString getRegionName();
}
/**
* Protobuf type {@code RegionEventDescriptor}
@@ -9876,6 +10019,11 @@ public final class WALProtos {
bitField0_ |= 0x00000010;
break;
}
+ case 58: {
+ bitField0_ |= 0x00000020;
+ regionName_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -10135,6 +10283,30 @@ public final class WALProtos {
return server_;
}
+ // optional bytes region_name = 7;
+ public static final int REGION_NAME_FIELD_NUMBER = 7;
+ private com.google.protobuf.ByteString regionName_;
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+
private void initFields() {
eventType_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType.REGION_OPEN;
tableName_ = com.google.protobuf.ByteString.EMPTY;
@@ -10142,6 +10314,7 @@ public final class WALProtos {
logSequenceNumber_ = 0L;
stores_ = java.util.Collections.emptyList();
server_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -10197,6 +10370,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeMessage(6, server_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeBytes(7, regionName_);
+ }
getUnknownFields().writeTo(output);
}
@@ -10230,6 +10406,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(6, server_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(7, regionName_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -10280,6 +10460,11 @@ public final class WALProtos {
result = result && getServer()
.equals(other.getServer());
}
+ result = result && (hasRegionName() == other.hasRegionName());
+ if (hasRegionName()) {
+ result = result && getRegionName()
+ .equals(other.getRegionName());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -10317,6 +10502,10 @@ public final class WALProtos {
hash = (37 * hash) + SERVER_FIELD_NUMBER;
hash = (53 * hash) + getServer().hashCode();
}
+ if (hasRegionName()) {
+ hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+ hash = (53 * hash) + getRegionName().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -10453,6 +10642,8 @@ public final class WALProtos {
serverBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000020);
+ regionName_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -10514,6 +10705,10 @@ public final class WALProtos {
} else {
result.server_ = serverBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.regionName_ = regionName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -10571,6 +10766,9 @@ public final class WALProtos {
if (other.hasServer()) {
mergeServer(other.getServer());
}
+ if (other.hasRegionName()) {
+ setRegionName(other.getRegionName());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -11156,6 +11354,58 @@ public final class WALProtos {
return serverBuilder_;
}
+ // optional bytes region_name = 7;
+ private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public boolean hasRegionName() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public com.google.protobuf.ByteString getRegionName() {
+ return regionName_;
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder setRegionName(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000040;
+ regionName_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bytes region_name = 7;</code>
+ *
+ * <pre>
+ * full region name
+ * </pre>
+ */
+ public Builder clearRegionName() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ regionName_ = getDefaultInstance().getRegionName();
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:RegionEventDescriptor)
}
@@ -11598,32 +11848,33 @@ public final class WALProtos {
"n_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020com" +
"paction_input\030\004 \003(\t\022\031\n\021compaction_output" +
"\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region" +
- "_name\030\007 \001(\014\"\353\002\n\017FlushDescriptor\022,\n\006actio" +
+ "_name\030\007 \001(\014\"\200\003\n\017FlushDescriptor\022,\n\006actio" +
"n\030\001 \002(\0162\034.FlushDescriptor.FlushAction\022\022\n",
"\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_name" +
"\030\003 \002(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n" +
"\rstore_flushes\030\005 \003(\0132%.FlushDescriptor.S" +
- "toreFlushDescriptor\032Y\n\024StoreFlushDescrip" +
- "tor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_d" +
- "ir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(\t\"A\n\013FlushA" +
- "ction\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001" +
- "\022\017\n\013ABORT_FLUSH\020\002\"R\n\017StoreDescriptor\022\023\n\013" +
- "family_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(" +
- "\t\022\022\n\nstore_file\030\003 \003(\t\"\215\001\n\022BulkLoadDescri",
- "ptor\022\036\n\ntable_name\030\001 \002(\0132\n.TableName\022\033\n\023" +
- "encoded_region_name\030\002 \002(\014\022 \n\006stores\030\003 \003(" +
- "\0132\020.StoreDescriptor\022\030\n\020bulkload_seq_num\030" +
- "\004 \002(\003\"\212\002\n\025RegionEventDescriptor\0224\n\nevent" +
- "_type\030\001 \002(\0162 .RegionEventDescriptor.Even" +
- "tType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_reg" +
- "ion_name\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 " +
- "\001(\004\022 \n\006stores\030\005 \003(\0132\020.StoreDescriptor\022\033\n" +
- "\006server\030\006 \001(\0132\013.ServerName\".\n\tEventType\022" +
- "\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWA",
- "LTrailer*F\n\tScopeType\022\033\n\027REPLICATION_SCO" +
- "PE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001" +
- "B?\n*org.apache.hadoop.hbase.protobuf.gen" +
- "eratedB\tWALProtosH\001\210\001\000\240\001\001"
+ "toreFlushDescriptor\022\023\n\013region_name\030\006 \001(\014" +
+ "\032Y\n\024StoreFlushDescriptor\022\023\n\013family_name\030" +
+ "\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\024\n\014flush_o" +
+ "utput\030\003 \003(\t\"A\n\013FlushAction\022\017\n\013START_FLUS" +
+ "H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"R" +
+ "\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026" +
+ "\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003",
+ "(\t\"\215\001\n\022BulkLoadDescriptor\022\036\n\ntable_name\030" +
+ "\001 \002(\0132\n.TableName\022\033\n\023encoded_region_name" +
+ "\030\002 \002(\014\022 \n\006stores\030\003 \003(\0132\020.StoreDescriptor" +
+ "\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\237\002\n\025RegionEve" +
+ "ntDescriptor\0224\n\nevent_type\030\001 \002(\0162 .Regio" +
+ "nEventDescriptor.EventType\022\022\n\ntable_name" +
+ "\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022\033\n\023l" +
+ "og_sequence_number\030\004 \001(\004\022 \n\006stores\030\005 \003(\013" +
+ "2\020.StoreDescriptor\022\033\n\006server\030\006 \001(\0132\013.Ser" +
+ "verName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventTyp",
+ "e\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\n" +
+ "WALTrailer*F\n\tScopeType\022\033\n\027REPLICATION_S" +
+ "COPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL" +
+ "\020\001B?\n*org.apache.hadoop.hbase.protobuf.g" +
+ "eneratedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -11659,7 +11910,7 @@ public final class WALProtos {
internal_static_FlushDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_FlushDescriptor_descriptor,
- new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", });
+ new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", "RegionName", });
internal_static_FlushDescriptor_StoreFlushDescriptor_descriptor =
internal_static_FlushDescriptor_descriptor.getNestedTypes().get(0);
internal_static_FlushDescriptor_StoreFlushDescriptor_fieldAccessorTable = new
@@ -11683,7 +11934,7 @@ public final class WALProtos {
internal_static_RegionEventDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RegionEventDescriptor_descriptor,
- new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", });
+ new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", "RegionName", });
internal_static_WALTrailer_descriptor =
getDescriptor().getMessageTypes().get(8);
internal_static_WALTrailer_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 169a9b2..3fd6025 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -122,6 +122,7 @@ message FlushDescriptor {
required bytes encoded_region_name = 3;
optional uint64 flush_sequence_number = 4;
repeated StoreFlushDescriptor store_flushes = 5;
+ optional bytes region_name = 6; // full region name
}
message StoreDescriptor {
@@ -155,6 +156,7 @@ message RegionEventDescriptor {
optional uint64 log_sequence_number = 4;
repeated StoreDescriptor stores = 5;
optional ServerName server = 6; // Server who opened the region
+ optional bytes region_name = 7; // full region name
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 48b78c2..081d7a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -208,6 +208,11 @@ public class DefaultMemStore implements MemStore {
return this.snapshotSize > 0 ? this.snapshotSize : keySize();
}
+ @Override
+ public long getSnapshotSize() {
+ return this.snapshotSize;
+ }
+
/**
* Write an update
* @param cell
@@ -462,6 +467,7 @@ public class DefaultMemStore implements MemStore {
* @param now
* @return Timestamp
*/
+ @Override
public long updateColumnValue(byte[] row,
byte[] family,
byte[] qualifier,
@@ -524,7 +530,7 @@ public class DefaultMemStore implements MemStore {
* atomically. Scans will only see each KeyValue update as atomic.
*
* @param cells
- * @param readpoint readpoint below which we can safely remove duplicate KVs
+ * @param readpoint readpoint below which we can safely remove duplicate KVs
* @return change in memstore size
*/
@Override
@@ -1031,7 +1037,7 @@ public class DefaultMemStore implements MemStore {
public long size() {
return heapSize();
}
-
+
/**
* Code to help figure if our approximation of object heap sizes is close
* enough. See hbase-900. Fills memstores then waits so user can heap
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 53e732a..aa65ddd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.RandomAccess;
@@ -61,7 +62,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -100,6 +99,7 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
@@ -133,12 +133,16 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -176,6 +180,7 @@ import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
+import com.google.protobuf.TextFormat;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@@ -255,6 +260,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
private final AtomicLong sequenceId = new AtomicLong(-1L);
/**
+ * The sequence id of the last replayed open region event from the primary region. This is used
+ * to skip entries before this due to the possibility of replay edits coming out of order from
+ * replication.
+ */
+ protected volatile long lastReplayedOpenRegionSeqId = -1L;
+
+ /**
* Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
* startRegionOperation to possibly invoke different checks before any region operations. Not all
* operations have to be defined here. It's only needed when a special check is need in
@@ -262,7 +274,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public enum Operation {
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
- REPLAY_BATCH_MUTATE, COMPACT_REGION
+ REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT
}
//////////////////////////////////////////////////////////////////////////////
@@ -367,6 +379,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// The following map is populated when opening the region
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ /** Saved state from replaying prepare flush cache */
+ private PrepareFlushResult prepareFlushResult = null;
+
/**
* Config setting for whether to allow writes when a region is in recovering or not.
*/
@@ -516,6 +531,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public boolean isCompactionNeeded() {
return result == Result.FLUSHED_COMPACTION_NEEDED;
}
+
+ /**
+ * Renders the flush result kind, the failure reason and the flush sequence id as one line of
+ * "label:value" pairs, for logging.
+ */
+ @Override
+ public String toString() {
+ // every pair uses the same "label:value" form and the same ", " separator
+ return new StringBuilder()
+ .append("flush result:").append(result).append(", ")
+ .append("failureReason:").append(failureReason).append(", ")
+ .append("flush seq id:").append(flushSequenceId).toString();
+ }
+ }
+
+ /**
+ * A result object from the prepare flush cache stage. It either carries an early-exit
+ * {@link #result} (prepare did not leave a snapshot to flush), or the state needed to finish
+ * the flush later: per-store flush contexts, files to commit and the flush sequence ids.
+ */
+ @VisibleForTesting
+ static class PrepareFlushResult {
+ /** Non-null only for an early exit from prepare (e.g. nothing to flush, WAL closing). */
+ final FlushResult result; // indicating a failure result from prepare
+ /** Flush contexts keyed by column family name; null for an early exit. */
+ final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
+ /** Files to commit keyed by column family name; null for an early exit. */
+ final TreeMap<byte[], List<Path>> committedFiles;
+ /** Flush start time; recorded as the store's last flush time when a flush is replayed. */
+ final long startTime;
+ /** Sequence id of this flush operation. */
+ final long flushOpSeqId;
+ // NOTE(review): presumably the highest sequence id covered by this flush — confirm against
+ // how internalPrepareFlushCache computes it.
+ final long flushedSeqId;
+ /** Sum of the flushable sizes of the stores being flushed; 0 for an early exit. */
+ final long totalFlushableSize;
+
+ /**
+ * Constructs an early exit case. The flush sequence id is clamped to be non-negative; the
+ * flush contexts and committed files are null and all other numeric fields are 0.
+ */
+ PrepareFlushResult(FlushResult result, long flushSeqId) {
+ this(result, null, null, Math.max(0, flushSeqId), 0, 0, 0);
+ }
+
+ /** Constructs a successful prepare flush result; {@link #result} is left null. */
+ PrepareFlushResult(
+ TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
+ TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
+ long flushedSeqId, long totalFlushableSize) {
+ this(null, storeFlushCtxs, committedFiles, startTime,
+ flushSeqId, flushedSeqId, totalFlushableSize);
+ }
+
+ // Canonical constructor shared by the two public-facing ones above.
+ private PrepareFlushResult(
+ FlushResult result,
+ TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
+ TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
+ long flushedSeqId, long totalFlushableSize) {
+ this.result = result;
+ this.storeFlushCtxs = storeFlushCtxs;
+ this.committedFiles = committedFiles;
+ this.startTime = startTime;
+ this.flushOpSeqId = flushSeqId;
+ this.flushedSeqId = flushedSeqId;
+ this.totalFlushableSize = totalFlushableSize;
+ }
}
final WriteState writestate = new WriteState();
@@ -771,6 +834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
long maxSeqId = initializeRegionStores(reporter, status);
+ this.lastReplayedOpenRegionSeqId = maxSeqId;
this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
this.writestate.flushRequested = false;
@@ -1229,9 +1293,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
status.setStatus("Disabling compacts and flushes for region");
+ boolean canFlush = true;
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
// region.
+ canFlush = !writestate.readOnly;
writestate.writesEnabled = false;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
waitForFlushesAndCompactions();
@@ -1239,7 +1305,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// If we were not just flushing, is it worth doing a preflush...one
// that will clear out of the bulk of the memstore before we put up
// the close flag?
- if (!abort && worthPreFlushing()) {
+ if (!abort && worthPreFlushing() && canFlush) {
status.setStatus("Pre-flushing region before close");
LOG.info("Running close preflush of " + this.getRegionNameAsString());
try {
@@ -1262,7 +1328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
LOG.debug("Updates disabled for region " + this);
// Don't flush the cache if we are aborting
- if (!abort) {
+ if (!abort && canFlush) {
int flushCount = 0;
while (this.getMemstoreSize().get() > 0) {
try {
@@ -1300,7 +1366,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// close each store in parallel
for (final Store store : stores.values()) {
- assert abort || store.getFlushableSize() == 0;
+ assert abort || store.getFlushableSize() == 0 || writestate.readOnly;
completionService
.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
@Override
@@ -1336,7 +1402,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
this.closed.set(true);
- if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
+ if (!canFlush) {
+ addAndGetGlobalMemstoreSize(-memstoreSize.get());
+ } else if (memstoreSize.get() != 0) {
+ LOG.error("Memstore size is " + memstoreSize.get());
+ }
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-close hooks");
this.coprocessorHost.postClose(abort);
@@ -1362,6 +1432,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public void waitForFlushesAndCompactions() {
synchronized (writestate) {
+ if (this.writestate.readOnly) {
+ // we should not wait for replayed flushed if we are read only (for example in case the
+ // region is a secondary replica).
+ return;
+ }
boolean interrupted = false;
try {
while (writestate.compacting > 0 || writestate.flushing) {
@@ -1592,6 +1667,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
+ * Compacts the store of the given column family if it has a compaction to run.
+ * This is a helper used by utilities and tests.
+ *
+ * @param family column family name of the store to compact
+ * @param throughputController controller handed to {@code compact(...)} for the compaction
+ * @throws IllegalArgumentException if this region has no store for {@code family}
+ * @throws IOException propagated from requesting or running the compaction
+ */
+ @VisibleForTesting
+ void compactStore(byte[] family, CompactionThroughputController throughputController)
+ throws IOException {
+ Store s = getStore(family);
+ if (s == null) {
+ // getStore returns null for an unknown family; fail with a clear message instead of an NPE
+ throw new IllegalArgumentException("No store found for family "
+ + Bytes.toString(family) + " in region " + this);
+ }
+ CompactionContext compaction = s.requestCompaction();
+ if (compaction != null) {
+ compact(compaction, s, throughputController);
+ }
+ }
+
+ /*
* Called by compaction thread and after region is opened to compact the
* HStores if necessary.
*
@@ -1738,6 +1829,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
status.setStatus("Running coprocessor pre-flush hooks");
coprocessorHost.preFlush();
}
+ // TODO: this should be managed within memstore with the snapshot, updated only after flush
+ // successful
if (numMutationsWithoutWAL.get() > 0) {
numMutationsWithoutWAL.set(0);
dataInMemoryWithoutWAL.set(0);
@@ -1903,6 +1996,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
+ // The flush runs in two phases: prepare (snapshot the memstores), then flush and commit the
+ // snapshot. The last argument (isReplay) is false because this flush originates here rather
+ // than from a replayed WAL marker.
+ PrepareFlushResult result
+ = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false);
+ if (result.result == null) {
+ // prepare produced a snapshot; write it out and commit it
+ return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
+ } else {
+ return result.result; // early exit due to failure from prepare stage
+ }
+ }
+
+ protected PrepareFlushResult internalPrepareFlushCache(
+ final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
+ MonitoredTask status, boolean isReplay)
+ throws IOException {
+
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -1930,10 +2037,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
w.setWriteNumber(flushSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
w = null;
- return flushResult;
+ return new PrepareFlushResult(flushResult, myseqid);
} else {
- return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
- "Nothing to flush");
+ return new PrepareFlushResult(
+ new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"),
+ myseqid);
}
}
} finally {
@@ -1977,7 +2085,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
flushedFamilyNames.add(store.getFamily().getName());
}
- List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
+ TreeMap<byte[], StoreFlushContext> storeFlushCtxs
+ = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
Bytes.BYTES_COMPARATOR);
// The sequence id of this flush operation which is used to log FlushMarker and pass to
@@ -1998,7 +2107,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
String msg = "Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
status.setStatus(msg);
- return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
+ return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg),
+ myseqid);
}
flushOpSeqId = getNextSequenceId(wal);
long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
@@ -2013,12 +2123,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
for (Store s : storesToFlush) {
totalFlushableSizeOfFlushableStores += s.getFlushableSize();
- storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
+ storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
}
// write the snapshot start to WAL
- if (wal != null) {
+ if (wal != null && !writestate.readOnly) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
// no sync. Sync is below where we do not hold the updates lock
@@ -2027,7 +2137,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// Prepare flush (take a snapshot)
- for (StoreFlushContext flush : storeFlushCtxs) {
+ for (StoreFlushContext flush : storeFlushCtxs.values()) {
flush.prepare();
}
} catch (IOException ex) {
@@ -2075,15 +2185,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
mvcc.waitForPreviousTransactionsComplete(w);
// set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
w = null;
- s = "Flushing stores of " + this;
- status.setStatus(s);
- if (LOG.isTraceEnabled()) LOG.trace(s);
} finally {
if (w != null) {
// in case of failure just mark current w as complete
mvcc.advanceMemstore(w);
}
}
+ return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId,
+ flushedSeqId, totalFlushableSizeOfFlushableStores);
+ }
+
+ protected FlushResult internalFlushCacheAndCommit(
+ final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
+ final Collection<Store> storesToFlush)
+ throws IOException {
+
+ // prepare flush context is carried via PrepareFlushResult
+ TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
+ TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
+ long startTime = prepareResult.startTime;
+ long flushOpSeqId = prepareResult.flushOpSeqId;
+ long flushedSeqId = prepareResult.flushedSeqId;
+ long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
+
+ String s = "Flushing stores of " + this;
+ status.setStatus(s);
+ if (LOG.isTraceEnabled()) LOG.trace(s);
// Any failure from here on out will be catastrophic requiring server
// restart so wal content can be replayed and put back into the memstore.
@@ -2096,7 +2223,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// just-made new flush store file. The new flushed file is still in the
// tmp directory.
- for (StoreFlushContext flush : storeFlushCtxs) {
+ for (StoreFlushContext flush : storeFlushCtxs.values()) {
flush.flushCache(status);
}
@@ -2104,7 +2231,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// all the store scanners to reset/reseek).
Iterator<Store> it = storesToFlush.iterator();
// stores.values() and storeFlushCtxs have same order
- for (StoreFlushContext flush : storeFlushCtxs) {
+ for (StoreFlushContext flush : storeFlushCtxs.values()) {
boolean needsCompaction = flush.commit(status);
if (needsCompaction) {
compactionRequested = true;
@@ -2593,6 +2720,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
throws IOException {
+ // Secondary replicas receive replayed edits through replication, which can deliver them out
+ // of order. Edits whose replaySeqId is older than the last replayed region open event are not
+ // applied, but every mutation in the batch is still reported as SUCCESS.
+ if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
+ && replaySeqId < lastReplayedOpenRegionSeqId) {
+ // if it is a secondary replica we should ignore these entries silently
+ // since they are coming out of order
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getRegionInfo().getEncodedName() + " : "
+ + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
+ + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
+ for (MutationReplay mut : mutations) {
+ LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
+ }
+ }
+
+ // one SUCCESS status per skipped mutation
+ OperationStatus[] statuses = new OperationStatus[mutations.length];
+ for (int i = 0; i < statuses.length; i++) {
+ statuses[i] = OperationStatus.SUCCESS;
+ }
+ return statuses;
+ }
return batchMutate(new ReplayBatch(mutations, replaySeqId));
}
@@ -2897,7 +3043,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// txid should always increase, so having the one from the last call is ok.
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
currentNonceGroup, currentNonce);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
@@ -2923,14 +3069,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// STEP 5. Append the final edit to WAL. Do not sync wal.
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
+ if (isInReplay) {
+ // use wal key from the original
+ walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+ mutation.getClusterIds(), currentNonceGroup, currentNonce);
+ long replaySeqId = batchOp.getReplaySequenceId();
+ walKey.setOrigLogSeqNum(replaySeqId);
+
+ // ensure that the sequence id of the region is at least as big as orig log seq id
+ while (true) {
+ long seqId = getSequenceId().get();
+ if (seqId >= replaySeqId) break;
+ if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
+ }
+ }
if (walEdit.size() > 0) {
+ if (!isInReplay) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
- if(isInReplay) {
- walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
}
+
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells);
}
@@ -3803,7 +3964,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
if (compaction != null) {
//replay the compaction
- completeCompactionMarker(compaction);
+ replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
}
skippedEdits++;
continue;
@@ -3886,15 +4047,506 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* that was not finished. We could find one recovering a WAL after a regionserver crash.
* See HBASE-2331.
*/
- void completeCompactionMarker(CompactionDescriptor compaction)
+ void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
+ boolean removeFiles, long replaySeqId)
+ throws IOException {
+ // Parameters:
+ //   pickCompactionFiles, removeFiles - forwarded unchanged to Store#replayCompactionMarker
+ //   replaySeqId - sequence id of the marker; markers older than lastReplayedOpenRegionSeqId
+ //                 are skipped (recovered-edits replay passes Long.MAX_VALUE)
+ checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
+ "Compaction marker from WAL ", compaction);
+
+ // drop markers that predate the last replayed region open event
+ if (replaySeqId < lastReplayedOpenRegionSeqId) {
+ LOG.warn("Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+ + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ + " of " + lastReplayedOpenRegionSeqId);
+ return;
+ }
+
+ // hold a region operation so the region cannot be closed while the marker is replayed
+ startRegionOperation(Operation.REPLAY_EVENT);
+ try {
+ Store store = this.getStore(compaction.getFamilyName().toByteArray());
+ if (store == null) {
+ LOG.warn("Found Compaction WAL edit for deleted family:" +
+ Bytes.toString(compaction.getFamilyName().toByteArray()));
+ return;
+ }
+ store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
+ } finally {
+ closeRegionOperation(Operation.REPLAY_EVENT);
+ }
+ }
+
+ /**
+ * Replays a flush marker read from the primary region's WAL. Does nothing on the primary
+ * (default) replica. On a secondary replica it dispatches on the flush action (start, commit,
+ * abort) while holding a REPLAY_EVENT region operation; unknown actions are logged and ignored.
+ *
+ * @param flush the flush descriptor to replay
+ * @throws IOException propagated from checkTargetRegion or from replaying the marker
+ */
+ void replayWALFlushMarker(FlushDescriptor flush) throws IOException {
+ checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
+ "Flush marker from WAL ", flush);
+
+ if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+ return; // if primary nothing to do
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replaying flush marker " + TextFormat.shortDebugString(flush));
+ }
+
+ startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to guard against close
+ try {
+ FlushAction action = flush.getAction();
+ switch (action) {
+ case START_FLUSH:
+ replayWALFlushStartMarker(flush);
+ break;
+ case COMMIT_FLUSH:
+ replayWALFlushCommitMarker(flush);
+ break;
+ case ABORT_FLUSH:
+ replayWALFlushAbortMarker(flush);
+ break;
+ default:
+ LOG.warn("Received a flush event with unknown action, ignoring. "
+ + TextFormat.shortDebugString(flush));
+ break;
+ }
+ } finally {
+ closeRegionOperation(Operation.REPLAY_EVENT);
+ }
+ }
+
+ /**
+ * Replays a flush start marker from the primary region by snapshotting the memstores of the
+ * stores named in the marker.
+ * <p>
+ * The marker is skipped when its sequence id is smaller than lastReplayedOpenRegionSeqId. When
+ * no replayed snapshot is active, a snapshot is prepared and kept for the matching commit
+ * marker. When a snapshot is already active the marker is only logged, because a single
+ * memstore snapshot can be held at a time.
+ *
+ * @param flush the START_FLUSH descriptor from the primary
+ * @return the prepare result when a snapshot was attempted (its {@code result} field is
+ *         non-null if the prepare stage exited early), or null when the marker was skipped or a
+ *         snapshot was already active
+ * @throws IOException propagated from preparing the snapshot
+ */
+ @VisibleForTesting
+ PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
+ long flushSeqId = flush.getFlushSequenceNumber();
+
+ HashSet<Store> storesToFlush = new HashSet<Store>();
+ for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
+ byte[] family = storeFlush.getFamilyName().toByteArray();
+ Store store = getStore(family);
+ if (store == null) {
+ LOG.info("Received a flush start marker from primary, but the family is not found. Ignoring"
+ + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
+ continue;
+ }
+ storesToFlush.add(store);
+ }
+
+ MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);
+
+ // we will use writestate as a coarse-grain lock for all the replay events
+ // (flush, compaction, region open etc)
+ synchronized (writestate) {
+ try {
+ if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
+ LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ + " of " + lastReplayedOpenRegionSeqId);
+ return null;
+ }
+ if (numMutationsWithoutWAL.get() > 0) {
+ numMutationsWithoutWAL.set(0);
+ dataInMemoryWithoutWAL.set(0);
+ }
+
+ if (!writestate.flushing) {
+ // we do not have an active snapshot and corresponding this.prepareFlushResult. This means
+ // we can just snapshot our memstores and continue as normal.
+
+ // invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal
+ PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
+ flushSeqId, storesToFlush, status, true);
+ if (prepareResult.result == null) {
+ // save the PrepareFlushResult so that we can use it later from commit flush
+ this.writestate.flushing = true;
+ this.prepareFlushResult = prepareResult;
+ status.markComplete("Flush prepare successful");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getRegionInfo().getEncodedName() + " : "
+ + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
+ }
+ } else {
+ status.abort("Flush prepare failed with " + prepareResult.result);
+ // nothing much to do. prepare flush failed because of some reason.
+ }
+ return prepareResult;
+ } else {
+ // we already have an active snapshot.
+ if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
+ // They define the same flush. Log and continue.
+ LOG.warn("Received a flush prepare marker with the same seqId: "
+ + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ + prepareFlushResult.flushOpSeqId + ". Ignoring");
+ // ignore
+ } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
+ // We received a flush with a smaller seqNum than what we have prepared. We can only
+ // ignore this prepare flush request.
+ LOG.warn("Received a flush prepare marker with a smaller seqId: "
+ + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ + prepareFlushResult.flushOpSeqId + ". Ignoring");
+ // ignore
+ } else {
+ // We received a flush with a larger seqNum than what we have prepared
+ LOG.warn("Received a flush prepare marker with a larger seqId: "
+ + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ + prepareFlushResult.flushOpSeqId + ". Ignoring");
+ // We do not have multiple active snapshots in the memstore or a way to merge current
+ // memstore snapshot with the contents and resnapshot for now. We cannot take
+ // another snapshot and drop the previous one because that will cause temporary
+ // data loss in the secondary. So we ignore this for now, deferring the resolution
+ // to happen when we see the corresponding flush commit marker. If we have a memstore
+ // snapshot with x, and later received another prepare snapshot with y (where x < y),
+ // when we see flush commit for y, we will drop snapshot for x, and can also drop all
+ // the memstore edits if everything in memstore is < y. This is the usual case for
+ // RS crash + recovery where we might see consecutive prepare flush wal markers.
+ // Otherwise, this will cause more memory to be used in secondary replica until a
+ // further prepare + commit flush is seen and replayed.
+ }
+ }
+ } finally {
+ status.cleanup();
+ writestate.notifyAll();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Replays a flush commit marker from the primary region. It picks up the flushed files in the
+ * stores named by the marker. Depending on how the marker's sequence id compares with the
+ * currently prepared snapshot (if any), it also drops that snapshot and/or the memstore
+ * contents. Markers older than the last replayed region open event are skipped.
+ *
+ * @param flush the COMMIT_FLUSH descriptor from the primary
+ * @throws IOException propagated from replaying the flush in the stores or from dropping
+ *         memstore contents
+ */
+ @VisibleForTesting
+ void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
+ MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);
+
+ // check whether we have the memstore snapshot with the corresponding seqId. Replay to
+ // secondary region replicas are in order, except for when the region moves or then the
+ // region server crashes. In those cases, we may receive replay requests out of order from
+ // the original seqIds.
+ synchronized (writestate) {
+ try {
+ if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
+ LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ + " of " + lastReplayedOpenRegionSeqId);
+ return;
+ }
+
+ if (writestate.flushing) {
+ // NOTE: this local intentionally shadows the field of the same name
+ PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
+ if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getRegionInfo().getEncodedName() + " : "
+ + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
+ + " and a previous prepared snapshot was found");
+ }
+ // This is the regular case where we received commit flush after prepare flush
+ // corresponding to the same seqId.
+ replayFlushInStores(flush, prepareFlushResult, true);
+
+ // Set down the memstore size by amount of flush.
+ this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+
+ this.prepareFlushResult = null;
+ writestate.flushing = false;
+ } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
+ // This should not happen normally. However, lets be safe and guard against these cases
+ // we received a flush commit with a smaller seqId than what we have prepared
+ // we will pick the flush file up from this commit (if we have not seen it), but we
+ // will not drop the memstore
+ LOG.warn("Received a flush commit marker with smaller seqId: "
+ + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
+ + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
+ +" prepared memstore snapshot");
+ replayFlushInStores(flush, prepareFlushResult, false);
+
+ // snapshot is not dropped, so memstore sizes should not be decremented
+ // we still have the prepared snapshot, flushing should still be true
+ } else {
+ // This should not happen normally. However, lets be safe and guard against these cases
+ // we received a flush commit with a larger seqId than what we have prepared
+ // we will pick the flush file for this. We will also obtain the updates lock and
+ // look for contents of the memstore to see whether we have edits after this seqId.
+ // If not, we will drop all the memstore edits and the snapshot as well.
+ LOG.warn("Received a flush commit marker with larger seqId: "
+ + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
+ prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
+ +" memstore snapshot");
+
+ replayFlushInStores(flush, prepareFlushResult, true);
+
+ // Set down the memstore size by amount of flush.
+ this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+
+ // Inspect the memstore contents to see whether the memstore contains only edits
+ // with seqId smaller than the flush seqId. If so, we can discard those edits.
+ dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
+
+ this.prepareFlushResult = null;
+ writestate.flushing = false;
+ }
+ } else {
+ LOG.warn(getRegionInfo().getEncodedName() + " : "
+ + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
+ + ", but no previous prepared snapshot was found");
+ // There is no corresponding prepare snapshot from before.
+ // We will pick up the new flushed file
+ replayFlushInStores(flush, null, false);
+
+ // Inspect the memstore contents to see whether the memstore contains only edits
+ // with seqId smaller than the flush seqId. If so, we can discard those edits.
+ dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
+ }
+
+ status.markComplete("Flush commit successful");
+
+ // Update the last flushed sequence id for region.
+ // NOTE(review): this assignment is unconditional. In the smaller-seqId branch above,
+ // maxFlushedSeqId could move backwards if an earlier commit carried a larger seqId.
+ // Confirm whether Math.max(...) was intended.
+ this.maxFlushedSeqId = flush.getFlushSequenceNumber();
+
+ // advance the mvcc read point so that the new flushed file is visible.
+ // there may be some in-flight transactions, but they won't be made visible since they are
+ // either greater than flush seq number or they were already dropped via flush.
+ // TODO: If we are using FlushAllStoresPolicy, then this can make edits visible from other
+ // stores while they are still in flight because the flush commit marker will not contain
+ // flushes from ALL stores.
+ getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
+
+ // C. Finally notify anyone waiting on memstore to clear:
+ // e.g. checkResources().
+ synchronized (this) {
+ notifyAll(); // FindBugs NN_NAKED_NOTIFY
+ }
+ } finally {
+ status.cleanup();
+ writestate.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Replays the given flush descriptor by opening the flush files in stores and dropping the
+ * memstore snapshots if requested. Families that are unknown to this region, or that have no
+ * flush context, are logged and skipped.
+ * @param flush the flush commit descriptor; each StoreFlushDescriptor names a column family
+ *        and its flush output files
+ * @param prepareFlushResult the snapshot prepared by an earlier replayed flush start marker,
+ *        whose per-family flush contexts and start time are reused; or null to create fresh
+ *        flush contexts at the descriptor's flush sequence id
+ * @param dropMemstoreSnapshot whether each store's flush context should also drop its memstore
+ *        snapshot (passed to StoreFlushContext#replayFlush)
+ * @throws IOException propagated from StoreFlushContext#replayFlush
+ */
+ private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
+ boolean dropMemstoreSnapshot)
throws IOException {
- Store store = this.getStore(compaction.getFamilyName().toByteArray());
- if (store == null) {
- LOG.warn("Found Compaction WAL edit for deleted family:" +
- Bytes.toString(compaction.getFamilyName().toByteArray()));
+ for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
+ byte[] family = storeFlush.getFamilyName().toByteArray();
+ Store store = getStore(family);
+ if (store == null) {
+ LOG.warn("Received a flush commit marker from primary, but the family is not found. "
+ + "Ignoring StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
+ continue;
+ }
+ List<String> flushFiles = storeFlush.getFlushOutputList();
+ StoreFlushContext ctx = null;
+ long startTime = EnvironmentEdgeManager.currentTime();
+ if (prepareFlushResult == null) {
+ // no prepared snapshot: build a flush context for this commit's sequence id
+ ctx = store.createFlushContext(flush.getFlushSequenceNumber());
+ } else {
+ // reuse the context (and start time) captured when the snapshot was prepared
+ ctx = prepareFlushResult.storeFlushCtxs.get(family);
+ startTime = prepareFlushResult.startTime;
+ }
+
+ if (ctx == null) {
+ LOG.warn("Unexpected: flush commit marker received from store "
+ + Bytes.toString(family) + " but no associated flush context. Ignoring");
+ continue;
+ }
+ ctx.replayFlush(flushFiles, dropMemstoreSnapshot); // replay the flush
+
+ // Record latest flush time
+ this.lastStoreFlushTimeMap.put(store, startTime);
+ }
+ }
+
+ /**
+ * Drops the memstore contents after replaying a flush descriptor or region open event, but
+ * only if the given seqId is at least the region's current sequence id (i.e. every memstore
+ * edit is already covered by it). Otherwise the memstore is left untouched.
+ * @param seqId sequence id of the replayed flush / region open event
+ * @param store the store whose memstore contents should be dropped, or null to drop the
+ * memstore contents of every store in the region
+ * @throws IOException propagated from preparing or aborting the store flush contexts
+ */
+ private void dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+ // Hold the updates write lock for the whole compare-and-drop.
+ this.updatesLock.writeLock().lock();
+ try {
+ // presumably waits for in-flight MVCC transactions so the seqId read below is stable
+ mvcc.waitForPreviousTransactionsComplete();
+ long currentSeqId = getSequenceId().get();
+ if (seqId >= currentSeqId) {
+ // then we can drop the memstore contents since everything is below this seqId
+ LOG.info("Dropping memstore contents as well since replayed flush seqId: "
+ + seqId + " is greater than current seqId:" + currentSeqId);
+
+ // Prepare flush (take a snapshot) and then abort (drop the snapshot).
+ // Note: the flush contexts are created at the region's current seqId, not at seqId.
+ if (store == null ) {
+ for (Store s : stores.values()) {
+ dropStoreMemstoreContentsForSeqId(s, currentSeqId);
+ }
+ } else {
+ dropStoreMemstoreContentsForSeqId(store, currentSeqId);
+ }
+ } else {
+ LOG.info("Not dropping memstore contents since replayed flush seqId: "
+ + seqId + " is smaller than current seqId:" + currentSeqId);
+ }
+ } finally {
+ this.updatesLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Discards the memstore contents of a single store. The store's flushable size is first
+ * subtracted from the region-wide memstore size; then a snapshot is taken through a flush
+ * context created at the given sequence id and immediately thrown away.
+ * @param s the store whose memstore is dropped
+ * @param currentSeqId sequence id used to create the flush context
+ * @throws IOException propagated from the flush context
+ */
+ private void dropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
+ long flushableSize = s.getFlushableSize();
+ this.addAndGetGlobalMemstoreSize(-flushableSize);
+ StoreFlushContext flushContext = s.createFlushContext(currentSeqId);
+ flushContext.prepare(); // take the snapshot...
+ flushContext.abort(); // ...and drop it without writing anything out
+ }
+
+ /**
+ * Handles a replayed flush abort marker. Intentionally a no-op for now.
+ * @param flush the flush abort descriptor (currently unused)
+ */
+ private void replayWALFlushAbortMarker(FlushDescriptor flush) {
+ // Intentionally empty: a flush abort causes the region server to abort, so the region gets
+ // reopened elsewhere later. Replaying that region open event is what drops the snapshot.
+ }
+
+ /**
+ * @return the prepared-flush result currently tracked by this region, or null if none is
+ * tracked; exposed for tests only
+ */
+ @VisibleForTesting
+ PrepareFlushResult getPrepareFlushResult() {
+ return this.prepareFlushResult;
+ }
+
+ /**
+ * Replays a region event marker received from the primary region's WAL.
+ *
+ * Nothing is done on the default (primary) replica, or for REGION_CLOSE events; unknown event
+ * types are logged and ignored. For a REGION_OPEN event whose sequence id is newer than the
+ * last replayed open event, each listed store:
+ * - is refreshed to the file list in the event;
+ * - has its matching prepared flush snapshot dropped;
+ * - has its memstore contents dropped if they are covered by the event's sequence id.
+ * Finally, the MVCC read point is advanced and waiters on this region are notified.
+ * @param regionEvent the region event descriptor read from the WAL
+ * @throws WrongRegionException if the marker targets neither this region nor, for a
+ * secondary replica, the region it reads its files from
+ * @throws IOException if refreshing store files or dropping memstore contents fails
+ */
+ void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
+ checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
+ "RegionEvent marker from WAL ", regionEvent);
+
+ startRegionOperation(Operation.REPLAY_EVENT);
+ try {
+ if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+ return; // if primary nothing to do
+ }
+
+ if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
+ // nothing to do on REGION_CLOSE for now.
+ return;
+ }
+ if (regionEvent.getEventType() != EventType.REGION_OPEN) {
+ LOG.warn("Unknown region event received, ignoring :"
+ + TextFormat.shortDebugString(regionEvent));
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
+ }
+
+ // we will use writestate as a coarse-grain lock for all the replay events
+ synchronized (writestate) {
+ // Replication can deliver events out of order when primary region moves or the region
+ // server crashes, since there is no coordination between replication of different wal files
+ // belonging to different region servers. We have to safeguard against this case by using
+ // region open event's seqid. Since this is the first event that the region puts (after
+ // possibly flushing recovered.edits), after seeing this event, we can ignore every edit
+ // smaller than this seqId.
+ // Note: an event whose seqId EQUALS lastReplayedOpenRegionSeqId is skipped as well.
+ if (this.lastReplayedOpenRegionSeqId < regionEvent.getLogSequenceNumber()) {
+ this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
+ } else {
+ LOG.warn("Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
+ + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ + " of " + lastReplayedOpenRegionSeqId);
+ return;
+ }
+
+ // region open lists all the files that the region has at the time of the opening. Just pick
+ // all the files and drop prepared flushes and empty memstores
+ for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
+ // stores of primary may be different now
+ byte[] family = storeDescriptor.getFamilyName().toByteArray();
+ Store store = getStore(family);
+ if (store == null) {
+ LOG.warn("Received a region open marker from primary, but the family is not found. "
+ + "Ignoring. StoreDescriptor:" + storeDescriptor);
+ continue;
+ }
+
+ // Remember the max seqId before the refresh so we can tell whether new files were seen.
+ long storeSeqId = store.getMaxSequenceId();
+ List<String> storeFiles = storeDescriptor.getStoreFileList();
+ store.refreshStoreFiles(storeFiles); // replace the files with the new ones
+ if (store.getMaxSequenceId() != storeSeqId) {
+ // Record latest flush time if we picked up new files
+ lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
+ }
+
+ // NOTE(review): assumes prepareFlushResult is non-null whenever writestate.flushing is
+ // set; a null here would throw NullPointerException — confirm this invariant holds.
+ if (writestate.flushing) {
+ // only drop memstore snapshots if they are smaller than last flush for the store
+ if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
+ StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs.get(family);
+ if (ctx != null) {
+ // Read the snapshot size before aborting, then release it from the global size.
+ long snapshotSize = store.getFlushableSize();
+ ctx.abort();
+ this.addAndGetGlobalMemstoreSize(-snapshotSize);
+ this.prepareFlushResult.storeFlushCtxs.remove(family);
+ }
+ }
+ }
+
+ // Drop the memstore contents if they are now smaller than the latest seen flushed file
+ dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
+ // NOTE(review): storeSeqId is the store's max seqId from BEFORE refreshStoreFiles; confirm
+ // that this (rather than store.getMaxSequenceId() after the refresh) is intended here.
+ if (storeSeqId > this.maxFlushedSeqId) {
+ this.maxFlushedSeqId = storeSeqId;
+ }
+ }
+
+ // if all stores ended up dropping their snapshots, we can safely drop the
+ // prepareFlushResult
+ if (writestate.flushing) {
+ boolean canDrop = true;
+ for (Entry<byte[], StoreFlushContext> entry
+ : prepareFlushResult.storeFlushCtxs.entrySet()) {
+ Store store = getStore(entry.getKey());
+ if (store == null) {
+ continue;
+ }
+ if (store.getSnapshotSize() > 0) {
+ canDrop = false;
+ }
+ }
+
+ // this means that all the stores in the region have finished flushing, but the WAL marker
+ // may not have been written or we did not receive it yet.
+ if (canDrop) {
+ writestate.flushing = false;
+ this.prepareFlushResult = null;
+ }
+ }
+
+
+ // advance the mvcc read point so that the new flushed file is visible.
+ // there may be some in-flight transactions, but they won't be made visible since they are
+ // either greater than flush seq number or they were already dropped via flush.
+ getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
+
+ // Finally notify anyone waiting on memstore to clear:
+ // e.g. checkResources().
+ synchronized (this) {
+ notifyAll(); // FindBugs NN_NAKED_NOTIFY
+ }
+ }
+ } finally {
+ closeRegionOperation(Operation.REPLAY_EVENT);
+ }
+ }
+
+ /**
+ * Checks that a replayed WAL marker is addressed to this region. The marker is accepted when
+ * its encoded region name equals this region's own encoded name. For a non-default (secondary)
+ * replica, it is also accepted when the name equals the encoded name of the region info used
+ * for file-system access ({@code fs.getRegionInfoForFS()}), i.e. the primary region of the
+ * same key range.
+ * @param encodedRegionName the encoded region name carried by the marker
+ * @param exceptionMsg leading text of the exception message
+ * @param payload the marker itself; appended to the exception message
+ * @throws WrongRegionException if the name matches neither accepted region
+ */
+ private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
+ throws WrongRegionException {
+ // Marker addressed to this exact region.
+ if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
return;
}
- store.completeCompactionMarker(compaction);
+
+ // Secondary replicas also accept markers addressed to the region they read their files from.
+ if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
+ Bytes.equals(encodedRegionName,
+ this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
+ return;
+ }
+
+ throw new WrongRegionException(exceptionMsg + payload
+ + " targetted for region " + Bytes.toStringBinary(encodedRegionName)
+ + " does not match this region: " + this.getRegionInfo());
}
/**
@@ -4127,8 +4779,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param familyPaths List of Pair<byte[] column family, String hfilePath>
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
* file about to be bulk loaded
- * @param assignSeqId Force a flush, get it's sequenceId to preserve the guarantee that
- * all the edits lower than the highest sequential ID from all the
+ * @param assignSeqId Force a flush, get its sequenceId to preserve the guarantee that
+ * all the edits lower than the highest sequential ID from all the
* HFiles are flushed on disk.
* @return true if successful, false if failed recoverably
* @throws IOException if failed unrecoverably.
@@ -4217,7 +4869,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
}
store.bulkLoadHFile(finalPath, seqId);
-
+
if(storeFiles.containsKey(familyName)) {
storeFiles.get(familyName).add(new Path(finalPath));
} else {
@@ -4265,7 +4917,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
}
-
+
closeBulkRegionOperation();
}
}
@@ -4989,7 +5641,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
checkClassLoading();
this.openSeqNum = initialize(reporter);
this.setSequenceId(openSeqNum);
- if (wal != null && getRegionServerServices() != null) {
+ if (wal != null && getRegionServerServices() != null && !writestate.readOnly) {
writeRegionOpenMarker(wal, openSeqNum);
}
return this;
@@ -5660,7 +6312,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
if (cell.getTagsLength() > 0) {
- Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
cell.getTagsOffset(), cell.getTagsLength());
while (i.hasNext()) {
newTags.add(i.next());
@@ -6080,8 +6732,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
- (11 * Bytes.SIZEOF_LONG) +
+ 45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ (12 * Bytes.SIZEOF_LONG) +
4 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e10e6e1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 0751634..014ec2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -119,6 +119,10 @@ public class HRegionFileSystem {
return this.regionInfo;
}
+ /**
+ * @return the {@link HRegionInfo} used to locate this region's files on the file system
+ * ({@code regionInfoForFs}); for a secondary replica this is presumably the primary's info
+ */
+ public HRegionInfo getRegionInfoForFS() {
+ return regionInfoForFs;
+ }
+
/** @return {@link Path} to the region's root directory. */
public Path getTableDir() {
return this.tableDir;
@@ -205,7 +209,7 @@ public class HRegionFileSystem {
continue;
}
StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
- regionInfoForFs, familyName, status);
+ regionInfoForFs, familyName, status.getPath());
storeFiles.add(info);
}
@@ -234,8 +238,8 @@ public class HRegionFileSystem {
StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
throws IOException {
Path familyDir = getStoreDir(familyName);
- FileStatus status = fs.getFileStatus(new Path(familyDir, fileName));
- return new StoreFileInfo(this.conf, this.fs, status);
+ // Build the StoreFileInfo through ServerRegionReplicaUtil, passing both regionInfo and
+ // regionInfoForFs, instead of directly from a FileStatus of the path.
+ return ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
+ regionInfoForFs, familyName, new Path(familyDir, fileName));
}
/**