You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/08/28 21:32:00 UTC
svn commit: r1518335 [1/2] - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/
hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/ha...
Author: stack
Date: Wed Aug 28 19:32:00 2013
New Revision: 1518335
URL: http://svn.apache.org/r1518335
Log:
HBASE-7709 Infinite loop possible in Master/Master replication
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Aug 28 19:32:00 2013
@@ -39,6 +39,10 @@ import org.apache.hadoop.hbase.io.HeapSi
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class Mutation extends OperationWithAttributes implements Row, CellScannable,
@@ -57,8 +61,10 @@ public abstract class Mutation extends O
// familyMap
ClassSize.TREEMAP);
- // Attribute used in Mutations to indicate the originating cluster.
- private static final String CLUSTER_ID_ATTR = "_c.id_";
+ /**
+ * The attribute for storing the list of clusters that have consumed the change.
+ */
+ private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
@@ -225,26 +231,33 @@ public abstract class Mutation extends O
}
/**
- * Set the replication custer id.
- * @param clusterId
+ * Marks that the clusters with the given clusterIds have consumed the mutation
+ * @param clusterIds of the clusters that have consumed the mutation
*/
- public void setClusterId(UUID clusterId) {
- if (clusterId == null) return;
- byte[] val = new byte[2*Bytes.SIZEOF_LONG];
- Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
- Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
- setAttribute(CLUSTER_ID_ATTR, val);
+ public void setClusterIds(List<UUID> clusterIds) {
+ ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ out.writeInt(clusterIds.size());
+ for (UUID clusterId : clusterIds) {
+ out.writeLong(clusterId.getMostSignificantBits());
+ out.writeLong(clusterId.getLeastSignificantBits());
+ }
+ setAttribute(CONSUMED_CLUSTER_IDS, out.toByteArray());
}
/**
- * @return The replication cluster id.
+ * @return the set of clusterIds that have consumed the mutation
*/
- public UUID getClusterId() {
- byte[] attr = getAttribute(CLUSTER_ID_ATTR);
- if (attr == null) {
- return HConstants.DEFAULT_CLUSTER_ID;
+ public List<UUID> getClusterIds() {
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ byte[] bytes = getAttribute(CONSUMED_CLUSTER_IDS);
+ if(bytes != null) {
+ ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
+ int numClusters = in.readInt();
+ for(int i=0; i<numClusters; i++){
+ clusterIds.add(new UUID(in.readLong(), in.readLong()));
+ }
}
- return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG));
+ return clusterIds;
}
/**
Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java Wed Aug 28 19:32:00 2013
@@ -567,19 +567,43 @@ public final class WALProtos {
*/
long getWriteTime();
- // optional .UUID cluster_id = 5;
+ // optional .UUID cluster_id = 5 [deprecated = true];
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- boolean hasClusterId();
+ @java.lang.Deprecated boolean hasClusterId();
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId();
+ @java.lang.Deprecated org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId();
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder();
+ @java.lang.Deprecated org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder();
// repeated .FamilyScope scopes = 6;
/**
@@ -609,30 +633,67 @@ public final class WALProtos {
// optional uint32 following_kv_count = 7;
/**
* <code>optional uint32 following_kv_count = 7;</code>
+ */
+ boolean hasFollowingKvCount();
+ /**
+ * <code>optional uint32 following_kv_count = 7;</code>
+ */
+ int getFollowingKvCount();
+
+ // repeated .UUID cluster_ids = 8;
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID>
+ getClusterIdsList();
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
* <pre>
*
- *optional CustomEntryType custom_entry_type = 8;
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index);
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
*
- *enum CustomEntryType {
- *COMPACTION = 0;
- *}
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
* </pre>
*/
- boolean hasFollowingKvCount();
+ int getClusterIdsCount();
/**
- * <code>optional uint32 following_kv_count = 7;</code>
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
* <pre>
*
- *optional CustomEntryType custom_entry_type = 8;
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder>
+ getClusterIdsOrBuilderList();
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
*
- *enum CustomEntryType {
- *COMPACTION = 0;
- *}
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
* </pre>
*/
- int getFollowingKvCount();
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder(
+ int index);
}
/**
* Protobuf type {@code WALKey}
@@ -735,6 +796,14 @@ public final class WALProtos {
followingKvCount_ = input.readUInt32();
break;
}
+ case 66: {
+ if (!((mutable_bitField0_ & 0x00000080) == 0x00000080)) {
+ clusterIds_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID>();
+ mutable_bitField0_ |= 0x00000080;
+ }
+ clusterIds_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.PARSER, extensionRegistry));
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -746,6 +815,9 @@ public final class WALProtos {
if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) {
scopes_ = java.util.Collections.unmodifiableList(scopes_);
}
+ if (((mutable_bitField0_ & 0x00000080) == 0x00000080)) {
+ clusterIds_ = java.util.Collections.unmodifiableList(clusterIds_);
+ }
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@@ -842,25 +914,49 @@ public final class WALProtos {
return writeTime_;
}
- // optional .UUID cluster_id = 5;
+ // optional .UUID cluster_id = 5 [deprecated = true];
public static final int CLUSTER_ID_FIELD_NUMBER = 5;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID clusterId_;
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public boolean hasClusterId() {
+ @java.lang.Deprecated public boolean hasClusterId() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() {
+ @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() {
return clusterId_;
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() {
+ @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() {
return clusterId_;
}
@@ -905,33 +1001,81 @@ public final class WALProtos {
private int followingKvCount_;
/**
* <code>optional uint32 following_kv_count = 7;</code>
+ */
+ public boolean hasFollowingKvCount() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional uint32 following_kv_count = 7;</code>
+ */
+ public int getFollowingKvCount() {
+ return followingKvCount_;
+ }
+
+ // repeated .UUID cluster_ids = 8;
+ public static final int CLUSTER_IDS_FIELD_NUMBER = 8;
+ private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> clusterIds_;
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
* <pre>
*
- *optional CustomEntryType custom_entry_type = 8;
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> getClusterIdsList() {
+ return clusterIds_;
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
- *enum CustomEntryType {
- *COMPACTION = 0;
- *}
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
* </pre>
*/
- public boolean hasFollowingKvCount() {
- return ((bitField0_ & 0x00000020) == 0x00000020);
+ public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder>
+ getClusterIdsOrBuilderList() {
+ return clusterIds_;
}
/**
- * <code>optional uint32 following_kv_count = 7;</code>
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
* <pre>
*
- *optional CustomEntryType custom_entry_type = 8;
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public int getClusterIdsCount() {
+ return clusterIds_.size();
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
*
- *enum CustomEntryType {
- *COMPACTION = 0;
- *}
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
* </pre>
*/
- public int getFollowingKvCount() {
- return followingKvCount_;
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index) {
+ return clusterIds_.get(index);
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder(
+ int index) {
+ return clusterIds_.get(index);
}
private void initFields() {
@@ -942,6 +1086,7 @@ public final class WALProtos {
clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance();
scopes_ = java.util.Collections.emptyList();
followingKvCount_ = 0;
+ clusterIds_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -976,6 +1121,12 @@ public final class WALProtos {
return false;
}
}
+ for (int i = 0; i < getClusterIdsCount(); i++) {
+ if (!getClusterIds(i).isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -1004,6 +1155,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeUInt32(7, followingKvCount_);
}
+ for (int i = 0; i < clusterIds_.size(); i++) {
+ output.writeMessage(8, clusterIds_.get(i));
+ }
getUnknownFields().writeTo(output);
}
@@ -1041,6 +1195,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(7, followingKvCount_);
}
+ for (int i = 0; i < clusterIds_.size(); i++) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(8, clusterIds_.get(i));
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1096,6 +1254,8 @@ public final class WALProtos {
result = result && (getFollowingKvCount()
== other.getFollowingKvCount());
}
+ result = result && getClusterIdsList()
+ .equals(other.getClusterIdsList());
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -1137,6 +1297,10 @@ public final class WALProtos {
hash = (37 * hash) + FOLLOWING_KV_COUNT_FIELD_NUMBER;
hash = (53 * hash) + getFollowingKvCount();
}
+ if (getClusterIdsCount() > 0) {
+ hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER;
+ hash = (53 * hash) + getClusterIdsList().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -1244,6 +1408,7 @@ public final class WALProtos {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getClusterIdFieldBuilder();
getScopesFieldBuilder();
+ getClusterIdsFieldBuilder();
}
}
private static Builder create() {
@@ -1274,6 +1439,12 @@ public final class WALProtos {
}
followingKvCount_ = 0;
bitField0_ = (bitField0_ & ~0x00000040);
+ if (clusterIdsBuilder_ == null) {
+ clusterIds_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000080);
+ } else {
+ clusterIdsBuilder_.clear();
+ }
return this;
}
@@ -1339,6 +1510,15 @@ public final class WALProtos {
to_bitField0_ |= 0x00000020;
}
result.followingKvCount_ = followingKvCount_;
+ if (clusterIdsBuilder_ == null) {
+ if (((bitField0_ & 0x00000080) == 0x00000080)) {
+ clusterIds_ = java.util.Collections.unmodifiableList(clusterIds_);
+ bitField0_ = (bitField0_ & ~0x00000080);
+ }
+ result.clusterIds_ = clusterIds_;
+ } else {
+ result.clusterIds_ = clusterIdsBuilder_.build();
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1399,6 +1579,32 @@ public final class WALProtos {
if (other.hasFollowingKvCount()) {
setFollowingKvCount(other.getFollowingKvCount());
}
+ if (clusterIdsBuilder_ == null) {
+ if (!other.clusterIds_.isEmpty()) {
+ if (clusterIds_.isEmpty()) {
+ clusterIds_ = other.clusterIds_;
+ bitField0_ = (bitField0_ & ~0x00000080);
+ } else {
+ ensureClusterIdsIsMutable();
+ clusterIds_.addAll(other.clusterIds_);
+ }
+ onChanged();
+ }
+ } else {
+ if (!other.clusterIds_.isEmpty()) {
+ if (clusterIdsBuilder_.isEmpty()) {
+ clusterIdsBuilder_.dispose();
+ clusterIdsBuilder_ = null;
+ clusterIds_ = other.clusterIds_;
+ bitField0_ = (bitField0_ & ~0x00000080);
+ clusterIdsBuilder_ =
+ com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+ getClusterIdsFieldBuilder() : null;
+ } else {
+ clusterIdsBuilder_.addAllMessages(other.clusterIds_);
+ }
+ }
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1432,6 +1638,12 @@ public final class WALProtos {
return false;
}
}
+ for (int i = 0; i < getClusterIdsCount(); i++) {
+ if (!getClusterIds(i).isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -1592,20 +1804,36 @@ public final class WALProtos {
return this;
}
- // optional .UUID cluster_id = 5;
+ // optional .UUID cluster_id = 5 [deprecated = true];
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> clusterIdBuilder_;
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public boolean hasClusterId() {
+ @java.lang.Deprecated public boolean hasClusterId() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() {
+ @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() {
if (clusterIdBuilder_ == null) {
return clusterId_;
} else {
@@ -1613,9 +1841,17 @@ public final class WALProtos {
}
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public Builder setClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+ @java.lang.Deprecated public Builder setClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
if (clusterIdBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
@@ -1629,9 +1865,17 @@ public final class WALProtos {
return this;
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public Builder setClusterId(
+ @java.lang.Deprecated public Builder setClusterId(
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) {
if (clusterIdBuilder_ == null) {
clusterId_ = builderForValue.build();
@@ -1643,9 +1887,17 @@ public final class WALProtos {
return this;
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public Builder mergeClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+ @java.lang.Deprecated public Builder mergeClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
if (clusterIdBuilder_ == null) {
if (((bitField0_ & 0x00000010) == 0x00000010) &&
clusterId_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance()) {
@@ -1662,9 +1914,17 @@ public final class WALProtos {
return this;
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public Builder clearClusterId() {
+ @java.lang.Deprecated public Builder clearClusterId() {
if (clusterIdBuilder_ == null) {
clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance();
onChanged();
@@ -1675,17 +1935,33 @@ public final class WALProtos {
return this;
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdBuilder() {
+ @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdBuilder() {
bitField0_ |= 0x00000010;
onChanged();
return getClusterIdFieldBuilder().getBuilder();
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
- public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() {
+ @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() {
if (clusterIdBuilder_ != null) {
return clusterIdBuilder_.getMessageOrBuilder();
} else {
@@ -1693,7 +1969,15 @@ public final class WALProtos {
}
}
/**
- * <code>optional .UUID cluster_id = 5;</code>
+ * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+ *
+ * <pre>
+ *
+ *This parameter is deprecated in favor of clusters which
+ *contains the list of clusters that have consumed the change.
+ *It is retained so that the log created by earlier releases (0.94)
+ *can be read by the newer releases.
+ * </pre>
*/
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder>
@@ -1953,70 +2237,382 @@ public final class WALProtos {
private int followingKvCount_ ;
/**
* <code>optional uint32 following_kv_count = 7;</code>
+ */
+ public boolean hasFollowingKvCount() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * <code>optional uint32 following_kv_count = 7;</code>
+ */
+ public int getFollowingKvCount() {
+ return followingKvCount_;
+ }
+ /**
+ * <code>optional uint32 following_kv_count = 7;</code>
+ */
+ public Builder setFollowingKvCount(int value) {
+ bitField0_ |= 0x00000040;
+ followingKvCount_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional uint32 following_kv_count = 7;</code>
+ */
+ public Builder clearFollowingKvCount() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ followingKvCount_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // repeated .UUID cluster_ids = 8;
+ private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> clusterIds_ =
+ java.util.Collections.emptyList();
+ private void ensureClusterIdsIsMutable() {
+ if (!((bitField0_ & 0x00000080) == 0x00000080)) {
+ clusterIds_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID>(clusterIds_);
+ bitField0_ |= 0x00000080;
+ }
+ }
+
+ private com.google.protobuf.RepeatedFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> clusterIdsBuilder_;
+
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
* <pre>
*
- *optional CustomEntryType custom_entry_type = 8;
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> getClusterIdsList() {
+ if (clusterIdsBuilder_ == null) {
+ return java.util.Collections.unmodifiableList(clusterIds_);
+ } else {
+ return clusterIdsBuilder_.getMessageList();
+ }
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
*
- *enum CustomEntryType {
- *COMPACTION = 0;
- *}
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
* </pre>
*/
- public boolean hasFollowingKvCount() {
- return ((bitField0_ & 0x00000040) == 0x00000040);
+ public int getClusterIdsCount() {
+ if (clusterIdsBuilder_ == null) {
+ return clusterIds_.size();
+ } else {
+ return clusterIdsBuilder_.getCount();
+ }
}
/**
- * <code>optional uint32 following_kv_count = 7;</code>
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
* <pre>
*
- *optional CustomEntryType custom_entry_type = 8;
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index) {
+ if (clusterIdsBuilder_ == null) {
+ return clusterIds_.get(index);
+ } else {
+ return clusterIdsBuilder_.getMessage(index);
+ }
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
- *enum CustomEntryType {
- *COMPACTION = 0;
- *}
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
* </pre>
*/
- public int getFollowingKvCount() {
- return followingKvCount_;
+ public Builder setClusterIds(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+ if (clusterIdsBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureClusterIdsIsMutable();
+ clusterIds_.set(index, value);
+ onChanged();
+ } else {
+ clusterIdsBuilder_.setMessage(index, value);
+ }
+ return this;
}
/**
- * <code>optional uint32 following_kv_count = 7;</code>
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
* <pre>
*
- *optional CustomEntryType custom_entry_type = 8;
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public Builder setClusterIds(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) {
+ if (clusterIdsBuilder_ == null) {
+ ensureClusterIdsIsMutable();
+ clusterIds_.set(index, builderForValue.build());
+ onChanged();
+ } else {
+ clusterIdsBuilder_.setMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
- *enum CustomEntryType {
- *COMPACTION = 0;
- *}
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
* </pre>
*/
- public Builder setFollowingKvCount(int value) {
- bitField0_ |= 0x00000040;
- followingKvCount_ = value;
- onChanged();
+ public Builder addClusterIds(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+ if (clusterIdsBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureClusterIdsIsMutable();
+ clusterIds_.add(value);
+ onChanged();
+ } else {
+ clusterIdsBuilder_.addMessage(value);
+ }
return this;
}
/**
- * <code>optional uint32 following_kv_count = 7;</code>
+ * <code>repeated .UUID cluster_ids = 8;</code>
*
* <pre>
*
- *optional CustomEntryType custom_entry_type = 8;
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public Builder addClusterIds(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+ if (clusterIdsBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureClusterIdsIsMutable();
+ clusterIds_.add(index, value);
+ onChanged();
+ } else {
+ clusterIdsBuilder_.addMessage(index, value);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
*
- *enum CustomEntryType {
- *COMPACTION = 0;
- *}
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
* </pre>
*/
- public Builder clearFollowingKvCount() {
- bitField0_ = (bitField0_ & ~0x00000040);
- followingKvCount_ = 0;
- onChanged();
+ public Builder addClusterIds(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) {
+ if (clusterIdsBuilder_ == null) {
+ ensureClusterIdsIsMutable();
+ clusterIds_.add(builderForValue.build());
+ onChanged();
+ } else {
+ clusterIdsBuilder_.addMessage(builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public Builder addClusterIds(
+ int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) {
+ if (clusterIdsBuilder_ == null) {
+ ensureClusterIdsIsMutable();
+ clusterIds_.add(index, builderForValue.build());
+ onChanged();
+ } else {
+ clusterIdsBuilder_.addMessage(index, builderForValue.build());
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public Builder addAllClusterIds(
+ java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> values) {
+ if (clusterIdsBuilder_ == null) {
+ ensureClusterIdsIsMutable();
+ super.addAll(values, clusterIds_);
+ onChanged();
+ } else {
+ clusterIdsBuilder_.addAllMessages(values);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public Builder clearClusterIds() {
+ if (clusterIdsBuilder_ == null) {
+ clusterIds_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000080);
+ onChanged();
+ } else {
+ clusterIdsBuilder_.clear();
+ }
return this;
}
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public Builder removeClusterIds(int index) {
+ if (clusterIdsBuilder_ == null) {
+ ensureClusterIdsIsMutable();
+ clusterIds_.remove(index);
+ onChanged();
+ } else {
+ clusterIdsBuilder_.remove(index);
+ }
+ return this;
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdsBuilder(
+ int index) {
+ return getClusterIdsFieldBuilder().getBuilder(index);
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder(
+ int index) {
+ if (clusterIdsBuilder_ == null) {
+ return clusterIds_.get(index); } else {
+ return clusterIdsBuilder_.getMessageOrBuilder(index);
+ }
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder>
+ getClusterIdsOrBuilderList() {
+ if (clusterIdsBuilder_ != null) {
+ return clusterIdsBuilder_.getMessageOrBuilderList();
+ } else {
+ return java.util.Collections.unmodifiableList(clusterIds_);
+ }
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder addClusterIdsBuilder() {
+ return getClusterIdsFieldBuilder().addBuilder(
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance());
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder addClusterIdsBuilder(
+ int index) {
+ return getClusterIdsFieldBuilder().addBuilder(
+ index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance());
+ }
+ /**
+ * <code>repeated .UUID cluster_ids = 8;</code>
+ *
+ * <pre>
+ *
+ *cluster_ids field contains the list of clusters that have
+ *consumed the change
+ * </pre>
+ */
+ public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder>
+ getClusterIdsBuilderList() {
+ return getClusterIdsFieldBuilder().getBuilderList();
+ }
+ private com.google.protobuf.RepeatedFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder>
+ getClusterIdsFieldBuilder() {
+ if (clusterIdsBuilder_ == null) {
+ clusterIdsBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder>(
+ clusterIds_,
+ ((bitField0_ & 0x00000080) == 0x00000080),
+ getParentForChildren(),
+ isClean());
+ clusterIds_ = null;
+ }
+ return clusterIdsBuilder_;
+ }
// @@protoc_insertion_point(builder_scope:WALKey)
}
@@ -4216,22 +4812,22 @@ public final class WALProtos {
static {
java.lang.String[] descriptorData = {
"\n\tWAL.proto\032\013hbase.proto\"$\n\tWALHeader\022\027\n" +
- "\017has_compression\030\001 \001(\010\"\277\001\n\006WALKey\022\033\n\023enc" +
+ "\017has_compression\030\001 \001(\010\"\337\001\n\006WALKey\022\033\n\023enc" +
"oded_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002" +
"(\014\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite" +
- "_time\030\004 \002(\004\022\031\n\ncluster_id\030\005 \001(\0132\005.UUID\022\034" +
- "\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022followin" +
- "g_kv_count\030\007 \001(\r\"=\n\013FamilyScope\022\016\n\006famil" +
- "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
- "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
- " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam",
- "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
- "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
- "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
- "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
- "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
- "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
- "\001\001"
+ "_time\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002" +
+ "\030\001\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022foll" +
+ "owing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(" +
+ "\0132\005.UUID\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022" +
+ "\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Comp" +
+ "actionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023",
+ "encoded_region_name\030\002 \002(\014\022\023\n\013family_name" +
+ "\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021comp" +
+ "action_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 " +
+ "\002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLIC" +
+ "ATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE" +
+ "_GLOBAL\020\001B?\n*org.apache.hadoop.hbase.pro" +
+ "tobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4249,7 +4845,7 @@ public final class WALProtos {
internal_static_WALKey_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_WALKey_descriptor,
- new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", });
+ new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", });
internal_static_FamilyScope_descriptor =
getDescriptor().getMessageTypes().get(2);
internal_static_FamilyScope_fieldAccessorTable = new
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto Wed Aug 28 19:32:00 2013
@@ -33,13 +33,24 @@ message WALKey {
required bytes table_name = 2;
required uint64 log_sequence_number = 3;
required uint64 write_time = 4;
- optional UUID cluster_id = 5;
+ /*
+ This parameter is deprecated in favor of clusters which
+ contains the list of clusters that have consumed the change.
+ It is retained so that the log created by earlier releases (0.94)
+ can be read by the newer releases.
+ */
+ optional UUID cluster_id = 5 [deprecated=true];
repeated FamilyScope scopes = 6;
optional uint32 following_kv_count = 7;
+ /*
+ This field contains the list of clusters that have
+ consumed the change
+ */
+ repeated UUID cluster_ids = 8;
/*
- optional CustomEntryType custom_entry_type = 8;
-
+ optional CustomEntryType custom_entry_type = 9;
+
enum CustomEntryType {
COMPACTION = 0;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Aug 28 19:32:00 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -114,7 +115,7 @@ public class Import {
static class Importer
extends TableMapper<ImmutableBytesWritable, Mutation> {
private Map<byte[], byte[]> cfRenameMap;
- private UUID clusterId;
+ private List<UUID> clusterIds;
/**
* @param row The current table row key.
@@ -159,11 +160,11 @@ public class Import {
}
}
if (put != null) {
- put.setClusterId(clusterId);
+ put.setClusterIds(clusterIds);
context.write(key, put);
}
if (delete != null) {
- delete.setClusterId(clusterId);
+ delete.setClusterIds(clusterIds);
context.write(key, delete);
}
}
@@ -177,7 +178,7 @@ public class Import {
ZooKeeperWatcher zkw = null;
try {
zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
- clusterId = ZKClusterId.getUUIDForCluster(zkw);
+ clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
} catch (ZooKeeperConnectionException e) {
LOG.error("Problem connecting to ZooKeper during task setup", e);
} catch (KeeperException e) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java Wed Aug 28 19:32:00 2013
@@ -115,6 +115,7 @@ public class ReplicationProtbufUtil {
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
AdminProtos.ReplicateWALEntryRequest.Builder builder =
AdminProtos.ReplicateWALEntryRequest.newBuilder();
+ HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
for (HLog.Entry entry: entries) {
entryBuilder.clear();
WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
@@ -124,11 +125,10 @@ public class ReplicationProtbufUtil {
keyBuilder.setTableName(ByteString.copyFrom(key.getTablename().getName()));
keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
keyBuilder.setWriteTime(key.getWriteTime());
- UUID clusterId = key.getClusterId();
- if (clusterId != null) {
- HBaseProtos.UUID.Builder uuidBuilder = keyBuilder.getClusterIdBuilder();
+ for(UUID clusterId : key.getClusterIds()) {
uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
+ keyBuilder.addClusterIds(uuidBuilder.build());
}
WALEdit edit = entry.getEdit();
NavigableMap<byte[], Integer> scopes = key.getScopes();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java Wed Aug 28 19:32:00 2013
@@ -18,9 +18,10 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -41,8 +42,8 @@ implements RowProcessor<S,T> {
}
@Override
- public UUID getClusterId() {
- return HConstants.DEFAULT_CLUSTER_ID;
+ public List<UUID> getClusterIds() {
+ return new ArrayList<UUID>();
}
@Override
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Aug 28 19:32:00 2013
@@ -1779,15 +1779,13 @@ public class HRegion implements HeapSize
/**
* This is used only by unit tests. Not required to be a public API.
* @param familyMap map of family to edits for the given family.
- * @param clusterId
* @param durability
* @throws IOException
*/
- void delete(NavigableMap<byte[], List<Cell>> familyMap, UUID clusterId,
+ void delete(NavigableMap<byte[], List<Cell>> familyMap,
Durability durability) throws IOException {
Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
delete.setFamilyMap(familyMap);
- delete.setClusterId(clusterId);
delete.setDurability(durability);
doBatchMutate(delete);
}
@@ -2206,7 +2204,7 @@ public class HRegion implements HeapSize
Mutation mutation = batchOp.operations[firstIndex];
if (walEdit.size() > 0) {
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
- walEdit, mutation.getClusterId(), now, this.htableDescriptor);
+ walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
}
// -------------------------------
@@ -2598,7 +2596,6 @@ public class HRegion implements HeapSize
familyMap.put(family, edits);
Put p = new Put(row);
p.setFamilyMap(familyMap);
- p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
doBatchMutate(p);
}
@@ -4534,7 +4531,7 @@ public class HRegion implements HeapSize
if (!walEdit.isEmpty()) {
txid = this.log.appendNoSync(this.getRegionInfo(),
this.htableDescriptor.getTableName(), walEdit,
- processor.getClusterId(), now, this.htableDescriptor);
+ processor.getClusterIds(), now, this.htableDescriptor);
}
// 8. Release region lock
if (locked) {
@@ -4761,7 +4758,7 @@ public class HRegion implements HeapSize
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
- walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+ walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
@@ -4911,7 +4908,7 @@ public class HRegion implements HeapSize
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
- walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+ walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java Wed Aug 28 19:32:00 2013
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@InterfaceAudience.Public
@@ -107,9 +106,9 @@ public interface RowProcessor<S extends
/**
- * @return The replication cluster id.
+ * @return The cluster ids that have the change.
*/
- UUID getClusterId();
+ List<UUID> getClusterIds();
/**
* Human readable name of the processor
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Wed Aug 28 19:32:00 2013
@@ -27,6 +27,7 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -821,12 +822,12 @@ class FSHLog implements HLog, Syncable {
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tableName
- * @param clusterId
+ * @param clusterIds that have consumed the change
* @return New log key.
*/
protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
- long now, UUID clusterId) {
- return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId);
+ long now, List<UUID> clusterIds) {
+ return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds);
}
@Override
@@ -839,7 +840,7 @@ class FSHLog implements HLog, Syncable {
@Override
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
- append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore);
+ append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore);
}
/**
@@ -862,15 +863,16 @@ class FSHLog implements HLog, Syncable {
* @param info
* @param tableName
* @param edits
- * @param clusterId The originating clusterId for this edit (for replication)
+ * @param clusterIds that have consumed the change (for replication)
* @param now
* @param doSync shall we sync?
* @return txid of this transaction
* @throws IOException
*/
@SuppressWarnings("deprecation")
- private long append(HRegionInfo info, TableName tableName, WALEdit edits, UUID clusterId,
- final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
+ private long append(HRegionInfo info, TableName tableName, WALEdit edits,
+ List<UUID> clusterIds, final long now, HTableDescriptor htd, boolean doSync,
+ boolean isInMemstore)
throws IOException {
if (edits.isEmpty()) return this.unflushedEntries.get();
if (this.closed) {
@@ -890,7 +892,7 @@ class FSHLog implements HLog, Syncable {
// actual name.
byte [] encodedRegionName = info.getEncodedNameAsBytes();
if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
- HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
+ HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds);
doWrite(info, logKey, edits, htd);
this.numEntries.incrementAndGet();
txid = this.unflushedEntries.incrementAndGet();
@@ -914,9 +916,9 @@ class FSHLog implements HLog, Syncable {
@Override
public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
- UUID clusterId, final long now, HTableDescriptor htd)
+ List<UUID> clusterIds, final long now, HTableDescriptor htd)
throws IOException {
- return append(info, tableName, edits, clusterId, now, htd, false, true);
+ return append(info, tableName, edits, clusterIds, now, htd, false, true);
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Aug 28 19:32:00 2013
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionse
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
@@ -264,7 +266,7 @@ public interface HLog {
void closeAndDelete() throws IOException;
/**
- * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, UUID, long, HTableDescriptor)},
+ * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor)},
* except it causes a sync on the log
*/
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
@@ -285,23 +287,20 @@ public interface HLog {
final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException;
/**
- * Append a set of edits to the log. Log edits are keyed by (encoded)
- * regionName, rowname, and log-sequence-id. The HLog is not flushed after
- * this transaction is written to the log.
- *
+ * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and
+ * log-sequence-id. The HLog is not flushed after this transaction is written to the log.
* @param info
* @param tableName
* @param edits
- * @param clusterId
- * The originating clusterId for this edit (for replication)
+ * @param clusterIds The clusters that have consumed the change (for replication)
* @param now
* @param htd
* @return txid of this transaction
* @throws IOException
*/
- public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
- UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
-
+ public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
+ List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException;
+
void hsync() throws IOException;
void hflush() throws IOException;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Wed Aug 28 19:32:00 2013
@@ -22,7 +22,11 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -93,6 +97,13 @@ public class HLogKey implements Writable
}
}
+ /*
+ * This is used for reading the log entries created by the previous releases
+ * (0.94.11) which write the clusters information to the scopes of WALEdit.
+ */
+ private static final String PREFIX_CLUSTER_KEY = ".";
+
+
private static final Version VERSION = Version.COMPRESSED;
// The encoded region name.
@@ -102,15 +113,23 @@ public class HLogKey implements Writable
// Time at which this edit was written.
private long writeTime;
- private UUID clusterId;
-
+ // The first element in the list is the cluster id on which the change has originated
+ private List<UUID> clusterIds;
+
private NavigableMap<byte[], Integer> scopes;
private CompressionContext compressionContext;
public HLogKey() {
- this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
- HConstants.DEFAULT_CLUSTER_ID);
+ init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
+ new ArrayList<UUID>());
+ }
+
+ public HLogKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+ final long now, UUID clusterId) {
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ clusterIds.add(clusterId);
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds);
}
/**
@@ -123,13 +142,18 @@ public class HLogKey implements Writable
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
- * @param clusterId of the cluster (used in Replication)
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename,
- long logSeqNum, final long now, UUID clusterId) {
+ long logSeqNum, final long now, List<UUID> clusterIds){
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds);
+ }
+
+ protected void init(final byte [] encodedRegionName, final TableName tablename,
+ long logSeqNum, final long now, List<UUID> clusterIds) {
this.logSeqNum = logSeqNum;
this.writeTime = now;
- this.clusterId = clusterId;
+ this.clusterIds = clusterIds;
this.encodedRegionName = encodedRegionName;
this.tablename = tablename;
}
@@ -171,14 +195,6 @@ public class HLogKey implements Writable
return this.writeTime;
}
- /**
- * Get the id of the original cluster
- * @return Cluster id.
- */
- public UUID getClusterId() {
- return clusterId;
- }
-
public NavigableMap<byte[], Integer> getScopes() {
return scopes;
}
@@ -187,12 +203,47 @@ public class HLogKey implements Writable
this.scopes = scopes;
}
+ public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
+ if (scopes != null) {
+ Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<byte[], Integer> scope = iterator.next();
+ String key = Bytes.toString(scope.getKey());
+ if (key.startsWith(PREFIX_CLUSTER_KEY)) {
+ addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
+ .length())));
+ iterator.remove();
+ }
+ }
+ if (scopes.size() > 0) {
+ this.scopes = scopes;
+ }
+ }
+ }
+
+ /**
+ * Marks that the cluster with the given clusterId has consumed the change
+ */
+ public void addClusterId(UUID clusterId) {
+ if (!clusterIds.contains(clusterId)) {
+ clusterIds.add(clusterId);
+ }
+ }
+
+ /**
+ * @return the set of cluster Ids that have consumed the change
+ */
+ public List<UUID> getClusterIds() {
+ return clusterIds;
+ }
+
/**
- * Set the cluster id of this key.
- * @param clusterId
+ * @return the cluster id on which the change has originated. It there is no such cluster, it
+ * returns DEFAULT_CLUSTER_ID (cases where replication is not enabled)
*/
- public void setClusterId(UUID clusterId) {
- this.clusterId = clusterId;
+ public UUID getOriginatingClusterId(){
+ return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
}
@Override
@@ -232,7 +283,6 @@ public class HLogKey implements Writable
int result = Bytes.hashCode(this.encodedRegionName);
result ^= this.logSeqNum;
result ^= this.writeTime;
- result ^= this.clusterId.hashCode();
return result;
}
@@ -299,13 +349,16 @@ public class HLogKey implements Writable
}
out.writeLong(this.logSeqNum);
out.writeLong(this.writeTime);
- // avoid storing 16 bytes when replication is not enabled
- if (this.clusterId == HConstants.DEFAULT_CLUSTER_ID) {
- out.writeBoolean(false);
- } else {
+ // Don't need to write the clusters information as we are using protobufs from 0.95
+ // Writing only the first clusterId for testing the legacy read
+ Iterator<UUID> iterator = clusterIds.iterator();
+ if(iterator.hasNext()){
out.writeBoolean(true);
- out.writeLong(this.clusterId.getMostSignificantBits());
- out.writeLong(this.clusterId.getLeastSignificantBits());
+ UUID clusterId = iterator.next();
+ out.writeLong(clusterId.getMostSignificantBits());
+ out.writeLong(clusterId.getLeastSignificantBits());
+ } else {
+ out.writeBoolean(false);
}
}
@@ -344,10 +397,13 @@ public class HLogKey implements Writable
this.logSeqNum = in.readLong();
this.writeTime = in.readLong();
- this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
+
+ this.clusterIds.clear();
if (version.atLeast(Version.INITIAL)) {
if (in.readBoolean()) {
- this.clusterId = new UUID(in.readLong(), in.readLong());
+ // read the older log
+ // Definitely is the originating cluster
+ clusterIds.add(new UUID(in.readLong(), in.readLong()));
}
} else {
try {
@@ -357,6 +413,7 @@ public class HLogKey implements Writable
// Means it's a very old key, just continue
}
}
+ // Do not need to read the clusters information as we are using protobufs from 0.95
}
public WALKey.Builder getBuilder(
@@ -373,10 +430,11 @@ public class HLogKey implements Writable
}
builder.setLogSequenceNumber(this.logSeqNum);
builder.setWriteTime(writeTime);
- if (this.clusterId != HConstants.DEFAULT_CLUSTER_ID) {
- builder.setClusterId(HBaseProtos.UUID.newBuilder()
- .setLeastSigBits(this.clusterId.getLeastSignificantBits())
- .setMostSigBits(this.clusterId.getMostSignificantBits()));
+ HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
+ for (UUID clusterId : clusterIds) {
+ uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
+ uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
+ builder.addClusterIds(uuidBuilder.build());
}
if (scopes != null) {
for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
@@ -401,10 +459,15 @@ public class HLogKey implements Writable
this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
}
- this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
+ clusterIds.clear();
if (walKey.hasClusterId()) {
- this.clusterId = new UUID(
- walKey.getClusterId().getMostSigBits(), walKey.getClusterId().getLeastSigBits());
+ //When we are reading the older log (0.95.1 release)
+ //This is definitely the originating cluster
+ clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
+ .getLeastSigBits()));
+ }
+ for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
+ clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
}
this.scopes = null;
if (walKey.getScopesCount() > 0) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Aug 28 19:32:00 2013
@@ -22,8 +22,6 @@ import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
@@ -37,7 +35,6 @@ import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
@@ -1484,11 +1481,11 @@ public class HLogSplitter {
if (kv.isDelete()) {
del = new Delete(kv.getRow());
- del.setClusterId(entry.getKey().getClusterId());
+ del.setClusterIds(entry.getKey().getClusterIds());
preRow = del;
} else {
put = new Put(kv.getRow());
- put.setClusterId(entry.getKey().getClusterId());
+ put.setClusterIds(entry.getKey().getClusterIds());
preRow = put;
}
preKey = loc.getHostnamePort() + KEY_DELIMITER + table;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Wed Aug 28 19:32:00 2013
@@ -217,7 +217,7 @@ public class SequenceFileLogReader exten
// Scopes are probably in WAL edit, move to key
NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
if (scopes != null) {
- e.getKey().setScopes(scopes);
+ e.getKey().readOlderScopes(scopes);
}
return true;
} catch (IOException ioe) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Wed Aug 28 19:32:00 2013
@@ -116,13 +116,13 @@ public class ReplicationSink {
long totalReplicated = 0;
// Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
// invocation of this method per table and cluster id.
- Map<TableName, Map<UUID,List<Row>>> rowMap = new TreeMap<TableName, Map<UUID,List<Row>>>();
+ Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
+ new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
for (WALEntry entry : entries) {
TableName table =
TableName.valueOf(entry.getKey().getTableName().toByteArray());
Cell previousCell = null;
Mutation m = null;
- java.util.UUID uuid = toUUID(entry.getKey().getClusterId());
int count = entry.getAssociatedCellCount();
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
@@ -135,8 +135,12 @@ public class ReplicationSink {
m = CellUtil.isDelete(cell)?
new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- m.setClusterId(uuid);
- addToHashMultiMap(rowMap, table, uuid, m);
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){
+ clusterIds.add(toUUID(clusterId));
+ }
+ m.setClusterIds(clusterIds);
+ addToHashMultiMap(rowMap, table, clusterIds, m);
}
if (CellUtil.isDelete(cell)) {
((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
@@ -147,7 +151,7 @@ public class ReplicationSink {
}
totalReplicated++;
}
- for (Entry<TableName, Map<UUID,List<Row>>> entry : rowMap.entrySet()) {
+ for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) {
batch(entry.getKey(), entry.getValue().values());
}
int size = entries.size();
@@ -181,7 +185,7 @@ public class ReplicationSink {
* @param key1
* @param key2
* @param value
- * @return
+ * @return the list of values corresponding to key1 and key2
*/
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
Map<K2,List<V>> innerMap = map.get(key1);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Aug 28 19:32:00 2013
@@ -38,7 +38,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
@@ -55,10 +54,8 @@ import org.apache.hadoop.hbase.replicati
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.zookeeper.KeeperException;
/**
* Class that handles the source of a replication stream.
@@ -395,20 +392,15 @@ public class ReplicationSource extends T
seenEntries++;
// Remove all KVs that should not be replicated
HLogKey logKey = entry.getKey();
- // don't replicate if the log entries originated in the peer
- if (!logKey.getClusterId().equals(peerClusterId)) {
+ // don't replicate if the log entries have already been consumed by the cluster
+ if (!logKey.getClusterIds().contains(peerClusterId)) {
removeNonReplicableEdits(entry);
// Don't replicate catalog entries, if the WALEdit wasn't
// containing anything to replicate and if we're currently not set to replicate
if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
edit.size() != 0) {
- // Only set the clusterId if is a local key.
- // This ensures that the originator sets the cluster id
- // and all replicas retain the initial cluster id.
- // This is *only* place where a cluster id other than the default is set.
- if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
- logKey.setClusterId(this.clusterId);
- }
+ //Mark that the current cluster has the change
+ logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
currentSize += entry.getEdit().size();
@@ -817,4 +809,4 @@ public class ReplicationSource extends T
", currently replicating from: " + this.currentPath +
" at position: " + position;
}
-}
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java Wed Aug 28 19:32:00 2013
@@ -172,7 +172,7 @@ class SnapshotLogSplitter implements Clo
// Append Entry
key = new HLogKey(newRegionName, tableName,
- key.getLogSeqNum(), key.getWriteTime(), key.getClusterId());
+ key.getLogSeqNum(), key.getWriteTime(), key.getClusterIds());
writer.append(new HLog.Entry(key, entry.getEdit()));
}
} catch (IOException e) {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Aug 28 19:32:00 2013
@@ -1231,7 +1231,7 @@ public class TestHRegion extends HBaseTe
NavigableMap<byte[], List<Cell>> deleteMap =
new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
deleteMap.put(family, kvs);
- region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL);
+ region.delete(deleteMap, Durability.SYNC_WAL);
} catch (Exception e) {
assertTrue("Family " +new String(family)+ " does not exist", false);
}
@@ -1243,7 +1243,7 @@ public class TestHRegion extends HBaseTe
NavigableMap<byte[], List<Cell>> deleteMap =
new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
deleteMap.put(family, kvs);
- region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL);
+ region.delete(deleteMap, Durability.SYNC_WAL);
} catch (Exception e) {
ok = true;
}
@@ -1571,7 +1571,7 @@ public class TestHRegion extends HBaseTe
NavigableMap<byte[], List<Cell>> deleteMap =
new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
deleteMap.put(fam1, kvs);
- region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL);
+ region.delete(deleteMap, Durability.SYNC_WAL);
// extract the key values out the memstore:
// This is kinda hacky, but better than nothing...
@@ -3853,7 +3853,7 @@ public class TestHRegion extends HBaseTe
//verify append called or not
verify(log, expectAppend ? times(1) : never())
.appendNoSync((HRegionInfo)any(), eq(tableName),
- (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any());
+ (WALEdit)any(), (List<UUID>)any(), anyLong(), (HTableDescriptor)any());
//verify sync called or not
if (expectSync || expectSyncFromLogSyncer) {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java Wed Aug 28 19:32:00 2013
@@ -19,9 +19,11 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -103,7 +104,7 @@ public final class HLogPerformanceEvalua
HRegionInfo hri = region.getRegionInfo();
if (this.noSync) {
hlog.appendNoSync(hri, hri.getTableName(), walEdit,
- HConstants.DEFAULT_CLUSTER_ID, now, htd);
+ new ArrayList<UUID>(), now, htd);
} else {
hlog.append(hri, hri.getTableName(), walEdit, now, htd);
}