Posted to commits@hbase.apache.org by st...@apache.org on 2013/08/28 21:32:00 UTC

svn commit: r1518335 [1/2] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/ha...

Author: stack
Date: Wed Aug 28 19:32:00 2013
New Revision: 1518335

URL: http://svn.apache.org/r1518335
Log:
HBASE-7709 Infinite loop possible in Master/Master replication

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
    hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java

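The root of HBASE-7709: a WAL edit used to carry only the UUID of the cluster it originated on, so in a cyclic master/master topology an edit relayed through an intermediate cluster lost track of where it had already been and could circulate forever. The change below widens that single id into the full list of cluster UUIDs that have consumed the edit, carried on both Mutation and WALKey, so a replication source can drop any edit whose list already contains the peer it ships to. A minimal sketch of that rule, using a hypothetical filter class (ReplicationSource implements the real check):

    import java.util.List;
    import java.util.UUID;

    // Illustrative only: the loop-breaking rule this commit enables.
    final class ReplicationLoopFilter {
      private final UUID peerClusterId;  // cluster we ship edits to

      ReplicationLoopFilter(UUID peerClusterId) {
        this.peerClusterId = peerClusterId;
      }

      /** @param consumedClusterIds cluster ids already recorded on the edit's WALKey */
      boolean shouldReplicate(List<UUID> consumedClusterIds) {
        // If the peer has already consumed this edit, shipping it again
        // would eventually cycle it back to us; drop it here.
        return !consumedClusterIds.contains(peerClusterId);
      }
    }
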
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Aug 28 19:32:00 2013
@@ -39,6 +39,10 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class Mutation extends OperationWithAttributes implements Row, CellScannable,
@@ -57,8 +61,10 @@ public abstract class Mutation extends O
       // familyMap
       ClassSize.TREEMAP);
 
-  // Attribute used in Mutations to indicate the originating cluster.
-  private static final String CLUSTER_ID_ATTR = "_c.id_";
+  /**
+   * The attribute for storing the list of clusters that have consumed the change.
+   */
+  private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
 
   protected byte [] row = null;
   protected long ts = HConstants.LATEST_TIMESTAMP;
@@ -225,26 +231,33 @@ public abstract class Mutation extends O
   }
 
   /**
-   * Set the replication custer id.
-   * @param clusterId
+   * Marks that the clusters with the given clusterIds have consumed the mutation
+   * @param clusterIds of the clusters that have consumed the mutation
    */
-  public void setClusterId(UUID clusterId) {
-    if (clusterId == null) return;
-    byte[] val = new byte[2*Bytes.SIZEOF_LONG];
-    Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
-    Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
-    setAttribute(CLUSTER_ID_ATTR, val);
+  public void setClusterIds(List<UUID> clusterIds) {
+    ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    out.writeInt(clusterIds.size());
+    for (UUID clusterId : clusterIds) {
+      out.writeLong(clusterId.getMostSignificantBits());
+      out.writeLong(clusterId.getLeastSignificantBits());
+    }
+    setAttribute(CONSUMED_CLUSTER_IDS, out.toByteArray());
   }
 
   /**
-   * @return The replication cluster id.
+   * @return the set of clusterIds that have consumed the mutation
    */
-  public UUID getClusterId() {
-    byte[] attr = getAttribute(CLUSTER_ID_ATTR);
-    if (attr == null) {
-      return HConstants.DEFAULT_CLUSTER_ID;
+  public List<UUID> getClusterIds() {
+    List<UUID> clusterIds = new ArrayList<UUID>();
+    byte[] bytes = getAttribute(CONSUMED_CLUSTER_IDS);
+    if(bytes != null) {
+      ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
+      int numClusters = in.readInt();
+      for(int i=0; i<numClusters; i++){
+        clusterIds.add(new UUID(in.readLong(), in.readLong()));
+      }
     }
-    return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG));
+    return clusterIds;
   }
 
   /**

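The new Mutation methods above serialize the consumed-cluster list into a single attribute ("_cs.id"): a 4-byte count followed by two longs (most/least significant bits) per UUID, written with Guava's ByteStreams. A self-contained round trip of that encoding, assuming only Guava on the classpath:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;

    import com.google.common.io.ByteArrayDataInput;
    import com.google.common.io.ByteArrayDataOutput;
    import com.google.common.io.ByteStreams;

    public class ClusterIdsCodec {
      // Encode as the attribute value: count, then (msb, lsb) per UUID.
      static byte[] encode(List<UUID> clusterIds) {
        ByteArrayDataOutput out = ByteStreams.newDataOutput();
        out.writeInt(clusterIds.size());
        for (UUID id : clusterIds) {
          out.writeLong(id.getMostSignificantBits());
          out.writeLong(id.getLeastSignificantBits());
        }
        return out.toByteArray();
      }

      // Decode the attribute value back into the list; null means "none".
      static List<UUID> decode(byte[] bytes) {
        List<UUID> clusterIds = new ArrayList<UUID>();
        if (bytes != null) {
          ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
          int n = in.readInt();
          for (int i = 0; i < n; i++) {
            clusterIds.add(new UUID(in.readLong(), in.readLong()));
          }
        }
        return clusterIds;
      }

      public static void main(String[] args) {
        List<UUID> ids = Arrays.asList(UUID.randomUUID(), UUID.randomUUID());
        System.out.println(decode(encode(ids)).equals(ids));  // true
      }
    }
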
Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java Wed Aug 28 19:32:00 2013
@@ -567,19 +567,43 @@ public final class WALProtos {
      */
     long getWriteTime();
 
-    // optional .UUID cluster_id = 5;
+    // optional .UUID cluster_id = 5 [deprecated = true];
     /**
-     * <code>optional .UUID cluster_id = 5;</code>
+     * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+     *
+     * <pre>
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * </pre>
      */
-    boolean hasClusterId();
+    @java.lang.Deprecated boolean hasClusterId();
     /**
-     * <code>optional .UUID cluster_id = 5;</code>
+     * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+     *
+     * <pre>
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * </pre>
      */
-    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId();
+    @java.lang.Deprecated org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId();
     /**
-     * <code>optional .UUID cluster_id = 5;</code>
+     * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+     *
+     * <pre>
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * </pre>
      */
-    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder();
+    @java.lang.Deprecated org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder();
 
     // repeated .FamilyScope scopes = 6;
     /**
@@ -609,30 +633,67 @@ public final class WALProtos {
     // optional uint32 following_kv_count = 7;
     /**
      * <code>optional uint32 following_kv_count = 7;</code>
+     */
+    boolean hasFollowingKvCount();
+    /**
+     * <code>optional uint32 following_kv_count = 7;</code>
+     */
+    int getFollowingKvCount();
+
+    // repeated .UUID cluster_ids = 8;
+    /**
+     * <code>repeated .UUID cluster_ids = 8;</code>
+     *
+     * <pre>
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * </pre>
+     */
+    java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> 
+        getClusterIdsList();
+    /**
+     * <code>repeated .UUID cluster_ids = 8;</code>
      *
      * <pre>
      *
-     *optional CustomEntryType custom_entry_type = 8;
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * </pre>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index);
+    /**
+     * <code>repeated .UUID cluster_ids = 8;</code>
+     *
+     * <pre>
      *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
      * </pre>
      */
-    boolean hasFollowingKvCount();
+    int getClusterIdsCount();
     /**
-     * <code>optional uint32 following_kv_count = 7;</code>
+     * <code>repeated .UUID cluster_ids = 8;</code>
      *
      * <pre>
      *
-     *optional CustomEntryType custom_entry_type = 8;
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * </pre>
+     */
+    java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> 
+        getClusterIdsOrBuilderList();
+    /**
+     * <code>repeated .UUID cluster_ids = 8;</code>
+     *
+     * <pre>
      *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
      * </pre>
      */
-    int getFollowingKvCount();
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder(
+        int index);
   }
   /**
    * Protobuf type {@code WALKey}
@@ -735,6 +796,14 @@ public final class WALProtos {
               followingKvCount_ = input.readUInt32();
               break;
             }
+            case 66: {
+              if (!((mutable_bitField0_ & 0x00000080) == 0x00000080)) {
+                clusterIds_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID>();
+                mutable_bitField0_ |= 0x00000080;
+              }
+              clusterIds_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.PARSER, extensionRegistry));
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -746,6 +815,9 @@ public final class WALProtos {
         if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) {
           scopes_ = java.util.Collections.unmodifiableList(scopes_);
         }
+        if (((mutable_bitField0_ & 0x00000080) == 0x00000080)) {
+          clusterIds_ = java.util.Collections.unmodifiableList(clusterIds_);
+        }
         this.unknownFields = unknownFields.build();
         makeExtensionsImmutable();
       }
@@ -842,25 +914,49 @@ public final class WALProtos {
       return writeTime_;
     }
 
-    // optional .UUID cluster_id = 5;
+    // optional .UUID cluster_id = 5 [deprecated = true];
     public static final int CLUSTER_ID_FIELD_NUMBER = 5;
     private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID clusterId_;
     /**
-     * <code>optional .UUID cluster_id = 5;</code>
+     * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+     *
+     * <pre>
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * </pre>
      */
-    public boolean hasClusterId() {
+    @java.lang.Deprecated public boolean hasClusterId() {
       return ((bitField0_ & 0x00000010) == 0x00000010);
     }
     /**
-     * <code>optional .UUID cluster_id = 5;</code>
+     * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+     *
+     * <pre>
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * </pre>
      */
-    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() {
+    @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() {
       return clusterId_;
     }
     /**
-     * <code>optional .UUID cluster_id = 5;</code>
+     * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+     *
+     * <pre>
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * </pre>
      */
-    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() {
+    @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() {
       return clusterId_;
     }
 
@@ -905,33 +1001,81 @@ public final class WALProtos {
     private int followingKvCount_;
     /**
      * <code>optional uint32 following_kv_count = 7;</code>
+     */
+    public boolean hasFollowingKvCount() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional uint32 following_kv_count = 7;</code>
+     */
+    public int getFollowingKvCount() {
+      return followingKvCount_;
+    }
+
+    // repeated .UUID cluster_ids = 8;
+    public static final int CLUSTER_IDS_FIELD_NUMBER = 8;
+    private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> clusterIds_;
+    /**
+     * <code>repeated .UUID cluster_ids = 8;</code>
      *
      * <pre>
      *
-     *optional CustomEntryType custom_entry_type = 8;
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * </pre>
+     */
+    public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> getClusterIdsList() {
+      return clusterIds_;
+    }
+    /**
+     * <code>repeated .UUID cluster_ids = 8;</code>
      *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
+     * <pre>
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
      * </pre>
      */
-    public boolean hasFollowingKvCount() {
-      return ((bitField0_ & 0x00000020) == 0x00000020);
+    public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> 
+        getClusterIdsOrBuilderList() {
+      return clusterIds_;
     }
     /**
-     * <code>optional uint32 following_kv_count = 7;</code>
+     * <code>repeated .UUID cluster_ids = 8;</code>
      *
      * <pre>
      *
-     *optional CustomEntryType custom_entry_type = 8;
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * </pre>
+     */
+    public int getClusterIdsCount() {
+      return clusterIds_.size();
+    }
+    /**
+     * <code>repeated .UUID cluster_ids = 8;</code>
+     *
+     * <pre>
      *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
      * </pre>
      */
-    public int getFollowingKvCount() {
-      return followingKvCount_;
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index) {
+      return clusterIds_.get(index);
+    }
+    /**
+     * <code>repeated .UUID cluster_ids = 8;</code>
+     *
+     * <pre>
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * </pre>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder(
+        int index) {
+      return clusterIds_.get(index);
     }
 
     private void initFields() {
@@ -942,6 +1086,7 @@ public final class WALProtos {
       clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance();
       scopes_ = java.util.Collections.emptyList();
       followingKvCount_ = 0;
+      clusterIds_ = java.util.Collections.emptyList();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -976,6 +1121,12 @@ public final class WALProtos {
           return false;
         }
       }
+      for (int i = 0; i < getClusterIdsCount(); i++) {
+        if (!getClusterIds(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -1004,6 +1155,9 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeUInt32(7, followingKvCount_);
       }
+      for (int i = 0; i < clusterIds_.size(); i++) {
+        output.writeMessage(8, clusterIds_.get(i));
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1041,6 +1195,10 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt32Size(7, followingKvCount_);
       }
+      for (int i = 0; i < clusterIds_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(8, clusterIds_.get(i));
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1096,6 +1254,8 @@ public final class WALProtos {
         result = result && (getFollowingKvCount()
             == other.getFollowingKvCount());
       }
+      result = result && getClusterIdsList()
+          .equals(other.getClusterIdsList());
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1137,6 +1297,10 @@ public final class WALProtos {
         hash = (37 * hash) + FOLLOWING_KV_COUNT_FIELD_NUMBER;
         hash = (53 * hash) + getFollowingKvCount();
       }
+      if (getClusterIdsCount() > 0) {
+        hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER;
+        hash = (53 * hash) + getClusterIdsList().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1244,6 +1408,7 @@ public final class WALProtos {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
           getClusterIdFieldBuilder();
           getScopesFieldBuilder();
+          getClusterIdsFieldBuilder();
         }
       }
       private static Builder create() {
@@ -1274,6 +1439,12 @@ public final class WALProtos {
         }
         followingKvCount_ = 0;
         bitField0_ = (bitField0_ & ~0x00000040);
+        if (clusterIdsBuilder_ == null) {
+          clusterIds_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000080);
+        } else {
+          clusterIdsBuilder_.clear();
+        }
         return this;
       }
 
@@ -1339,6 +1510,15 @@ public final class WALProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.followingKvCount_ = followingKvCount_;
+        if (clusterIdsBuilder_ == null) {
+          if (((bitField0_ & 0x00000080) == 0x00000080)) {
+            clusterIds_ = java.util.Collections.unmodifiableList(clusterIds_);
+            bitField0_ = (bitField0_ & ~0x00000080);
+          }
+          result.clusterIds_ = clusterIds_;
+        } else {
+          result.clusterIds_ = clusterIdsBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1399,6 +1579,32 @@ public final class WALProtos {
         if (other.hasFollowingKvCount()) {
           setFollowingKvCount(other.getFollowingKvCount());
         }
+        if (clusterIdsBuilder_ == null) {
+          if (!other.clusterIds_.isEmpty()) {
+            if (clusterIds_.isEmpty()) {
+              clusterIds_ = other.clusterIds_;
+              bitField0_ = (bitField0_ & ~0x00000080);
+            } else {
+              ensureClusterIdsIsMutable();
+              clusterIds_.addAll(other.clusterIds_);
+            }
+            onChanged();
+          }
+        } else {
+          if (!other.clusterIds_.isEmpty()) {
+            if (clusterIdsBuilder_.isEmpty()) {
+              clusterIdsBuilder_.dispose();
+              clusterIdsBuilder_ = null;
+              clusterIds_ = other.clusterIds_;
+              bitField0_ = (bitField0_ & ~0x00000080);
+              clusterIdsBuilder_ = 
+                com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+                   getClusterIdsFieldBuilder() : null;
+            } else {
+              clusterIdsBuilder_.addAllMessages(other.clusterIds_);
+            }
+          }
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1432,6 +1638,12 @@ public final class WALProtos {
             return false;
           }
         }
+        for (int i = 0; i < getClusterIdsCount(); i++) {
+          if (!getClusterIds(i).isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
 
@@ -1592,20 +1804,36 @@ public final class WALProtos {
         return this;
       }
 
-      // optional .UUID cluster_id = 5;
+      // optional .UUID cluster_id = 5 [deprecated = true];
       private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance();
       private com.google.protobuf.SingleFieldBuilder<
           org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> clusterIdBuilder_;
       /**
-       * <code>optional .UUID cluster_id = 5;</code>
+       * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+       *
+       * <pre>
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * </pre>
        */
-      public boolean hasClusterId() {
+      @java.lang.Deprecated public boolean hasClusterId() {
         return ((bitField0_ & 0x00000010) == 0x00000010);
       }
       /**
-       * <code>optional .UUID cluster_id = 5;</code>
+       * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+       *
+       * <pre>
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * </pre>
        */
-      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() {
+      @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() {
         if (clusterIdBuilder_ == null) {
           return clusterId_;
         } else {
@@ -1613,9 +1841,17 @@ public final class WALProtos {
         }
       }
       /**
-       * <code>optional .UUID cluster_id = 5;</code>
+       * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+       *
+       * <pre>
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * </pre>
        */
-      public Builder setClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+      @java.lang.Deprecated public Builder setClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
         if (clusterIdBuilder_ == null) {
           if (value == null) {
             throw new NullPointerException();
@@ -1629,9 +1865,17 @@ public final class WALProtos {
         return this;
       }
       /**
-       * <code>optional .UUID cluster_id = 5;</code>
+       * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+       *
+       * <pre>
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * </pre>
        */
-      public Builder setClusterId(
+      @java.lang.Deprecated public Builder setClusterId(
           org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) {
         if (clusterIdBuilder_ == null) {
           clusterId_ = builderForValue.build();
@@ -1643,9 +1887,17 @@ public final class WALProtos {
         return this;
       }
       /**
-       * <code>optional .UUID cluster_id = 5;</code>
+       * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+       *
+       * <pre>
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * </pre>
        */
-      public Builder mergeClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+      @java.lang.Deprecated public Builder mergeClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
         if (clusterIdBuilder_ == null) {
           if (((bitField0_ & 0x00000010) == 0x00000010) &&
               clusterId_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance()) {
@@ -1662,9 +1914,17 @@ public final class WALProtos {
         return this;
       }
       /**
-       * <code>optional .UUID cluster_id = 5;</code>
+       * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+       *
+       * <pre>
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * </pre>
        */
-      public Builder clearClusterId() {
+      @java.lang.Deprecated public Builder clearClusterId() {
         if (clusterIdBuilder_ == null) {
           clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance();
           onChanged();
@@ -1675,17 +1935,33 @@ public final class WALProtos {
         return this;
       }
       /**
-       * <code>optional .UUID cluster_id = 5;</code>
+       * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+       *
+       * <pre>
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * </pre>
        */
-      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdBuilder() {
+      @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdBuilder() {
         bitField0_ |= 0x00000010;
         onChanged();
         return getClusterIdFieldBuilder().getBuilder();
       }
       /**
-       * <code>optional .UUID cluster_id = 5;</code>
+       * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+       *
+       * <pre>
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * </pre>
        */
-      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() {
+      @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() {
         if (clusterIdBuilder_ != null) {
           return clusterIdBuilder_.getMessageOrBuilder();
         } else {
@@ -1693,7 +1969,15 @@ public final class WALProtos {
         }
       }
       /**
-       * <code>optional .UUID cluster_id = 5;</code>
+       * <code>optional .UUID cluster_id = 5 [deprecated = true];</code>
+       *
+       * <pre>
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * </pre>
        */
       private com.google.protobuf.SingleFieldBuilder<
           org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> 
@@ -1953,70 +2237,382 @@ public final class WALProtos {
       private int followingKvCount_ ;
       /**
        * <code>optional uint32 following_kv_count = 7;</code>
+       */
+      public boolean hasFollowingKvCount() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional uint32 following_kv_count = 7;</code>
+       */
+      public int getFollowingKvCount() {
+        return followingKvCount_;
+      }
+      /**
+       * <code>optional uint32 following_kv_count = 7;</code>
+       */
+      public Builder setFollowingKvCount(int value) {
+        bitField0_ |= 0x00000040;
+        followingKvCount_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint32 following_kv_count = 7;</code>
+       */
+      public Builder clearFollowingKvCount() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        followingKvCount_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // repeated .UUID cluster_ids = 8;
+      private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> clusterIds_ =
+        java.util.Collections.emptyList();
+      private void ensureClusterIdsIsMutable() {
+        if (!((bitField0_ & 0x00000080) == 0x00000080)) {
+          clusterIds_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID>(clusterIds_);
+          bitField0_ |= 0x00000080;
+         }
+      }
+
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> clusterIdsBuilder_;
+
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
        *
        * <pre>
        *
-       *optional CustomEntryType custom_entry_type = 8;
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> getClusterIdsList() {
+        if (clusterIdsBuilder_ == null) {
+          return java.util.Collections.unmodifiableList(clusterIds_);
+        } else {
+          return clusterIdsBuilder_.getMessageList();
+        }
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
        *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
        * </pre>
        */
-      public boolean hasFollowingKvCount() {
-        return ((bitField0_ & 0x00000040) == 0x00000040);
+      public int getClusterIdsCount() {
+        if (clusterIdsBuilder_ == null) {
+          return clusterIds_.size();
+        } else {
+          return clusterIdsBuilder_.getCount();
+        }
       }
       /**
-       * <code>optional uint32 following_kv_count = 7;</code>
+       * <code>repeated .UUID cluster_ids = 8;</code>
        *
        * <pre>
        *
-       *optional CustomEntryType custom_entry_type = 8;
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index) {
+        if (clusterIdsBuilder_ == null) {
+          return clusterIds_.get(index);
+        } else {
+          return clusterIdsBuilder_.getMessage(index);
+        }
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
        *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
        * </pre>
        */
-      public int getFollowingKvCount() {
-        return followingKvCount_;
+      public Builder setClusterIds(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+        if (clusterIdsBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureClusterIdsIsMutable();
+          clusterIds_.set(index, value);
+          onChanged();
+        } else {
+          clusterIdsBuilder_.setMessage(index, value);
+        }
+        return this;
       }
       /**
-       * <code>optional uint32 following_kv_count = 7;</code>
+       * <code>repeated .UUID cluster_ids = 8;</code>
        *
        * <pre>
        *
-       *optional CustomEntryType custom_entry_type = 8;
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public Builder setClusterIds(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) {
+        if (clusterIdsBuilder_ == null) {
+          ensureClusterIdsIsMutable();
+          clusterIds_.set(index, builderForValue.build());
+          onChanged();
+        } else {
+          clusterIdsBuilder_.setMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
        *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
        * </pre>
        */
-      public Builder setFollowingKvCount(int value) {
-        bitField0_ |= 0x00000040;
-        followingKvCount_ = value;
-        onChanged();
+      public Builder addClusterIds(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+        if (clusterIdsBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureClusterIdsIsMutable();
+          clusterIds_.add(value);
+          onChanged();
+        } else {
+          clusterIdsBuilder_.addMessage(value);
+        }
         return this;
       }
       /**
-       * <code>optional uint32 following_kv_count = 7;</code>
+       * <code>repeated .UUID cluster_ids = 8;</code>
        *
        * <pre>
        *
-       *optional CustomEntryType custom_entry_type = 8;
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public Builder addClusterIds(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) {
+        if (clusterIdsBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureClusterIdsIsMutable();
+          clusterIds_.add(index, value);
+          onChanged();
+        } else {
+          clusterIdsBuilder_.addMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
        *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
        * </pre>
        */
-      public Builder clearFollowingKvCount() {
-        bitField0_ = (bitField0_ & ~0x00000040);
-        followingKvCount_ = 0;
-        onChanged();
+      public Builder addClusterIds(
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) {
+        if (clusterIdsBuilder_ == null) {
+          ensureClusterIdsIsMutable();
+          clusterIds_.add(builderForValue.build());
+          onChanged();
+        } else {
+          clusterIdsBuilder_.addMessage(builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public Builder addClusterIds(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) {
+        if (clusterIdsBuilder_ == null) {
+          ensureClusterIdsIsMutable();
+          clusterIds_.add(index, builderForValue.build());
+          onChanged();
+        } else {
+          clusterIdsBuilder_.addMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public Builder addAllClusterIds(
+          java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID> values) {
+        if (clusterIdsBuilder_ == null) {
+          ensureClusterIdsIsMutable();
+          super.addAll(values, clusterIds_);
+          onChanged();
+        } else {
+          clusterIdsBuilder_.addAllMessages(values);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public Builder clearClusterIds() {
+        if (clusterIdsBuilder_ == null) {
+          clusterIds_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000080);
+          onChanged();
+        } else {
+          clusterIdsBuilder_.clear();
+        }
         return this;
       }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public Builder removeClusterIds(int index) {
+        if (clusterIdsBuilder_ == null) {
+          ensureClusterIdsIsMutable();
+          clusterIds_.remove(index);
+          onChanged();
+        } else {
+          clusterIdsBuilder_.remove(index);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdsBuilder(
+          int index) {
+        return getClusterIdsFieldBuilder().getBuilder(index);
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder(
+          int index) {
+        if (clusterIdsBuilder_ == null) {
+          return clusterIds_.get(index);  } else {
+          return clusterIdsBuilder_.getMessageOrBuilder(index);
+        }
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> 
+           getClusterIdsOrBuilderList() {
+        if (clusterIdsBuilder_ != null) {
+          return clusterIdsBuilder_.getMessageOrBuilderList();
+        } else {
+          return java.util.Collections.unmodifiableList(clusterIds_);
+        }
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder addClusterIdsBuilder() {
+        return getClusterIdsFieldBuilder().addBuilder(
+            org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder addClusterIdsBuilder(
+          int index) {
+        return getClusterIdsFieldBuilder().addBuilder(
+            index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .UUID cluster_ids = 8;</code>
+       *
+       * <pre>
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * </pre>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder> 
+           getClusterIdsBuilderList() {
+        return getClusterIdsFieldBuilder().getBuilderList();
+      }
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> 
+          getClusterIdsFieldBuilder() {
+        if (clusterIdsBuilder_ == null) {
+          clusterIdsBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder>(
+                  clusterIds_,
+                  ((bitField0_ & 0x00000080) == 0x00000080),
+                  getParentForChildren(),
+                  isClean());
+          clusterIds_ = null;
+        }
+        return clusterIdsBuilder_;
+      }
 
       // @@protoc_insertion_point(builder_scope:WALKey)
     }
@@ -4216,22 +4812,22 @@ public final class WALProtos {
   static {
     java.lang.String[] descriptorData = {
       "\n\tWAL.proto\032\013hbase.proto\"$\n\tWALHeader\022\027\n" +
-      "\017has_compression\030\001 \001(\010\"\277\001\n\006WALKey\022\033\n\023enc" +
+      "\017has_compression\030\001 \001(\010\"\337\001\n\006WALKey\022\033\n\023enc" +
       "oded_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002" +
       "(\014\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite" +
-      "_time\030\004 \002(\004\022\031\n\ncluster_id\030\005 \001(\0132\005.UUID\022\034" +
-      "\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022followin" +
-      "g_kv_count\030\007 \001(\r\"=\n\013FamilyScope\022\016\n\006famil" +
-      "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
-      "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
-      " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam",
-      "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
-      "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
-      "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
-      "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
-      "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
-      "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
-      "\001\001"
+      "_time\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002" +
+      "\030\001\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022foll" +
+      "owing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(" +
+      "\0132\005.UUID\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022" +
+      "\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Comp" +
+      "actionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023",
+      "encoded_region_name\030\002 \002(\014\022\023\n\013family_name" +
+      "\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021comp" +
+      "action_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 " +
+      "\002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLIC" +
+      "ATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE" +
+      "_GLOBAL\020\001B?\n*org.apache.hadoop.hbase.pro" +
+      "tobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4249,7 +4845,7 @@ public final class WALProtos {
           internal_static_WALKey_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_WALKey_descriptor,
-              new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", });
+              new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", });
           internal_static_FamilyScope_descriptor =
             getDescriptor().getMessageTypes().get(2);
           internal_static_FamilyScope_fieldAccessorTable = new

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto Wed Aug 28 19:32:00 2013
@@ -33,13 +33,24 @@ message WALKey {
   required bytes table_name = 2;
   required uint64 log_sequence_number = 3;
   required uint64 write_time = 4;
-  optional UUID cluster_id = 5;
+  /*
+  This parameter is deprecated in favor of clusters which 
+  contains the list of clusters that have consumed the change.
+  It is retained so that the log created by earlier releases (0.94) 
+  can be read by the newer releases.
+  */
+  optional UUID cluster_id = 5 [deprecated=true];
 
   repeated FamilyScope scopes = 6;
   optional uint32 following_kv_count = 7;
+  /*
+  This field contains the list of clusters that have
+  consumed the change
+  */
+  repeated UUID cluster_ids = 8;
 /*
-  optional CustomEntryType custom_entry_type = 8;
-
+  optional CustomEntryType custom_entry_type = 9;
+  
   enum CustomEntryType {
     COMPACTION = 0;
   }

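With the proto change above, new writers record the consumers in repeated field 8 while field 5 stays readable for logs written by 0.94. A short sketch of populating the new field through the generated builders (the region and table bytes here are placeholders):

    import java.util.UUID;

    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
    import org.apache.hadoop.hbase.protobuf.generated.WALProtos;

    import com.google.protobuf.ByteString;

    public class WALKeyClusterIdsExample {
      public static void main(String[] args) {
        WALProtos.WALKey.Builder keyBuilder = WALProtos.WALKey.newBuilder()
            .setEncodedRegionName(ByteString.copyFromUtf8("region"))  // placeholder
            .setTableName(ByteString.copyFromUtf8("table"))           // placeholder
            .setLogSequenceNumber(1L)
            .setWriteTime(System.currentTimeMillis());
        // Each consumer of the edit appends its UUID to cluster_ids.
        UUID clusterId = UUID.randomUUID();
        keyBuilder.addClusterIds(HBaseProtos.UUID.newBuilder()
            .setMostSigBits(clusterId.getMostSignificantBits())
            .setLeastSigBits(clusterId.getLeastSignificantBits())
            .build());
        WALProtos.WALKey key = keyBuilder.build();
        System.out.println(key.getClusterIdsCount());  // 1
      }
    }
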
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Aug 28 19:32:00 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -114,7 +115,7 @@ public class Import {
   static class Importer
   extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
-    private UUID clusterId;
+    private List<UUID> clusterIds;
 
     /**
      * @param row  The current table row key.
@@ -159,11 +160,11 @@ public class Import {
         }
       }
       if (put != null) {
-        put.setClusterId(clusterId);
+        put.setClusterIds(clusterIds);
         context.write(key, put);
       }
       if (delete != null) {
-        delete.setClusterId(clusterId);
+        delete.setClusterIds(clusterIds);
         context.write(key, delete);
       }
     }
@@ -177,7 +178,7 @@ public class Import {
       ZooKeeperWatcher zkw = null;
       try {
         zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
-        clusterId = ZKClusterId.getUUIDForCluster(zkw);
+        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
       } catch (ZooKeeperConnectionException e) {
         LOG.error("Problem connecting to ZooKeper during task setup", e);
       } catch (KeeperException e) {

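The mapper now tags every re-created Put and Delete with a one-element list holding the importing cluster's own UUID; if the destination table is replicated in a master/master setup, peers will see this cluster in the consumed list and will not ship the imported edits back to it.
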
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java Wed Aug 28 19:32:00 2013
@@ -115,6 +115,7 @@ public class ReplicationProtbufUtil {
     AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
     AdminProtos.ReplicateWALEntryRequest.Builder builder =
       AdminProtos.ReplicateWALEntryRequest.newBuilder();
+    HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
     for (HLog.Entry entry: entries) {
       entryBuilder.clear();
       WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
@@ -124,11 +125,10 @@ public class ReplicationProtbufUtil {
       keyBuilder.setTableName(ByteString.copyFrom(key.getTablename().getName()));
       keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
       keyBuilder.setWriteTime(key.getWriteTime());
-      UUID clusterId = key.getClusterId();
-      if (clusterId != null) {
-        HBaseProtos.UUID.Builder uuidBuilder = keyBuilder.getClusterIdBuilder();
+      for(UUID clusterId : key.getClusterIds()) {
         uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
         uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
+        keyBuilder.addClusterIds(uuidBuilder.build());
       }
       WALEdit edit = entry.getEdit();
       NavigableMap<byte[], Integer> scopes = key.getScopes();

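A detail worth noting above: one HBaseProtos.UUID.Builder is hoisted out of the loops and reused for every cluster id. Each build() call snapshots the builder's current bits into an immutable message, so overwriting the bits for the next id is safe and avoids allocating a builder per UUID.
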
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java Wed Aug 28 19:32:00 2013
@@ -18,9 +18,10 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
@@ -41,8 +42,8 @@ implements RowProcessor<S,T> {
   }
 
   @Override
-  public UUID getClusterId() {
-    return HConstants.DEFAULT_CLUSTER_ID;
+  public List<UUID> getClusterIds() {
+    return new ArrayList<UUID>();
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Aug 28 19:32:00 2013
@@ -1779,15 +1779,13 @@ public class HRegion implements HeapSize
   /**
    * This is used only by unit tests. Not required to be a public API.
    * @param familyMap map of family to edits for the given family.
-   * @param clusterId
    * @param durability
    * @throws IOException
    */
-  void delete(NavigableMap<byte[], List<Cell>> familyMap, UUID clusterId,
+  void delete(NavigableMap<byte[], List<Cell>> familyMap,
       Durability durability) throws IOException {
     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
     delete.setFamilyMap(familyMap);
-    delete.setClusterId(clusterId);
     delete.setDurability(durability);
     doBatchMutate(delete);
   }
@@ -2206,7 +2204,7 @@ public class HRegion implements HeapSize
       Mutation mutation = batchOp.operations[firstIndex];
       if (walEdit.size() > 0) {
         txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
-               walEdit, mutation.getClusterId(), now, this.htableDescriptor);
+               walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
       }
 
       // -------------------------------
@@ -2598,7 +2596,6 @@ public class HRegion implements HeapSize
     familyMap.put(family, edits);
     Put p = new Put(row);
     p.setFamilyMap(familyMap);
-    p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
     doBatchMutate(p);
   }
 
@@ -4534,7 +4531,7 @@ public class HRegion implements HeapSize
           if (!walEdit.isEmpty()) {
             txid = this.log.appendNoSync(this.getRegionInfo(),
                 this.htableDescriptor.getTableName(), walEdit,
-                processor.getClusterId(), now, this.htableDescriptor);
+                processor.getClusterIds(), now, this.htableDescriptor);
           }
           // 8. Release region lock
           if (locked) {
@@ -4761,7 +4758,7 @@ public class HRegion implements HeapSize
             // cluster. A slave cluster receives the final value (not the delta)
             // as a Put.
             txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
-              walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+              walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
               this.htableDescriptor);
           } else {
             recordMutationWithoutWal(append.getFamilyCellMap());
@@ -4911,7 +4908,7 @@ public class HRegion implements HeapSize
             // cluster. A slave cluster receives the final value (not the delta)
             // as a Put.
             txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
-                walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
+                walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
                 this.htableDescriptor);
           } else {
             recordMutationWithoutWal(increment.getFamilyCellMap());

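Note that HRegion passes a fresh new ArrayList<UUID>() rather than a shared immutable empty
list for locally originated edits. A minimal sketch of why that matters, assuming (as elsewhere
in this patch) that the replication source later mutates the list via HLogKey.addClusterId();
the class name is invented for illustration:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.UUID;

    public class WhyFreshList {
      public static void main(String[] args) {
        UUID local = UUID.randomUUID();

        List<UUID> fresh = new ArrayList<UUID>(); // what HRegion now passes per append
        fresh.add(local);                         // a later addClusterId() succeeds

        List<UUID> shared = Collections.emptyList(); // tempting, but immutable
        try {
          shared.add(local);                         // would break consumer stamping
        } catch (UnsupportedOperationException e) {
          System.out.println("emptyList() cannot record consumers: " + e);
        }
      }
    }
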
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java Wed Aug 28 19:32:00 2013
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
-import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 
 @InterfaceAudience.Public
@@ -107,9 +106,9 @@ public interface RowProcessor<S extends 
 
 
   /**
-   * @return The replication cluster id.
+   * @return The ids of the clusters that have consumed the change.
    */
-  UUID getClusterId();
+  List<UUID> getClusterIds();
 
   /**
    * Human readable name of the processor

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Wed Aug 28 19:32:00 2013
@@ -27,6 +27,7 @@ import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -821,12 +822,12 @@ class FSHLog implements HLog, Syncable {
    * @param encodedRegionName Encoded name of the region as returned by
    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
    * @param tableName
-   * @param clusterId
+   * @param clusterIds the ids of the clusters that have consumed the change
    * @return New log key.
    */
   protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
-      long now, UUID clusterId) {
-    return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId);
+      long now, List<UUID> clusterIds) {
+    return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds);
   }
 
   @Override
@@ -839,7 +840,7 @@ class FSHLog implements HLog, Syncable {
   @Override
   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
     final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
-    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore);
+    append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore);
   }
 
   /**
@@ -862,15 +863,16 @@ class FSHLog implements HLog, Syncable {
    * @param info
    * @param tableName
    * @param edits
-   * @param clusterId The originating clusterId for this edit (for replication)
+   * @param clusterIds the ids of the clusters that have consumed the change (for replication)
    * @param now
    * @param doSync shall we sync?
    * @return txid of this transaction
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  private long append(HRegionInfo info, TableName tableName, WALEdit edits, UUID clusterId,
-      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
+  private long append(HRegionInfo info, TableName tableName, WALEdit edits,
+      List<UUID> clusterIds, final long now, HTableDescriptor htd, boolean doSync,
+      boolean isInMemstore)
     throws IOException {
       if (edits.isEmpty()) return this.unflushedEntries.get();
       if (this.closed) {
@@ -890,7 +892,7 @@ class FSHLog implements HLog, Syncable {
           // actual  name.
           byte [] encodedRegionName = info.getEncodedNameAsBytes();
           if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
-          HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
+          HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds);
           doWrite(info, logKey, edits, htd);
           this.numEntries.incrementAndGet();
           txid = this.unflushedEntries.incrementAndGet();
@@ -914,9 +916,9 @@ class FSHLog implements HLog, Syncable {
 
   @Override
   public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
-    UUID clusterId, final long now, HTableDescriptor htd)
+      List<UUID> clusterIds, final long now, HTableDescriptor htd)
     throws IOException {
-    return append(info, tableName, edits, clusterId, now, htd, false, true);
+    return append(info, tableName, edits, clusterIds, now, htd, false, true);
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Aug 28 19:32:00 2013
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionse
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Pattern;
 
@@ -264,7 +266,7 @@ public interface HLog {
   void closeAndDelete() throws IOException;
 
   /**
-   * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, UUID, long, HTableDescriptor)},
+   * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor)},
    * except it causes a sync on the log
    */
   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
@@ -285,23 +287,20 @@ public interface HLog {
       final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException;
 
   /**
-   * Append a set of edits to the log. Log edits are keyed by (encoded)
-   * regionName, rowname, and log-sequence-id. The HLog is not flushed after
-   * this transaction is written to the log.
-   *
+   * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and
+   * log-sequence-id. The HLog is not flushed after this transaction is written to the log.
    * @param info
    * @param tableName
    * @param edits
-   * @param clusterId
-   *          The originating clusterId for this edit (for replication)
+   * @param clusterIds The clusters that have consumed the change (for replication)
    * @param now
    * @param htd
    * @return txid of this transaction
    * @throws IOException
    */
-  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
-      UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
-
+  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
+      List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException;
+
   void hsync() throws IOException;
 
   void hflush() throws IOException;

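The appendNoSync() javadoc above describes a deferred-durability contract: an append is
buffered and assigned a transaction id, and nothing is guaranteed durable until a later sync
catches up to that id. A plain-Java model of that contract — a sketch only, not the FSHLog
implementation:

    import java.util.ArrayList;
    import java.util.List;

    public class DeferredSyncModel {
      private final List<String> unflushed = new ArrayList<String>();
      private long txid = 0;
      private long syncedTxid = 0;

      // Buffer the edit and hand back its transaction id; durability is deferred.
      long appendNoSync(String edit) {
        unflushed.add(edit);
        return ++txid;
      }

      // One sync covers every append buffered so far, then records the high-water mark.
      void sync(long upToTxid) {
        if (upToTxid > syncedTxid) {
          unflushed.clear();
          syncedTxid = txid; // everything buffered is now durable
        }
      }

      public static void main(String[] args) {
        DeferredSyncModel log = new DeferredSyncModel();
        log.appendNoSync("edit-1");
        long t2 = log.appendNoSync("edit-2");
        log.sync(t2); // a single sync makes both appends durable
        System.out.println("durable through txid " + t2);
      }
    }
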
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Wed Aug 28 19:32:00 2013
@@ -22,7 +22,11 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -93,6 +97,13 @@ public class HLogKey implements Writable
     }
   }
 
+  /*
+   * This is used for reading log entries created by the previous releases
+   * (0.94.11), which wrote the cluster information into the scopes of the WALEdit.
+   */
+  private static final String PREFIX_CLUSTER_KEY = ".";
+
+
   private static final Version VERSION = Version.COMPRESSED;
 
   //  The encoded region name.
@@ -102,15 +113,23 @@ public class HLogKey implements Writable
   // Time at which this edit was written.
   private long writeTime;
 
-  private UUID clusterId;
-
+  // The first element in the list is the id of the cluster on which the change originated
+  private List<UUID> clusterIds;
+
   private NavigableMap<byte[], Integer> scopes;
 
   private CompressionContext compressionContext;
 
   public HLogKey() {
-    this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
-        HConstants.DEFAULT_CLUSTER_ID);
+    init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
+        new ArrayList<UUID>());
+  }
+
+  public HLogKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+      final long now, UUID clusterId) {
+    List<UUID> clusterIds = new ArrayList<UUID>();
+    clusterIds.add(clusterId);
+    init(encodedRegionName, tablename, logSeqNum, now, clusterIds);
   }
 
   /**
@@ -123,13 +142,18 @@ public class HLogKey implements Writable
    * @param tablename   - name of table
    * @param logSeqNum   - log sequence number
    * @param now Time at which this edit was written.
-   * @param clusterId of the cluster (used in Replication)
+   * @param clusterIds the clusters that have consumed the change (used in replication)
    */
   public HLogKey(final byte [] encodedRegionName, final TableName tablename,
-      long logSeqNum, final long now, UUID clusterId) {
+      long logSeqNum, final long now, List<UUID> clusterIds) {
+    init(encodedRegionName, tablename, logSeqNum, now, clusterIds);
+  }
+
+  protected void init(final byte [] encodedRegionName, final TableName tablename,
+      long logSeqNum, final long now, List<UUID> clusterIds) {
     this.logSeqNum = logSeqNum;
     this.writeTime = now;
-    this.clusterId = clusterId;
+    this.clusterIds = clusterIds;
     this.encodedRegionName = encodedRegionName;
     this.tablename = tablename;
   }
@@ -171,14 +195,6 @@ public class HLogKey implements Writable
     return this.writeTime;
   }
 
-  /**
-   * Get the id of the original cluster
-   * @return Cluster id.
-   */
-  public UUID getClusterId() {
-    return clusterId;
-  }
-
   public NavigableMap<byte[], Integer> getScopes() {
     return scopes;
   }
@@ -187,12 +203,47 @@ public class HLogKey implements Writable
     this.scopes = scopes;
   }
 
+  public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
+    if (scopes != null) {
+      Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
+          .iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<byte[], Integer> scope = iterator.next();
+        String key = Bytes.toString(scope.getKey());
+        if (key.startsWith(PREFIX_CLUSTER_KEY)) {
+          addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
+              .length())));
+          iterator.remove();
+        }
+      }
+      if (scopes.size() > 0) {
+        this.scopes = scopes;
+      }
+    }
+  }
+
+  /**
+   * Marks that the cluster with the given clusterId has consumed the change
+   */
+  public void addClusterId(UUID clusterId) {
+    if (!clusterIds.contains(clusterId)) {
+      clusterIds.add(clusterId);
+    }
+  }
+
+  /**
+   * @return the list of cluster ids that have consumed the change
+   */
+  public List<UUID> getClusterIds() {
+    return clusterIds;
+  }
+
   /**
-   * Set the cluster id of this key.
-   * @param clusterId
+   * @return the id of the cluster on which the change originated. If there is no such cluster,
+   *         it returns HConstants.DEFAULT_CLUSTER_ID (e.g. when replication is not enabled)
    */
-  public void setClusterId(UUID clusterId) {
-    this.clusterId = clusterId;
+  public UUID getOriginatingClusterId() {
+    return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
   }
 
   @Override
@@ -232,7 +283,6 @@ public class HLogKey implements Writable
     int result = Bytes.hashCode(this.encodedRegionName);
     result ^= this.logSeqNum;
     result ^= this.writeTime;
-    result ^= this.clusterId.hashCode();
     return result;
   }
 
@@ -299,13 +349,16 @@ public class HLogKey implements Writable
     }
     out.writeLong(this.logSeqNum);
     out.writeLong(this.writeTime);
-    // avoid storing 16 bytes when replication is not enabled
-    if (this.clusterId == HConstants.DEFAULT_CLUSTER_ID) {
-      out.writeBoolean(false);
-    } else {
+    // No need to write the full cluster information here since protobufs are used from 0.95 on.
+    // Write only the first clusterId so the legacy read path can still be exercised.
+    Iterator<UUID> iterator = clusterIds.iterator();
+    if (iterator.hasNext()) {
       out.writeBoolean(true);
-      out.writeLong(this.clusterId.getMostSignificantBits());
-      out.writeLong(this.clusterId.getLeastSignificantBits());
+      UUID clusterId = iterator.next();
+      out.writeLong(clusterId.getMostSignificantBits());
+      out.writeLong(clusterId.getLeastSignificantBits());
+    } else {
+      out.writeBoolean(false);
     }
   }
 
@@ -344,10 +397,13 @@ public class HLogKey implements Writable
 
     this.logSeqNum = in.readLong();
     this.writeTime = in.readLong();
-    this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
+
+    this.clusterIds.clear();
     if (version.atLeast(Version.INITIAL)) {
       if (in.readBoolean()) {
-        this.clusterId = new UUID(in.readLong(), in.readLong());
+        // Reading an older, pre-protobuf log:
+        // the single stored cluster id is definitely the originating cluster
+        clusterIds.add(new UUID(in.readLong(), in.readLong()));
       }
     } else {
       try {
@@ -357,6 +413,7 @@ public class HLogKey implements Writable
         // Means it's a very old key, just continue
       }
     }
+    // No need to read the full cluster information here since protobufs are used from 0.95 on
   }
 
   public WALKey.Builder getBuilder(
@@ -373,10 +430,11 @@ public class HLogKey implements Writable
     }
     builder.setLogSequenceNumber(this.logSeqNum);
     builder.setWriteTime(writeTime);
-    if (this.clusterId != HConstants.DEFAULT_CLUSTER_ID) {
-      builder.setClusterId(HBaseProtos.UUID.newBuilder()
-          .setLeastSigBits(this.clusterId.getLeastSignificantBits())
-          .setMostSigBits(this.clusterId.getMostSignificantBits()));
+    HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
+    for (UUID clusterId : clusterIds) {
+      uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
+      uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
+      builder.addClusterIds(uuidBuilder.build());
     }
     if (scopes != null) {
       for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
@@ -401,10 +459,15 @@ public class HLogKey implements Writable
       this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
       this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
     }
-    this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
+    clusterIds.clear();
     if (walKey.hasClusterId()) {
-      this.clusterId = new UUID(
-          walKey.getClusterId().getMostSigBits(), walKey.getClusterId().getLeastSigBits());
+      // When reading an older log (0.95.1 release),
+      // this single cluster id is definitely the originating cluster
+      clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
+          .getLeastSigBits()));
+    }
+    for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
+      clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
     }
     this.scopes = null;
     if (walKey.getScopesCount() > 0) {

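The HLogKey changes above are the heart of the fix: the key now carries an ordered,
de-duplicated list of consuming clusters, with the originator first. A stand-alone model of
that bookkeeping (the logic mirrors the patch; the class name and the DEFAULT_CLUSTER_ID
sentinel value are assumptions for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    public class WalKeyModel {
      // Assumed sentinel for "no originating cluster" (stands in for HConstants.DEFAULT_CLUSTER_ID).
      static final UUID DEFAULT_CLUSTER_ID = new UUID(0L, 0L);

      private final List<UUID> clusterIds = new ArrayList<UUID>();

      // Mirrors addClusterId(): record a consumer once, preserving insertion order.
      void addClusterId(UUID clusterId) {
        if (!clusterIds.contains(clusterId)) {
          clusterIds.add(clusterId);
        }
      }

      // Mirrors getOriginatingClusterId(): the first consumer recorded is the originator.
      UUID getOriginatingClusterId() {
        return clusterIds.isEmpty() ? DEFAULT_CLUSTER_ID : clusterIds.get(0);
      }

      public static void main(String[] args) {
        WalKeyModel key = new WalKeyModel();
        UUID origin = UUID.randomUUID();
        UUID peer = UUID.randomUUID();
        key.addClusterId(origin); // source cluster stamps itself before shipping the edit
        key.addClusterId(peer);   // peer stamps itself when it applies the edit
        key.addClusterId(origin); // duplicate: silently ignored, list stays de-duplicated
        System.out.println(key.clusterIds.size() + " consumer(s), originator preserved: "
            + key.getOriginatingClusterId().equals(origin)); // "2 consumer(s), ... true"
      }
    }
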
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Aug 28 19:32:00 2013
@@ -22,8 +22,6 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -37,7 +35,6 @@ import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
@@ -1484,11 +1481,11 @@ public class HLogSplitter {
 
             if (kv.isDelete()) {
               del = new Delete(kv.getRow());
-              del.setClusterId(entry.getKey().getClusterId());
+              del.setClusterIds(entry.getKey().getClusterIds());
               preRow = del;
             } else {
               put = new Put(kv.getRow());
-              put.setClusterId(entry.getKey().getClusterId());
+              put.setClusterIds(entry.getKey().getClusterIds());
               preRow = put;
             }
             preKey = loc.getHostnamePort() + KEY_DELIMITER + table;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Wed Aug 28 19:32:00 2013
@@ -217,7 +217,7 @@ public class SequenceFileLogReader exten
       // Scopes are probably in WAL edit, move to key
       NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
       if (scopes != null) {
-        e.getKey().setScopes(scopes);
+        e.getKey().readOlderScopes(scopes);
       }
       return true;
     } catch (IOException ioe) {

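The one-line change above routes legacy scopes through readOlderScopes(), so cluster markers
that 0.94.x wrote into the WALEdit scopes are lifted back into the key's cluster-id list. A
stand-alone sketch of that migration, using String keys in place of byte[] for brevity:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.UUID;

    public class LegacyScopeMigration {
      static final String PREFIX_CLUSTER_KEY = "."; // the marker prefix the patch defines

      public static void main(String[] args) {
        UUID legacyConsumer = UUID.randomUUID();
        Map<String, Integer> scopes = new TreeMap<String, Integer>();
        scopes.put(PREFIX_CLUSTER_KEY + legacyConsumer, 1); // 0.94.x-style cluster marker
        scopes.put("cf", 1);                                // a genuine per-family scope

        // Lift "."-prefixed entries out of the scopes and into the cluster-id list,
        // as readOlderScopes() does; real replication scopes stay in place.
        List<UUID> clusterIds = new ArrayList<UUID>();
        Iterator<Map.Entry<String, Integer>> it = scopes.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry<String, Integer> e = it.next();
          if (e.getKey().startsWith(PREFIX_CLUSTER_KEY)) {
            clusterIds.add(UUID.fromString(e.getKey().substring(PREFIX_CLUSTER_KEY.length())));
            it.remove();
          }
        }
        System.out.println("migrated " + clusterIds.size() + " cluster id(s), scopes left: "
            + scopes.keySet()); // "migrated 1 cluster id(s), scopes left: [cf]"
      }
    }
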
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Wed Aug 28 19:32:00 2013
@@ -116,13 +116,13 @@ public class ReplicationSink {
       long totalReplicated = 0;
       // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
       // invocation of this method per table and cluster id.
-      Map<TableName, Map<UUID,List<Row>>> rowMap = new TreeMap<TableName, Map<UUID,List<Row>>>();
+      Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
+          new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
         Cell previousCell = null;
         Mutation m = null;
-        java.util.UUID uuid = toUUID(entry.getKey().getClusterId());
         int count = entry.getAssociatedCellCount();
         for (int i = 0; i < count; i++) {
           // Throw index out of bounds if our cell count is off
@@ -135,8 +135,12 @@ public class ReplicationSink {
             m = CellUtil.isDelete(cell)?
               new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
               new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-            m.setClusterId(uuid);
-            addToHashMultiMap(rowMap, table, uuid, m);
+            List<UUID> clusterIds = new ArrayList<UUID>();
+            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
+              clusterIds.add(toUUID(clusterId));
+            }
+            m.setClusterIds(clusterIds);
+            addToHashMultiMap(rowMap, table, clusterIds, m);
           }
           if (CellUtil.isDelete(cell)) {
             ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
@@ -147,7 +151,7 @@ public class ReplicationSink {
         }
         totalReplicated++;
       }
-      for (Entry<TableName, Map<UUID,List<Row>>> entry : rowMap.entrySet()) {
+      for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) {
         batch(entry.getKey(), entry.getValue().values());
       }
       int size = entries.size();
@@ -181,7 +185,7 @@ public class ReplicationSink {
    * @param key1
    * @param key2
    * @param value
-   * @return
+   * @return the list of values corresponding to key1 and key2
    */
   private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
     Map<K2,List<V>> innerMap = map.get(key1);

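The sink now batches rows per (table, cluster-id list). Using List<UUID> as a map key is safe
because java.util.List specifies element-wise equals() and hashCode(), so edits that traveled
the same chain of clusters land in the same batch. A stand-alone sketch of the
addToHashMultiMap() grouping, with String standing in for TableName and Row:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;

    public class GroupByConsumers {
      // Same shape as ReplicationSink.addToHashMultiMap(): map[k1][k2] += value.
      static <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map,
          K1 k1, K2 k2, V v) {
        Map<K2, List<V>> inner = map.get(k1);
        if (inner == null) {
          inner = new HashMap<K2, List<V>>();
          map.put(k1, inner);
        }
        List<V> values = inner.get(k2);
        if (values == null) {
          values = new ArrayList<V>();
          inner.put(k2, values);
        }
        values.add(v);
        return values;
      }

      public static void main(String[] args) {
        UUID a = UUID.randomUUID();
        UUID b = UUID.randomUUID();
        Map<String, Map<List<UUID>, List<String>>> rows =
            new HashMap<String, Map<List<UUID>, List<String>>>();
        addToHashMultiMap(rows, "t1", Arrays.asList(a, b), "row1");
        addToHashMultiMap(rows, "t1", Arrays.asList(a, b), "row2"); // same chain, same batch
        addToHashMultiMap(rows, "t1", Arrays.asList(b, a), "row3"); // different order, new batch
        System.out.println(rows.get("t1").size() + " batch(es)");   // prints "2 batch(es)"
      }
    }
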
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Aug 28 19:32:00 2013
@@ -38,7 +38,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
@@ -55,10 +54,8 @@ import org.apache.hadoop.hbase.replicati
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Class that handles the source of a replication stream.
@@ -395,20 +392,15 @@ public class ReplicationSource extends T
       seenEntries++;
       // Remove all KVs that should not be replicated
       HLogKey logKey = entry.getKey();
-      // don't replicate if the log entries originated in the peer
-      if (!logKey.getClusterId().equals(peerClusterId)) {
+      // don't replicate if the log entries have already been consumed by the peer cluster
+      if (!logKey.getClusterIds().contains(peerClusterId)) {
         removeNonReplicableEdits(entry);
         // Don't replicate catalog entries, if the WALEdit wasn't
         // containing anything to replicate and if we're currently not set to replicate
         if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
             edit.size() != 0) {
-          // Only set the clusterId if is a local key.
-          // This ensures that the originator sets the cluster id
-          // and all replicas retain the initial cluster id.
-          // This is *only* place where a cluster id other than the default is set.
-          if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
-            logKey.setClusterId(this.clusterId);
-          }
+          // Mark that the current cluster has consumed the change
+          logKey.addClusterId(clusterId);
           currentNbOperations += countDistinctRowKeys(edit);
           currentNbEntries++;
           currentSize += entry.getEdit().size();
@@ -817,4 +809,4 @@ public class ReplicationSource extends T
       ", currently replicating from: " + this.currentPath +
       " at position: " + position;
   }
-}
+}
\ No newline at end of file

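The replaced check above is what actually breaks the Master/Master loop: instead of comparing
a single originating cluster id, the source now skips any entry whose consumer list already
contains the peer, and stamps its own id on everything it ships. A stand-alone simulation of
an edit bouncing between two master clusters (invented class name, logic only):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    public class LoopPrevention {
      public static void main(String[] args) {
        UUID clusterA = UUID.randomUUID();
        UUID clusterB = UUID.randomUUID();
        List<UUID> consumers = new ArrayList<UUID>();
        consumers.add(clusterA); // the edit originates on cluster A

        int hops = 0;
        UUID peer = clusterB;
        while (!consumers.contains(peer)) { // the new ReplicationSource check
          consumers.add(peer);              // the peer applies the edit and stamps itself
          peer = peer.equals(clusterB) ? clusterA : clusterB; // peer re-ships the other way
          hops++;
        }
        System.out.println("replication stopped after " + hops + " hop(s)"); // "1 hop(s)"
      }
    }
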
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java Wed Aug 28 19:32:00 2013
@@ -172,7 +172,7 @@ class SnapshotLogSplitter implements Clo
 
         // Append Entry
         key = new HLogKey(newRegionName, tableName,
-                          key.getLogSeqNum(), key.getWriteTime(), key.getClusterId());
+                          key.getLogSeqNum(), key.getWriteTime(), key.getClusterIds());
         writer.append(new HLog.Entry(key, entry.getEdit()));
       }
     } catch (IOException e) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Aug 28 19:32:00 2013
@@ -1231,7 +1231,7 @@ public class TestHRegion extends HBaseTe
         NavigableMap<byte[], List<Cell>> deleteMap =
           new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
         deleteMap.put(family, kvs);
-        region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL);
+        region.delete(deleteMap, Durability.SYNC_WAL);
       } catch (Exception e) {
         assertTrue("Family " +new String(family)+ " does not exist", false);
       }
@@ -1243,7 +1243,7 @@ public class TestHRegion extends HBaseTe
         NavigableMap<byte[], List<Cell>> deleteMap =
           new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
         deleteMap.put(family, kvs);
-        region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL);
+        region.delete(deleteMap, Durability.SYNC_WAL);
       } catch (Exception e) {
         ok = true;
       }
@@ -1571,7 +1571,7 @@ public class TestHRegion extends HBaseTe
       NavigableMap<byte[], List<Cell>> deleteMap =
         new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
       deleteMap.put(fam1, kvs);
-      region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL);
+      region.delete(deleteMap, Durability.SYNC_WAL);
 
       // extract the key values out the memstore:
       // This is kinda hacky, but better than nothing...
@@ -3853,7 +3853,7 @@ public class TestHRegion extends HBaseTe
     //verify append called or not
     verify(log, expectAppend ? times(1) : never())
       .appendNoSync((HRegionInfo)any(), eq(tableName),
-        (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any());
+        (WALEdit)any(), (List<UUID>)any(), anyLong(), (HTableDescriptor)any());
 
     //verify sync called or not
     if (expectSync || expectSyncFromLogSyncer) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java?rev=1518335&r1=1518334&r2=1518335&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java Wed Aug 28 19:32:00 2013
@@ -19,9 +19,11 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -103,7 +104,7 @@ public final class HLogPerformanceEvalua
           HRegionInfo hri = region.getRegionInfo();
           if (this.noSync) {
             hlog.appendNoSync(hri, hri.getTableName(), walEdit,
-                              HConstants.DEFAULT_CLUSTER_ID, now, htd);
+                              new ArrayList<UUID>(), now, htd);
           } else {
             hlog.append(hri, hri.getTableName(), walEdit, now, htd);
           }