Posted to commits@hbase.apache.org by la...@apache.org on 2013/08/29 00:58:55 UTC

svn commit: r1518410 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/replication/regionserver...

Author: larsh
Date: Wed Aug 28 22:58:55 2013
New Revision: 1518410

URL: http://svn.apache.org/r1518410
Log:
HBASE-7709 Infinite loop possible in Master/Master replication (Vasu Mariyala)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Aug 28 22:58:55 2013
@@ -33,12 +33,20 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
 public abstract class Mutation extends OperationWithAttributes implements Row {
   private static final Log LOG = LogFactory.getLog(Mutation.class);
   // Attribute used in Mutations to indicate the originating cluster.
   private static final String CLUSTER_ID_ATTR = "_c.id_";
   private static final String DURABILITY_ID_ATTR = "_dur_";
 
+  /**
+   * The attribute for storing the list of clusters that have consumed the change.
+   */
+  private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
   protected byte [] row = null;
   protected long ts = HConstants.LATEST_TIMESTAMP;
   protected long lockId = -1L;
@@ -243,6 +251,36 @@ public abstract class Mutation extends O
   }
 
   /**
+   * Marks that the clusters with the given clusterIds have consumed the mutation
+   * @param clusterIds the ids of the clusters that have consumed the mutation
+   */
+  public void setClusterIds(List<UUID> clusterIds) {
+    ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    out.writeInt(clusterIds.size());
+    for (UUID clusterId : clusterIds) {
+      out.writeLong(clusterId.getMostSignificantBits());
+      out.writeLong(clusterId.getLeastSignificantBits());
+    }
+    setAttribute(CONSUMED_CLUSTER_IDS, out.toByteArray());
+  }
+
+  /**
+   * @return the ids of the clusters that have consumed the mutation
+   */
+  public List<UUID> getClusterIds() {
+    List<UUID> clusterIds = new ArrayList<UUID>();
+    byte[] bytes = getAttribute(CONSUMED_CLUSTER_IDS);
+    if (bytes != null) {
+      ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
+      int numClusters = in.readInt();
+      for (int i = 0; i < numClusters; i++) {
+        clusterIds.add(new UUID(in.readLong(), in.readLong()));
+      }
+    }
+    return clusterIds;
+  }
+
+  /**
    * @return the total number of KeyValues
    */
   public int size() {

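For reference, the attribute written by setClusterIds above is just a count-prefixed sequence of UUID bit pairs. Below is a minimal standalone sketch of the same round trip using only Guava; the class name ClusterIdsCodec is hypothetical and not part of this commit.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    import com.google.common.io.ByteArrayDataInput;
    import com.google.common.io.ByteArrayDataOutput;
    import com.google.common.io.ByteStreams;

    public class ClusterIdsCodec {
      // Serializes as Mutation.setClusterIds does: a count, then two longs per UUID.
      static byte[] encode(List<UUID> clusterIds) {
        ByteArrayDataOutput out = ByteStreams.newDataOutput();
        out.writeInt(clusterIds.size());
        for (UUID clusterId : clusterIds) {
          out.writeLong(clusterId.getMostSignificantBits());
          out.writeLong(clusterId.getLeastSignificantBits());
        }
        return out.toByteArray();
      }

      // Mirrors Mutation.getClusterIds: reads the count, then reassembles each UUID.
      static List<UUID> decode(byte[] bytes) {
        List<UUID> clusterIds = new ArrayList<UUID>();
        ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
        int numClusters = in.readInt();
        for (int i = 0; i < numClusters; i++) {
          clusterIds.add(new UUID(in.readLong(), in.readLong()));
        }
        return clusterIds;
      }

      public static void main(String[] args) {
        List<UUID> ids = new ArrayList<UUID>();
        ids.add(UUID.randomUUID());
        ids.add(UUID.randomUUID());
        System.out.println(decode(encode(ids)).equals(ids)); // prints true
      }
    }
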
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Aug 28 22:58:55 2013
@@ -1995,6 +1995,7 @@ public class HRegion implements HeapSize
         // bunch up all edits across all column families into a
         // single WALEdit.
         addFamilyMapToWALEdit(familyMap, walEdit);
+        walEdit.addClusterIds(delete.getClusterIds());
         this.log.append(regionInfo, this.htableDescriptor.getName(),
             walEdit, clusterId, now, this.htableDescriptor);
       }
@@ -2448,6 +2449,7 @@ public class HRegion implements HeapSize
       // STEP 5. Append the edit to WAL. Do not sync wal.
       // -------------------------
       Mutation first = batchOp.operations[firstIndex].getFirst();
+      walEdit.addClusterIds(first.getClusterIds());
       txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
                walEdit, first.getClusterId(), now, this.htableDescriptor);
 
@@ -2904,6 +2906,7 @@ public class HRegion implements HeapSize
       // will contain uncommitted transactions.
       if (writeToWAL) {
         addFamilyMapToWALEdit(familyMap, walEdit);
+        walEdit.addClusterIds(put.getClusterIds());
         this.log.append(regionInfo, this.htableDescriptor.getName(),
             walEdit, clusterId, now, this.htableDescriptor);
       } else {

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Wed Aug 28 22:58:55 2013
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.UUID;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.codec.Decoder;
@@ -73,10 +74,29 @@ import org.apache.hadoop.io.Writable;
  */
 public class WALEdit implements Writable, HeapSize {
 
+  /*
+   * The id of a cluster that has consumed the change represented by this class is stored in
+   * the scopes map with its key prefixed by the value of this variable. The prefix ensures
+   * that cluster ids cannot collide with the column family replication settings also stored
+   * in the scopes. The value starts with a period because column family names cannot start
+   * with one.
+   */
+  private static final String PREFIX_CLUSTER_KEY = ".";
   private final int VERSION_2 = -1;
 
   private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
-  private NavigableMap<byte[], Integer> scopes;
+
+  /**
+   * This map holds both the column family replication settings and the ids of the clusters
+   * that have already consumed the change represented by this object. Overloading scopes
+   * with the consumed cluster ids was introduced while porting the HBASE-7709 fix back to
+   * the 0.94 release; the overloading has been removed in newer releases (0.95.2+). To check
+   * or change the column family settings, use the getFromScope and putIntoScope methods; to
+   * mark or check whether a cluster has consumed the change, use the addClusterId,
+   * addClusterIds, hasClusterId and getClusterIds methods.
+   */
+  private final NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+      Bytes.BYTES_COMPARATOR);
 
   // default to decoding uncompressed data - needed for replication, which enforces that
   // uncompressed edits are sent across the wire. In the regular case (reading/writing WAL), the
@@ -116,22 +136,58 @@ public class WALEdit implements Writable
     return kvs;
   }
 
-  public NavigableMap<byte[], Integer> getScopes() {
-    return scopes;
+  public Integer getFromScope(byte[] key) {
+    return scopes.get(key);
   }
 
+  public void putIntoScope(byte[] key, Integer value) {
+    scopes.put(key, value);
+  }
 
-  public void setScopes (NavigableMap<byte[], Integer> scopes) {
-    // We currently process the map outside of WALEdit,
-    // TODO revisit when replication is part of core
-    this.scopes = scopes;
+  public boolean hasKeyInScope(byte[] key) {
+    return scopes.containsKey(key);
+  }
+
+  /**
+   * @return true if the cluster with the given clusterId has consumed the change.
+   */
+  public boolean hasClusterId(UUID clusterId) {
+    return hasKeyInScope(Bytes.toBytes(PREFIX_CLUSTER_KEY + clusterId.toString()));
+  }
+
+  /**
+   * Marks that the cluster with the given clusterId has consumed the change.
+   */
+  public void addClusterId(UUID clusterId) {
+    scopes.put(Bytes.toBytes(PREFIX_CLUSTER_KEY + clusterId.toString()), 1);
+  }
+
+  /**
+   * Marks that the clusters with the given clusterIds have consumed the change.
+   */
+  public void addClusterIds(List<UUID> clusterIds) {
+    for (UUID clusterId : clusterIds) {
+      addClusterId(clusterId);
+    }
+  }
+
+  /**
+   * @return the ids of the clusters that have consumed the change.
+   */
+  public List<UUID> getClusterIds() {
+    List<UUID> clusterIds = new ArrayList<UUID>();
+    for (byte[] keyBytes : scopes.keySet()) {
+      String key = Bytes.toString(keyBytes);
+      if (key.startsWith(PREFIX_CLUSTER_KEY)) {
+        clusterIds.add(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY.length())));
+      }
+    }
+    return clusterIds;
   }
 
   public void readFields(DataInput in) throws IOException {
     kvs.clear();
-    if (scopes != null) {
-      scopes.clear();
-    }
+    scopes.clear();
     Decoder decoder = this.codec.getDecoder((DataInputStream) in);
     int versionOrLength = in.readInt();
     int length = versionOrLength;
@@ -148,15 +204,12 @@ public class WALEdit implements Writable
 
     //its a new style WAL, so we need replication scopes too
     if (versionOrLength == VERSION_2) {
-      int numFamilies = in.readInt();
-      if (numFamilies > 0) {
-        if (scopes == null) {
-          scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-        }
-        for (int i = 0; i < numFamilies; i++) {
-          byte[] fam = Bytes.readByteArray(in);
+      int numEntries = in.readInt();
+      if (numEntries > 0) {
+        for (int i = 0; i < numEntries; i++) {
+          byte[] key = Bytes.readByteArray(in);
           int scope = in.readInt();
-          scopes.put(fam, scope);
+          scopes.put(key, scope);
         }
       }
     }
@@ -173,14 +226,10 @@ public class WALEdit implements Writable
     }
     kvEncoder.flush();
 
-    if (scopes == null) {
-      out.writeInt(0);
-    } else {
-      out.writeInt(scopes.size());
-      for (byte[] key : scopes.keySet()) {
-        Bytes.writeByteArray(out, key);
-        out.writeInt(scopes.get(key));
-      }
+    out.writeInt(scopes.size());
+    for (byte[] key : scopes.keySet()) {
+      Bytes.writeByteArray(out, key);
+      out.writeInt(scopes.get(key));
     }
   }
 
@@ -189,11 +238,9 @@ public class WALEdit implements Writable
     for (KeyValue kv : kvs) {
       ret += kv.heapSize();
     }
-    if (scopes != null) {
-      ret += ClassSize.TREEMAP;
-      ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
-      // TODO this isn't quite right, need help here
-    }
+    ret += ClassSize.TREEMAP;
+    ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
+    // TODO this isn't quite right, need help here
     return ret;
   }
 
@@ -205,9 +252,7 @@ public class WALEdit implements Writable
       sb.append(kv.toString());
       sb.append("; ");
     }
-    if (scopes != null) {
-      sb.append(" scopes: " + scopes.toString());
-    }
+    sb.append(" scopes: " + scopes.toString());
     sb.append(">]");
     return sb.toString();
   }

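The WALEdit change above overloads a single sorted scopes map with two kinds of entries: column family replication scopes keyed by family name, and consumed-cluster markers keyed by "." + clusterId, which is safe because column family names cannot start with a period. Below is a minimal sketch of that prefix-key idea using String keys instead of byte[]; the class name is hypothetical, and the real WALEdit keys the map on byte[] with Bytes.BYTES_COMPARATOR.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;
    import java.util.UUID;

    public class ScopesOverloadingSketch {
      private static final String PREFIX_CLUSTER_KEY = ".";
      // One map holds both the family scopes and the consumed-cluster markers.
      private final TreeMap<String, Integer> scopes = new TreeMap<String, Integer>();

      void putFamilyScope(String family, int scope) {
        scopes.put(family, scope);
      }

      void addClusterId(UUID clusterId) {
        // The "." prefix keeps cluster keys disjoint from family names.
        scopes.put(PREFIX_CLUSTER_KEY + clusterId.toString(), 1);
      }

      List<UUID> getClusterIds() {
        List<UUID> ids = new ArrayList<UUID>();
        for (String key : scopes.keySet()) {
          if (key.startsWith(PREFIX_CLUSTER_KEY)) {
            ids.add(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY.length())));
          }
        }
        return ids;
      }
    }

Because both kinds of entries live in the one map, they pass through the existing VERSION_2 scopes encoding in readFields/write, leaving the WALEdit wire format unchanged.
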
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Wed Aug 28 22:58:55 2013
@@ -172,20 +172,15 @@ public class Replication implements WALA
   @Override
   public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
                                        WALEdit logEdit) {
-    NavigableMap<byte[], Integer> scopes =
-        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
     byte[] family;
     for (KeyValue kv : logEdit.getKeyValues()) {
       family = kv.getFamily();
       int scope = htd.getFamily(family).getScope();
       if (scope != REPLICATION_SCOPE_LOCAL &&
-          !scopes.containsKey(family)) {
-        scopes.put(family, scope);
+          !logEdit.hasKeyInScope(family)) {
+        logEdit.putIntoScope(family, scope);
       }
     }
-    if (!scopes.isEmpty()) {
-      logEdit.setScopes(scopes);
-    }
   }
 
   @Override

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Wed Aug 28 22:58:55 2013
@@ -110,9 +110,11 @@ public class ReplicationSink {
     // to the same table.
     try {
       long totalReplicated = 0;
-      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
-      // invocation of this method per table and cluster id.
-      Map<byte[], Map<UUID,List<Row>>> rowMap = new TreeMap<byte[], Map<UUID,List<Row>>>(Bytes.BYTES_COMPARATOR);
+      // Map of table => list of Rows, grouped by the set of clusters that have consumed the
+      // change; we only want to flushCommits once per invocation of this method per table
+      // and per consuming-cluster set.
+      Map<byte[], Map<List<UUID>, List<Row>>> rowMap =
+          new TreeMap<byte[], Map<List<UUID>, List<Row>>>(Bytes.BYTES_COMPARATOR);
       for (HLog.Entry entry : entries) {
         WALEdit edit = entry.getEdit();
         byte[] table = entry.getKey().getTablename();
@@ -123,14 +125,19 @@ public class ReplicationSink {
         for (KeyValue kv : kvs) {
           if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
             UUID clusterId = entry.getKey().getClusterId();
+            List<UUID> clusterIds = edit.getClusterIds();
             if (kv.isDelete()) {
               del = new Delete(kv.getRow());
               del.setClusterId(clusterId);
-              addToHashMultiMap(rowMap, table, clusterId, del);
+              del.setClusterIds(clusterIds);
+              clusterIds.add(clusterId);
+              addToHashMultiMap(rowMap, table, clusterIds, del);
             } else {
               put = new Put(kv.getRow());
               put.setClusterId(clusterId);
-              addToHashMultiMap(rowMap, table, clusterId, put);
+              put.setClusterIds(clusterIds);
+              clusterIds.add(clusterId);
+              addToHashMultiMap(rowMap, table, clusterIds, put);
             }
           }
           if (kv.isDelete()) {
@@ -142,7 +149,7 @@ public class ReplicationSink {
         }
         totalReplicated++;
       }
-      for(Map.Entry<byte[], Map<UUID, List<Row>>> entry : rowMap.entrySet()) {
+      for(Map.Entry<byte[], Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
         batch(entry.getKey(), entry.getValue().values());
       }
       this.metrics.setAgeOfLastAppliedOp(
@@ -162,7 +169,7 @@ public class ReplicationSink {
    * @param key1
    * @param key2
    * @param value
-   * @return
+   * @return the list of values for the combination of key1 and key2
    */
   private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
     Map<K2,List<V>> innerMap = map.get(key1);

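The sink now groups Rows by (table, list of consuming clusters) so that rows with different provenance are flushed in separate batches. The diff truncates addToHashMultiMap; below is a plausible standalone completion of such a two-level multimap helper, following the usual pattern. The names mirror the diff, but this body is an assumption, not the committed code.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class MultiMapSketch {
      // map[key1][key2] is a list; the value is appended and the list returned.
      static <K1, K2, V> List<V> addToHashMultiMap(
          Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
        Map<K2, List<V>> innerMap = map.get(key1);
        if (innerMap == null) {
          innerMap = new HashMap<K2, List<V>>();
          map.put(key1, innerMap);
        }
        List<V> values = innerMap.get(key2);
        if (values == null) {
          values = new ArrayList<V>();
          innerMap.put(key2, values);
        }
        values.add(value);
        return values;
      }
    }

Using List<UUID> as the inner key works because java.util.List defines element-wise equals and hashCode, and the sink builds a fresh list per row boundary and stops mutating it once it has been used as a key.
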
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Aug 28 22:58:55 2013
@@ -30,7 +30,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
-import java.util.NavigableMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -59,7 +58,6 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
@@ -477,8 +475,14 @@ public class ReplicationSource extends T
       seenEntries++;
       // Remove all KVs that should not be replicated
       HLogKey logKey = entry.getKey();
-      // don't replicate if the log entries originated in the peer
-      if (!logKey.getClusterId().equals(peerClusterId)) {
+      List<UUID> consumedClusterIds = edit.getClusterIds();
+      // The entry's original cluster id is added to resolve the scenario of A -> B -> A,
+      // where A runs an old point release and B runs a new point release containing the
+      // HBASE-7709 fix. Without adding the original cluster id to the set, a change on
+      // cluster A would replicate infinitely between the clusters.
+      consumedClusterIds.add(logKey.getClusterId());
+      // only replicate the entry if the peer cluster has not already consumed it
+      if (!consumedClusterIds.contains(peerClusterId)) {
         removeNonReplicableEdits(edit);
         // Don't replicate catalog entries, if the WALEdit wasn't
         // containing anything to replicate and if we're currently not set to replicate
@@ -491,6 +495,8 @@ public class ReplicationSource extends T
           // This is *only* place where a cluster id other than the default is set.
           if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
             logKey.setClusterId(this.clusterId);
+          } else if (!logKey.getClusterId().equals(this.clusterId)) {
+            edit.addClusterId(clusterId);
           }
           currentNbOperations += countDistinctRowKeys(edit);
           currentNbEntries++;
@@ -664,13 +670,12 @@ public class ReplicationSource extends T
    * @param edit The KV to check for replication
    */
   protected void removeNonReplicableEdits(WALEdit edit) {
-    NavigableMap<byte[], Integer> scopes = edit.getScopes();
     List<KeyValue> kvs = edit.getKeyValues();
     for (int i = edit.size()-1; i >= 0; i--) {
       KeyValue kv = kvs.get(i);
       // The scope will be null or empty if
       // there's nothing to replicate in that WALEdit
-      if (scopes == null || !scopes.containsKey(kv.getFamily())) {
+      if (!edit.hasKeyInScope(kv.getFamily())) {
         kvs.remove(i);
       }
     }
@@ -928,4 +933,4 @@ public class ReplicationSource extends T
       return Long.parseLong(parts[parts.length-1]);
     }
   }
-}
+}
\ No newline at end of file

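The source-side check above reduces to: treat the entry's originating cluster, plus every cluster already recorded in the edit, as having consumed the change, and ship the entry only if the peer is not among them. A condensed sketch of just that decision follows; the class and method names are hypothetical.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    public class LoopGuardSketch {
      // originClusterId is the id stamped on the HLogKey; consumedClusterIds are the
      // ids recorded in the WALEdit scopes by earlier replication hops.
      static boolean shouldReplicate(UUID originClusterId,
          List<UUID> consumedClusterIds, UUID peerClusterId) {
        List<UUID> consumed = new ArrayList<UUID>(consumedClusterIds);
        // Counting the origin as a consumer covers peers on a pre-fix release that
        // never record consumed cluster ids in the edit.
        consumed.add(originClusterId);
        return !consumed.contains(peerClusterId);
      }
    }

The actual loop in ReplicationSource additionally strips non-replicable KVs and skips catalog tables before shipping; the sketch isolates only the cycle-breaking test.
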
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java?rev=1518410&r1=1518409&r2=1518410&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java Wed Aug 28 22:58:55 2013
@@ -23,8 +23,10 @@ import static org.junit.Assert.assertArr
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,15 +57,11 @@ public class TestMasterReplication {
 
   private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
 
-  private Configuration conf1;
-  private Configuration conf2;
-  private Configuration conf3;
-
-  private HBaseTestingUtility utility1;
-  private HBaseTestingUtility utility2;
-  private HBaseTestingUtility utility3;
-  
-  private MiniZooKeeperCluster miniZK; 
+  private Configuration baseConfiguration;
+
+  private HBaseTestingUtility[] utilities;
+  private Configuration[] configurations;
+  private MiniZooKeeperCluster miniZK;
 
   private static final long SLEEP_TIME = 500;
   private static final int NB_RETRIES = 10;
@@ -85,44 +83,21 @@ public class TestMasterReplication {
 
   @Before
   public void setUp() throws Exception {
-    conf1 = HBaseConfiguration.create();
-    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    baseConfiguration = HBaseConfiguration.create();
     // smaller block size and capacity to trigger more operations
     // and test them
-    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
-    conf1.setInt("replication.source.size.capacity", 1024);
-    conf1.setLong("replication.source.sleepforretries", 100);
-    conf1.setInt("hbase.regionserver.maxlogs", 10);
-    conf1.setLong("hbase.master.logcleaner.ttl", 10);
-    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
-    conf1.setBoolean("dfs.support.append", true);
-    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+    baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
+    baseConfiguration.setInt("replication.source.size.capacity", 1024);
+    baseConfiguration.setLong("replication.source.sleepforretries", 100);
+    baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
+    baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
+    baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    baseConfiguration.setBoolean("dfs.support.append", true);
+    baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    baseConfiguration.setStrings(
+        CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
         CoprocessorCounter.class.getName());
 
-    utility1 = new HBaseTestingUtility(conf1);
-    utility1.startMiniZKCluster();
-    miniZK = utility1.getZkCluster();
-    // By setting the mini ZK cluster through this method, even though this is
-    // already utility1's mini ZK cluster, we are telling utility1 not to shut
-    // the mini ZK cluster when we shut down the HBase cluster.
-    utility1.setZkCluster(miniZK);
-    new ZooKeeperWatcher(conf1, "cluster1", null, true);
-
-    conf2 = new Configuration(conf1);
-    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
-
-    utility2 = new HBaseTestingUtility(conf2);
-    utility2.setZkCluster(miniZK);
-    new ZooKeeperWatcher(conf2, "cluster2", null, true);
-
-    conf3 = new Configuration(conf1);
-    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
-
-    utility3 = new HBaseTestingUtility(conf3);
-    utility3.setZkCluster(miniZK);
-    new ZooKeeperWatcher(conf3, "cluster3", null, true);
-
     table = new HTableDescriptor(tableName);
     HColumnDescriptor fam = new HColumnDescriptor(famName);
     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
@@ -131,181 +106,300 @@ public class TestMasterReplication {
     table.addFamily(fam);
   }
 
-  @After
-  public void tearDown() throws IOException {
-    miniZK.shutdown();
+  /**
+   * Tests the master-master replication scenario 0 -> 1 -> 0 by adding a row
+   * to a table in each cluster and deleting it, checking each time that the
+   * change is replicated. It also verifies that the puts and deletes are not
+   * replicated back to the originating cluster.
+   */
+  @Test(timeout = 300000)
+  public void testCyclicReplication1() throws Exception {
+    LOG.info("testSimplePutDelete");
+    int numClusters = 2;
+    HTable[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
+
+      htables = getHTablesOnClusters(tableName);
+
+      // Test the replication scenarios of 0 -> 1 -> 0
+      addPeer("1", 0, 1);
+      addPeer("1", 1, 0);
+
+      int[] expectedCounts = new int[] { 2, 2 };
+
+      // add rows to both clusters,
+      // make sure they are both replicated
+      putAndWait(row, famName, htables[0], htables[1]);
+      putAndWait(row1, famName, htables[1], htables[0]);
+      validateCounts(htables, put, expectedCounts);
+
+      deleteAndWait(row, htables[0], htables[1]);
+      deleteAndWait(row1, htables[1], htables[0]);
+      validateCounts(htables, delete, expectedCounts);
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
   }
 
-  @Test(timeout=300000)
-  public void testCyclicReplication() throws Exception {
-    LOG.info("testCyclicReplication");
-    utility1.startMiniCluster();
-    utility2.startMiniCluster();
-    utility3.startMiniCluster();
-    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
-    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
-    ReplicationAdmin admin3 = new ReplicationAdmin(conf3);
-
-    new HBaseAdmin(conf1).createTable(table);
-    new HBaseAdmin(conf2).createTable(table);
-    new HBaseAdmin(conf3).createTable(table);
-    HTable htable1 = new HTable(conf1, tableName);
-    htable1.setWriteBufferSize(1024);
-    HTable htable2 = new HTable(conf2, tableName);
-    htable2.setWriteBufferSize(1024);
-    HTable htable3 = new HTable(conf3, tableName);
-    htable3.setWriteBufferSize(1024);
-    
-    admin1.addPeer("1", utility2.getClusterKey());
-    admin2.addPeer("1", utility3.getClusterKey());
-    admin3.addPeer("1", utility1.getClusterKey());
-
-    // put "row" and wait 'til it got around
-    putAndWait(row, famName, htable1, htable3);
-    // it should have passed through table2
-    check(row,famName,htable2);
-
-    putAndWait(row1, famName, htable2, htable1);
-    check(row,famName,htable3);
-    putAndWait(row2, famName, htable3, htable2);
-    check(row,famName,htable1);
-    
-    deleteAndWait(row,htable1,htable3);
-    deleteAndWait(row1,htable2,htable1);
-    deleteAndWait(row2,htable3,htable2);
-
-    assertEquals("Puts were replicated back ", 3, getCount(htable1, put));
-    assertEquals("Puts were replicated back ", 3, getCount(htable2, put));
-    assertEquals("Puts were replicated back ", 3, getCount(htable3, put));
-    assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete));
-    assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete));
-    assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete));
-
-    // Test HBASE-9158
-    admin2.disablePeer("1");
-    // we now have an edit that was replicated into cluster originating from cluster 1
-    putAndWait(row3, famName, htable1, htable2);
-    // now add a local edit to cluster 2
-    Put put = new Put(row4);
-    put.add(famName, row4, row4);
-    htable2.put(put);
-    // reenable replication from cluster 2 to cluster 3
-    admin2.enablePeer("1");
-    // without HBASE-9158 the edit for row4 would have been marked with cluster 1's id
-    // and hence not replicated to cluster 1
-    wait(row4, htable1);
-    
-    utility3.shutdownMiniCluster();
-    utility2.shutdownMiniCluster();
-    utility1.shutdownMiniCluster();
+  /**
+   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 0 by adding and
+   * deleting rows in a table on each cluster and ensuring that each cluster
+   * receives the appropriate mutations. It also tests the grouping scenario,
+   * where a cluster must replicate both the edits originating from itself
+   * and the edits it received via replication from a different cluster. The
+   * scenario is explained in HBASE-9158.
+   */
+  @Test(timeout = 300000)
+  public void testCyclicReplication2() throws Exception {
+    LOG.info("testCyclicReplication1");
+    int numClusters = 3;
+    HTable[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
+
+      // Test the replication scenario of 0 -> 1 -> 2 -> 0
+      addPeer("1", 0, 1);
+      addPeer("1", 1, 2);
+      addPeer("1", 2, 0);
+
+      htables = getHTablesOnClusters(tableName);
+
+      // put "row" and wait 'til it got around
+      putAndWait(row, famName, htables[0], htables[2]);
+      putAndWait(row1, famName, htables[1], htables[0]);
+      putAndWait(row2, famName, htables[2], htables[1]);
+
+      deleteAndWait(row, htables[0], htables[2]);
+      deleteAndWait(row1, htables[1], htables[0]);
+      deleteAndWait(row2, htables[2], htables[1]);
+
+      int[] expectedCounts = new int[] { 3, 3, 3 };
+      validateCounts(htables, put, expectedCounts);
+      validateCounts(htables, delete, expectedCounts);
+
+      // Test HBASE-9158
+      disablePeer("1", 2);
+      // we now have an edit in cluster 1 that originated in, and was
+      // replicated from, cluster 0
+      putAndWait(row3, famName, htables[0], htables[1]);
+      // now add a local edit to cluster 1
+      htables[1].put(new Put(row4).add(famName, row4, row4));
+      // re-enable replication from cluster 2 to cluster 0
+      enablePeer("1", 2);
+      // without HBASE-9158 the edit for row4 would have been marked with
+      // cluster 0's id and hence not replicated to cluster 0
+      wait(row4, htables[0], true);
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
   }
 
   /**
-   * Add a row to a table in each cluster, check it's replicated,
-   * delete it, check's gone
-   * Also check the puts and deletes are not replicated back to
-   * the originating cluster.
+   * Tests the cyclic replication scenario 0 -> 1 -> 2 -> 1.
    */
-  @Test(timeout=300000)
-  public void testSimplePutDelete() throws Exception {
-    LOG.info("testSimplePutDelete");
-    utility1.startMiniCluster();
-    utility2.startMiniCluster();
+  @Test(timeout = 300000)
+  public void testCyclicReplication3() throws Exception {
+    LOG.info("testCyclicReplication2");
+    int numClusters = 3;
+    HTable[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
+
+      // Test the replication scenario of 0 -> 1 -> 2 -> 1
+      addPeer("1", 0, 1);
+      addPeer("1", 1, 2);
+      addPeer("1", 2, 1);
+
+      htables = getHTablesOnClusters(tableName);
+
+      // put "row" and wait 'til it got around
+      putAndWait(row, famName, htables[0], htables[2]);
+      putAndWait(row1, famName, htables[1], htables[2]);
+      putAndWait(row2, famName, htables[2], htables[1]);
+
+      deleteAndWait(row, htables[0], htables[2]);
+      deleteAndWait(row1, htables[1], htables[2]);
+      deleteAndWait(row2, htables[2], htables[1]);
+
+      int[] expectedCounts = new int[] { 1, 3, 3 };
+      validateCounts(htables, put, expectedCounts);
+      validateCounts(htables, delete, expectedCounts);
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
+  }
 
-    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
-    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
+  @After
+  public void tearDown() throws IOException {
+    configurations = null;
+    utilities = null;
+  }
+
+  @SuppressWarnings("resource")
+  private void startMiniClusters(int numClusters) throws Exception {
+    Random random = new Random();
+    utilities = new HBaseTestingUtility[numClusters];
+    configurations = new Configuration[numClusters];
+    for (int i = 0; i < numClusters; i++) {
+      Configuration conf = new Configuration(baseConfiguration);
+      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
+      HBaseTestingUtility utility = new HBaseTestingUtility(conf);
+      if (i == 0) {
+        utility.startMiniZKCluster();
+        miniZK = utility.getZkCluster();
+      } else {
+        utility.setZkCluster(miniZK);
+      }
+      utility.startMiniCluster();
+      utilities[i] = utility;
+      configurations[i] = conf;
+      new ZooKeeperWatcher(conf, "cluster" + i, null, true);
+    }
+  }
 
-    new HBaseAdmin(conf1).createTable(table);
-    new HBaseAdmin(conf2).createTable(table);
-    HTable htable1 = new HTable(conf1, tableName);
-    htable1.setWriteBufferSize(1024);
-    HTable htable2 = new HTable(conf2, tableName);
-    htable2.setWriteBufferSize(1024);
+  private void shutDownMiniClusters() throws Exception {
+    int numClusters = utilities.length;
+    for (int i = numClusters - 1; i >= 0; i--) {
+      if (utilities[i] != null) {
+        utilities[i].shutdownMiniCluster();
+      }
+    }
+    miniZK.shutdown();
+  }
 
-    // set M-M
-    admin1.addPeer("1", utility2.getClusterKey());
-    admin2.addPeer("1", utility1.getClusterKey());
+  private void createTableOnClusters(HTableDescriptor table) throws Exception {
+    int numClusters = configurations.length;
+    for (int i = 0; i < numClusters; i++) {
+      HBaseAdmin hbaseAdmin = null;
+      try {
+        hbaseAdmin = new HBaseAdmin(configurations[i]);
+        hbaseAdmin.createTable(table);
+      } finally {
+        close(hbaseAdmin);
+      }
+    }
+  }
 
-    // add rows to both clusters,
-    // make sure they are both replication
-    putAndWait(row, famName, htable1, htable2);
-    putAndWait(row1, famName, htable2, htable1);
+  private void addPeer(String id, int masterClusterNumber,
+      int slaveClusterNumber) throws Exception {
+    ReplicationAdmin replicationAdmin = null;
+    try {
+      replicationAdmin = new ReplicationAdmin(
+          configurations[masterClusterNumber]);
+      replicationAdmin.addPeer(id,
+          utilities[slaveClusterNumber].getClusterKey());
+    } finally {
+      close(replicationAdmin);
+    }
+  }
 
-    // make sure "row" did not get replicated back.
-    assertEquals("Puts were replicated back ", 2, getCount(htable1, put));
+  private void disablePeer(String id, int masterClusterNumber) throws Exception {
+    ReplicationAdmin replicationAdmin = null;
+    try {
+      replicationAdmin = new ReplicationAdmin(
+          configurations[masterClusterNumber]);
+      replicationAdmin.disablePeer(id);
+    } finally {
+      close(replicationAdmin);
+    }
+  }
 
-    // delete "row" and wait
-    deleteAndWait(row, htable1, htable2);
+  private void enablePeer(String id, int masterClusterNumber) throws Exception {
+    ReplicationAdmin replicationAdmin = null;
+    try {
+      replicationAdmin = new ReplicationAdmin(
+          configurations[masterClusterNumber]);
+      replicationAdmin.enablePeer(id);
+    } finally {
+      close(replicationAdmin);
+    }
+  }
 
-    // make the 2nd cluster replicated back
-    assertEquals("Puts were replicated back ", 2, getCount(htable2, put));
+  private void close(Closeable... closeables) {
+    try {
+      if (closeables != null) {
+        for (Closeable closeable : closeables) {
+          closeable.close();
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception occured while closing the object:", e);
+    }
+  }
 
-    deleteAndWait(row1, htable2, htable1);
+  @SuppressWarnings("resource")
+  private HTable[] getHTablesOnClusters(byte[] tableName) throws Exception {
+    int numClusters = utilities.length;
+    HTable[] htables = new HTable[numClusters];
+    for (int i = 0; i < numClusters; i++) {
+      HTable htable = new HTable(configurations[i], tableName);
+      htable.setWriteBufferSize(1024);
+      htables[i] = htable;
+    }
+    return htables;
+  }
 
-    assertEquals("Deletes were replicated back ", 2, getCount(htable1, delete));
-    utility2.shutdownMiniCluster();
-    utility1.shutdownMiniCluster();
+  private void validateCounts(HTable[] htables, byte[] type,
+      int[] expectedCounts) throws IOException {
+    for (int i = 0; i < htables.length; i++) {
+      assertEquals(Bytes.toString(type) + " were replicated back ",
+          expectedCounts[i], getCount(htables[i], type));
+    }
   }
 
-  private int getCount(HTable t, byte[] type)  throws IOException {
+  private int getCount(HTable t, byte[] type) throws IOException {
     Get test = new Get(row);
-    test.setAttribute("count", new byte[]{});
+    test.setAttribute("count", new byte[] {});
     Result res = t.get(test);
     return Bytes.toInt(res.getValue(count, type));
   }
 
   private void deleteAndWait(byte[] row, HTable source, HTable target)
-  throws Exception {
+      throws Exception {
     Delete del = new Delete(row);
     source.delete(del);
-
-    Get get = new Get(row);
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for del replication");
-      }
-      Result res = target.get(get);
-      if (res.size() >= 1) {
-        LOG.info("Row not deleted");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        break;
-      }
-    }
-  }
-
-  private void check(byte[] row, byte[] fam, HTable t) throws IOException {
-    Get get = new Get(row);
-    Result res = t.get(get);
-    if (res.size() == 0) {
-      fail("Row is missing");
-    }
+    wait(row, target, true);
   }
 
   private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target)
-  throws Exception {
+      throws Exception {
     Put put = new Put(row);
     put.add(fam, row, row);
     source.put(put);
-
-    wait(row, target);
+    wait(row, target, false);
   }
 
-  private void wait(byte[] row, HTable target) throws Exception {
+  private void wait(byte[] row, HTable target, boolean isDeleted)
+      throws Exception {
     Get get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for put replication");
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for replication. Row:" + Bytes.toString(row)
+            + ". IsDeleteReplication:" + isDeleted);
       }
       Result res = target.get(get);
-      if (res.size() == 0) {
-        LOG.info("Row not available");
+      boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
+      if (sleep) {
+        LOG.info("Waiting for more time for replication. Row:"
+            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
         Thread.sleep(SLEEP_TIME);
       } else {
-        assertArrayEquals(res.value(), row);
+        if (!isDeleted) {
+          assertArrayEquals(res.value(), row);
+        }
+        LOG.info("Obtained row:"
+            + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
         break;
       }
-    }    
+    }
   }
 
   /**