Posted to commits@hbase.apache.org by ch...@apache.org on 2016/10/18 01:29:13 UTC

[1/3] hbase git commit: HBASE-16653 Backport HBASE-11393 to branches which support namespace

Repository: hbase
Updated Branches:
  refs/heads/branch-1 6df7554d2 -> 66941910b


http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index 2c9fc0f..dd15e4c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -22,10 +22,14 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -37,6 +41,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -183,13 +188,13 @@ public class TestPerTableCFReplication {
     Map<TableName, List<String>> tabCFsMap = null;
 
     // 1. null or empty string, result should be null
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(null);
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(null);
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("");
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("   ");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("   ");
     assertEquals(null, tabCFsMap);
 
     TableName tab1 = TableName.valueOf("tab1");
@@ -197,20 +202,20 @@ public class TestPerTableCFReplication {
     TableName tab3 = TableName.valueOf("tab3");
 
     // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab1");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey(tab1));   // its table name is "tab1"
     assertFalse(tabCFsMap.containsKey(tab2));  // not other table
     assertEquals(null, tabCFsMap.get(tab1));   // null cf-list
 
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab2:cf1");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab2:cf1");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey(tab2));   // its table name is "tab2"
     assertFalse(tabCFsMap.containsKey(tab1));  // not other table
     assertEquals(1, tabCFsMap.get(tab2).size());   // cf-list contains only 1 cf
     assertEquals("cf1", tabCFsMap.get(tab2).get(0));// the only cf is "cf1"
 
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab3 : cf1 , cf3");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab3 : cf1 , cf3");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey(tab3));   // its table name is "tab3"
     assertFalse(tabCFsMap.containsKey(tab1));  // not other table
@@ -219,7 +224,7 @@ public class TestPerTableCFReplication {
     assertTrue(tabCFsMap.get(tab3).contains("cf3"));// contains "cf3"
 
     // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
     // 3.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
     assertTrue(tabCFsMap.containsKey(tab1));
@@ -237,7 +242,7 @@ public class TestPerTableCFReplication {
 
     // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
     // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
       "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
     // 4.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
@@ -256,7 +261,7 @@ public class TestPerTableCFReplication {
 
     // 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"
     //    "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally
-    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
+    tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
       "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
     // 5.1 no "tab1" and "tab2", only "tab3"
     assertEquals(1, tabCFsMap.size()); // only one table
@@ -267,7 +272,100 @@ public class TestPerTableCFReplication {
     assertEquals(2, tabCFsMap.get(tab3).size());
     assertTrue(tabCFsMap.get(tab3).contains("cf1"));
     assertTrue(tabCFsMap.get(tab3).contains("cf3"));
- }
+  }
+
+  @Test
+  public void testTableCFsHelperConverter() {
+
+    ZooKeeperProtos.TableCF[] tableCFs = null;
+    Map<TableName, List<String>> tabCFsMap = null;
+
+    // 1. null or empty string, result should be null
+    assertNull(ReplicationSerDeHelper.convert(tabCFsMap));
+
+    tabCFsMap = new HashMap<TableName, List<String>>();
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(0, tableCFs.length);
+
+    TableName tab1 = TableName.valueOf("tab1");
+    TableName tab2 = TableName.valueOf("tab2");
+    TableName tab3 = TableName.valueOf("tab3");
+
+    // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
+    tabCFsMap.clear();
+    tabCFsMap.put(tab1, null);
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(1, tableCFs.length); // only one table
+    assertEquals(tab1.toString(),
+        tableCFs[0].getTableName().getQualifier().toStringUtf8());
+    assertEquals(0, tableCFs[0].getFamiliesCount());
+
+    tabCFsMap.clear();
+    tabCFsMap.put(tab2, new ArrayList<String>());
+    tabCFsMap.get(tab2).add("cf1");
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(1, tableCFs.length); // only one table
+    assertEquals(tab2.toString(),
+        tableCFs[0].getTableName().getQualifier().toStringUtf8());
+    assertEquals(1, tableCFs[0].getFamiliesCount());
+    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
+
+    tabCFsMap.clear();
+    tabCFsMap.put(tab3, new ArrayList<String>());
+    tabCFsMap.get(tab3).add("cf1");
+    tabCFsMap.get(tab3).add("cf3");
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(1, tableCFs.length);
+    assertEquals(tab3.toString(),
+        tableCFs[0].getTableName().getQualifier().toStringUtf8());
+    assertEquals(2, tableCFs[0].getFamiliesCount());
+    assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
+    assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());
+
+    tabCFsMap.clear();
+    tabCFsMap.put(tab1, null);
+    tabCFsMap.put(tab2, new ArrayList<String>());
+    tabCFsMap.get(tab2).add("cf1");
+    tabCFsMap.put(tab3, new ArrayList<String>());
+    tabCFsMap.get(tab3).add("cf1");
+    tabCFsMap.get(tab3).add("cf3");
+
+    tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
+    assertEquals(3, tableCFs.length);
+    assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tab1.toString()));
+    assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tab2.toString()));
+    assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tab3.toString()));
+
+    assertEquals(0,
+        ReplicationSerDeHelper.getTableCF(tableCFs, tab1.toString()).getFamiliesCount());
+
+    assertEquals(1,
+        ReplicationSerDeHelper.getTableCF(tableCFs, tab2.toString()).getFamiliesCount());
+    assertEquals("cf1",
+        ReplicationSerDeHelper.getTableCF(tableCFs, tab2.toString()).getFamilies(0).toStringUtf8());
+
+    assertEquals(2,
+        ReplicationSerDeHelper.getTableCF(tableCFs, tab3.toString()).getFamiliesCount());
+    assertEquals("cf1",
+        ReplicationSerDeHelper.getTableCF(tableCFs, tab3.toString()).getFamilies(0).toStringUtf8());
+    assertEquals("cf3",
+        ReplicationSerDeHelper.getTableCF(tableCFs, tab3.toString()).getFamilies(1).toStringUtf8());
+
+    tabCFsMap = ReplicationSerDeHelper.convert2Map(tableCFs);
+    assertEquals(3, tabCFsMap.size());
+    assertTrue(tabCFsMap.containsKey(tab1));
+    assertTrue(tabCFsMap.containsKey(tab2));
+    assertTrue(tabCFsMap.containsKey(tab3));
+    // 3.2 table "tab1" : null cf-list
+    assertEquals(null, tabCFsMap.get(tab1));
+    // 3.3 table "tab2" : cf-list contains a single cf "cf1"
+    assertEquals(1, tabCFsMap.get(tab2).size());
+    assertEquals("cf1", tabCFsMap.get(tab2).get(0));
+    // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
+    assertEquals(2, tabCFsMap.get(tab3).size());
+    assertTrue(tabCFsMap.get(tab3).contains("cf1"));
+    assertTrue(tabCFsMap.get(tab3).contains("cf3"));
+  }
 
   @Test(timeout=300000)
   public void testPerTableCFReplication() throws Exception {
@@ -304,8 +402,23 @@ public class TestPerTableCFReplication {
       Table htab3C = connection3.getTable(tabCName);
 
       // A. add cluster2/cluster3 as peers to cluster1
-      replicationAdmin.addPeer("2", utility2.getClusterKey(), "TC;TB:f1,f3");
-      replicationAdmin.addPeer("3", utility3.getClusterKey(), "TA;TB:f1,f2");
+      ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
+      rpc2.setClusterKey(utility2.getClusterKey());
+      Map<TableName, List<String>> tableCFs = new HashMap<>();
+      tableCFs.put(tabCName, null);
+      tableCFs.put(tabBName, new ArrayList<String>());
+      tableCFs.get(tabBName).add("f1");
+      tableCFs.get(tabBName).add("f3");
+      replicationAdmin.addPeer("2", rpc2, tableCFs);
+
+      ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
+      rpc3.setClusterKey(utility3.getClusterKey());
+      tableCFs.clear();
+      tableCFs.put(tabAName, null);
+      tableCFs.put(tabBName, new ArrayList<String>());
+      tableCFs.get(tabBName).add("f1");
+      tableCFs.get(tabBName).add("f2");
+      replicationAdmin.addPeer("3", rpc3, tableCFs);
 
       // A1. tableA can only replicated to cluster3
       putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
@@ -348,8 +461,20 @@ public class TestPerTableCFReplication {
       deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
 
       // B. change peers' replicable table-cf config
-      replicationAdmin.setPeerTableCFs("2", "TA:f1,f2; TC:f2,f3");
-      replicationAdmin.setPeerTableCFs("3", "TB; TC:f3");
+      tableCFs.clear();
+      tableCFs.put(tabAName, new ArrayList<String>());
+      tableCFs.get(tabAName).add("f1");
+      tableCFs.get(tabAName).add("f2");
+      tableCFs.put(tabCName, new ArrayList<String>());
+      tableCFs.get(tabCName).add("f2");
+      tableCFs.get(tabCName).add("f3");
+      replicationAdmin.setPeerTableCFs("2", tableCFs);
+
+      tableCFs.clear();
+      tableCFs.put(tabBName, null);
+      tableCFs.put(tabCName, new ArrayList<String>());
+      tableCFs.get(tabCName).add("f3");
+      replicationAdmin.setPeerTableCFs("3", tableCFs);
 
       // B1. cf 'f1' of tableA can only replicated to cluster2
       putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
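
As exercised above, per-table-CF parsing and serialization now live in ReplicationSerDeHelper: parseTableCFsFromConfig turns the legacy "table[:cf[,cf]...][;...]" string into a Map<TableName, List<String>>, and convert/convert2Map round-trip that map through ZooKeeperProtos.TableCF[]. A minimal sketch of the expected behaviour, derived from the assertions in this test (the comments describe results, not authoritative docs):

  Map<TableName, List<String>> m =
      ReplicationSerDeHelper.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
  // m.get(TableName.valueOf("tab1")) == null   -> all replicable CFs of tab1
  // m.get(TableName.valueOf("tab2")) -> ["cf1"]
  // m.get(TableName.valueOf("tab3")) -> ["cf1", "cf3"]

  // Round-trip through the protobuf form stored in ZooKeeper; per the
  // assertions, a null CF list is serialized as an empty families list
  // and comes back as null again.
  ZooKeeperProtos.TableCF[] tableCFs = ReplicationSerDeHelper.convert(m);
  Map<TableName, List<String>> back = ReplicationSerDeHelper.convert2Map(tableCFs);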

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index e52a600..5283433 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -124,7 +124,9 @@ public class TestReplicationBase {
     utility2.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
 
-    admin.addPeer("2", utility2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin.addPeer("2", rpc);
 
     LOG.info("Setup second Zk");
     CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index c293444..ba634dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -369,7 +369,9 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       }
     }
 
-    admin.addPeer("2", utility2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin.addPeer("2", rpc);
     Thread.sleep(SLEEP_TIME);
     rowKey = Bytes.toBytes("do rep");
     put = new Put(rowKey);
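
This change and the one in TestReplicationBase above reflect the central API move of HBASE-11393: ReplicationAdmin.addPeer now takes a ReplicationPeerConfig, with an optional Map<TableName, List<String>> of table CFs, instead of clusterKey and table-CFs strings. A minimal usage sketch (cluster key and table names are placeholders; admin is an existing ReplicationAdmin):

  ReplicationPeerConfig rpc = new ReplicationPeerConfig();
  rpc.setClusterKey("zk1,zk2,zk3:2181:/hbase");  // placeholder quorum

  // Optional per-table CF limits; a null list means all replicable CFs.
  Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
  tableCFs.put(TableName.valueOf("tabA"), null);
  List<String> cfs = new ArrayList<String>();
  cfs.add("f1");
  cfs.add("f3");
  tableCFs.put(TableName.valueOf("ns1:tabB"), cfs);

  admin.addPeer("2", rpc, tableCFs);  // or admin.addPeer("2", rpc) for all tables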

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 7f7ee98..001f147 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -170,7 +170,7 @@ public abstract class TestReplicationStateBasic {
 
     try {
       rp.addPeer(ID_ONE,
-        new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"), null);
+        new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
       fail("Should throw an IllegalArgumentException because "
             + "zookeeper.znode.parent is missing leading '/'.");
     } catch (IllegalArgumentException e) {
@@ -179,7 +179,7 @@ public abstract class TestReplicationStateBasic {
 
     try {
       rp.addPeer(ID_ONE,
-        new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"), null);
+        new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
       fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
     } catch (IllegalArgumentException e) {
       // Expected.
@@ -187,7 +187,7 @@ public abstract class TestReplicationStateBasic {
 
     try {
       rp.addPeer(ID_ONE,
-        new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"), null);
+        new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
       fail("Should throw an IllegalArgumentException because "
           + "hbase.zookeeper.property.clientPort is missing.");
     } catch (IllegalArgumentException e) {
@@ -207,7 +207,7 @@ public abstract class TestReplicationStateBasic {
     files1.add("file_3");
     assertNull(rqc.getReplicableHFiles(ID_ONE));
     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
     rq1.addPeerToHFileRefs(ID_ONE);
     rq1.addHFileRefs(ID_ONE, files1);
     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
@@ -229,8 +229,8 @@ public abstract class TestReplicationStateBasic {
     rqc.init();
 
     rp.init();
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
-    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
 
     List<String> files1 = new ArrayList<String>(3);
     files1.add("file_1");
@@ -288,9 +288,9 @@ public abstract class TestReplicationStateBasic {
     assertNumberOfPeers(0);
 
     // Add some peers
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
     assertNumberOfPeers(1);
-    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
     assertNumberOfPeers(2);
 
     // Test methods with a peer that is added but not connected
@@ -305,7 +305,7 @@ public abstract class TestReplicationStateBasic {
     assertNumberOfPeers(1);
 
     // Add one peer
-    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
     rp.peerAdded(ID_ONE);
     assertNumberOfPeers(2);
     assertTrue(rp.getStatusOfPeer(ID_ONE));
@@ -365,7 +365,7 @@ public abstract class TestReplicationStateBasic {
         rq3.addLog("qId" + i, "filename" + j);
       }
       //Add peers for the corresponding queues so they are not orphans
-      rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i), null);
+      rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index fd02d1a..a949e92 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -203,7 +203,9 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
     /**
      * set M-S : Master: utility1 Slave1: utility2
      */
-    admin1.addPeer("1", utility2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin1.addPeer("1", rpc);
 
     admin1.close();
     admin2.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index a5df432..7b2e1fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -146,7 +146,7 @@ public class TestReplicationTrackerZKImpl {
 
   @Test(timeout = 30000)
   public void testPeerRemovedEvent() throws Exception {
-    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     rt.registerListener(new DummyReplicationListener());
     rp.removePeer("5");
     // wait for event
@@ -159,7 +159,7 @@ public class TestReplicationTrackerZKImpl {
   @Test(timeout = 30000)
   public void testPeerListChangedEvent() throws Exception {
     // add a peer
-    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
     rt.registerListener(new DummyReplicationListener());
     rp.disablePeer("5");
@@ -183,16 +183,16 @@ public class TestReplicationTrackerZKImpl {
   public void testPeerNameControl() throws Exception {
     int exists = 0;
     int hyphen = 0;
-    rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+    rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
 
     try{
-      rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+      rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     }catch(IllegalArgumentException e){
       exists++;
     }
 
     try{
-      rp.addPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
+      rp.addPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
     }catch(IllegalArgumentException e){
       hyphen++;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
index 5010365..a246241 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java
@@ -130,7 +130,9 @@ public class TestReplicationWithTags {
     utility2 = new HBaseTestingUtility(conf2);
     utility2.setZkCluster(miniZK);
 
-    replicationAdmin.addPeer("2", utility2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    replicationAdmin.addPeer("2", rpc);
 
     LOG.info("Setup second Zk");
     utility1.startMiniCluster(2);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
new file mode 100644
index 0000000..f53aef3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
@@ -0,0 +1,164 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@Category({ReplicationTests.class, SmallTests.class})
+public class TestTableCFsUpdater extends TableCFsUpdater {
+
+  private static final Log LOG = LogFactory.getLog(TestTableCFsUpdater.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static ZooKeeperWatcher zkw = null;
+  private static Abortable abortable = null;
+
+  public TestTableCFsUpdater() {
+    super(zkw, TEST_UTIL.getConfiguration(), abortable);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    abortable = new Abortable() {
+      @Override
+      public void abort(String why, Throwable e) {
+        LOG.info(why, e);
+      }
+
+      @Override
+      public boolean isAborted() {
+        return false;
+      }
+    };
+    zkw = new ZooKeeperWatcher(conf, "TableCFs", abortable, true);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @Test
+  public void testUpgrade() throws KeeperException, InterruptedException,
+      DeserializationException {
+    String peerId = "1";
+    TableName tab1 = TableName.valueOf("table1");
+    TableName tab2 = TableName.valueOf("table2");
+    TableName tab3 = TableName.valueOf("table3");
+
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(zkw.getQuorum());
+    String peerNode = getPeerNode(peerId);
+    ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
+
+    String tableCFs = "table1:cf1,cf2;table2:cf3;table3";
+    String tableCFsNode = getTableCFsNode(peerId);
+    LOG.info("create tableCFs node: " + tableCFsNode + " for peerId=" + peerId);
+    ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
+
+    ReplicationPeerConfig actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+    String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
+
+    assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
+    assertNull(actualRpc.getTableCFsMap());
+    assertEquals(tableCFs, actualTableCfs);
+
+    peerId = "2";
+    rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(zkw.getQuorum());
+    peerNode = getPeerNode(peerId);
+    ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
+
+    tableCFs = "table1:cf1,cf3;table2:cf2";
+    tableCFsNode = getTableCFsNode(peerId);
+    LOG.info("create tableCFs node: " + tableCFsNode + " for peerId=" + peerId);
+    ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
+
+    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+    actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
+
+    assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
+    assertNull(actualRpc.getTableCFsMap());
+    assertEquals(tableCFs, actualTableCfs);
+
+
+    update();
+
+    peerId = "1";
+    peerNode = getPeerNode(peerId);
+    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+    assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
+    Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
+    assertEquals(3, tableNameListMap.size());
+    assertTrue(tableNameListMap.containsKey(tab1));
+    assertTrue(tableNameListMap.containsKey(tab2));
+    assertTrue(tableNameListMap.containsKey(tab3));
+    assertEquals(2, tableNameListMap.get(tab1).size());
+    assertEquals("cf1", tableNameListMap.get(tab1).get(0));
+    assertEquals("cf2", tableNameListMap.get(tab1).get(1));
+    assertEquals(1, tableNameListMap.get(tab2).size());
+    assertEquals("cf3", tableNameListMap.get(tab2).get(0));
+    assertNull(tableNameListMap.get(tab3));
+
+
+    peerId = "2";
+    peerNode = getPeerNode(peerId);
+    actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
+    assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
+    tableNameListMap = actualRpc.getTableCFsMap();
+    assertEquals(2, tableNameListMap.size());
+    assertTrue(tableNameListMap.containsKey(tab1));
+    assertTrue(tableNameListMap.containsKey(tab2));
+    assertEquals(2, tableNameListMap.get(tab1).size());
+    assertEquals("cf1", tableNameListMap.get(tab1).get(0));
+    assertEquals("cf3", tableNameListMap.get(tab1).get(1));
+    assertEquals(1, tableNameListMap.get(tab2).size());
+    assertEquals("cf2", tableNameListMap.get(tab2).get(0));
+  }
+
+}
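
TableCFsUpdater.update(), driven by this test, migrates each peer's legacy tableCFs znode (a plain string) into the table-CFs map carried inside the peer-config protobuf. Per peer, the step looks roughly like the sketch below (names taken from this patch; exception handling elided, and the final ZKUtil.setData write is assumed rather than shown in this diff):

  String peerNode = getPeerNode(peerId);
  ReplicationPeerConfig rpc =
      ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
  if (rpc.getTableCFsMap() == null) {
    // Old-style peer: parse the string znode and fold it into the config.
    String tableCFsNode = getTableCFsNode(peerId);
    String tableCFs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
    rpc.setTableCFsMap(ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFs));
    ZKUtil.setData(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
  }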

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 7614b0f..24c6ef3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -535,7 +535,7 @@ public class TestReplicationSourceManager {
           FailInitializeDummyReplicationSource.class.getName());
       final ReplicationPeers rp = manager.getReplicationPeers();
       // Set up the znode and ReplicationPeer for the fake peer
-      rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"), null);
+      rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"));
       // Wait for the peer to get created and connected
       Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
         @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
index dc4a340..f9ae011 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController.VisibilityReplication;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -131,7 +132,9 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit
     TEST_UTIL1 = new HBaseTestingUtility(conf1);
     TEST_UTIL1.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true);
-    replicationAdmin.addPeer("2", TEST_UTIL1.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(TEST_UTIL1.getClusterKey());
+    replicationAdmin.addPeer("2", rpc);
 
     TEST_UTIL.startMiniCluster(1);
     // Wait for the labels table to become available

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
index 419ad91..79cf0ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController.VisibilityReplication;
 import org.junit.experimental.categories.Category;
@@ -180,7 +181,9 @@ public class TestVisibilityLabelsReplication {
     TEST_UTIL1 = new HBaseTestingUtility(conf1);
     TEST_UTIL1.setZkCluster(miniZK);
     zkw2 = new ZooKeeperWatcher(conf1, "cluster2", null, true);
-    replicationAdmin.addPeer("2", TEST_UTIL1.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(TEST_UTIL1.getClusterKey());
+    replicationAdmin.addPeer("2", rpc);
 
     TEST_UTIL.startMiniCluster(1);
     // Wait for the labels table to become available

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
index d8087f5..2a3e7f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
@@ -108,6 +108,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl;
 import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
@@ -2260,7 +2261,9 @@ public class TestHBaseFsck {
     ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
     Assert.assertEquals(0, replicationAdmin.getPeersCount());
     String zkPort =  conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-    replicationAdmin.addPeer("1", "127.0.0.1:2181" + zkPort + ":/hbase");
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey("127.0.0.1:2181" + zkPort + ":/hbase");
+    replicationAdmin.addPeer("1", rpc);
     replicationAdmin.getPeersCount();
     Assert.assertEquals(1, replicationAdmin.getPeersCount());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index d0719d8..2e240e1 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -39,11 +39,7 @@ module Hbase
     #----------------------------------------------------------------------------------------------
     # Add a new peer cluster to replicate to
     def add_peer(id, args = {}, peer_tableCFs = nil)
-      # make add_peer backwards compatible to take in string for clusterKey and peer_tableCFs
-      if args.is_a?(String)
-        cluster_key = args
-        @replication_admin.addPeer(id, cluster_key, peer_tableCFs)
-      elsif args.is_a?(Hash)
+      if args.is_a?(Hash)
         unless peer_tableCFs.nil?
           raise(ArgumentError, "peer_tableCFs should be specified as TABLE_CFS in args")
         end
@@ -87,9 +83,18 @@ module Hbase
           }
         end
 
-        @replication_admin.add_peer(id, replication_peer_config, table_cfs)
+        unless table_cfs.nil?
+          # convert table_cfs to TableName
+          map = java.util.HashMap.new
+          table_cfs.each{|key, val|
+            map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
+          }
+          replication_peer_config.set_table_cfs_map(map)
+        end
+
+        @replication_admin.add_peer(id, replication_peer_config)
       else
-        raise(ArgumentError, "args must be either a String or Hash")
+        raise(ArgumentError, "args must be a Hash")
       end
     end
 
@@ -111,7 +116,7 @@ module Hbase
     #----------------------------------------------------------------------------------------------
     # List all peer clusters
     def list_peers
-      @replication_admin.listPeers
+      @replication_admin.listPeerConfigs
     end
 
     #----------------------------------------------------------------------------------------------
@@ -141,20 +146,42 @@ module Hbase
     #----------------------------------------------------------------------------------------------
     # Set new tableCFs config for the specified peer
     def set_peer_tableCFs(id, tableCFs)
-      @replication_admin.setPeerTableCFs(id, tableCFs)
+      unless tableCFs.nil?
+        # convert tableCFs to TableName
+        map = java.util.HashMap.new
+        tableCFs.each{|key, val|
+          map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
+        }
+      end
+      @replication_admin.setPeerTableCFs(id, map)
     end
 
     #----------------------------------------------------------------------------------------------
     # Append a tableCFs config for the specified peer
     def append_peer_tableCFs(id, tableCFs)
-      @replication_admin.appendPeerTableCFs(id, tableCFs)
+      unless tableCFs.nil?
+        # convert tableCFs to TableName
+        map = java.util.HashMap.new
+        tableCFs.each{|key, val|
+          map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
+        }
+      end
+      @replication_admin.appendPeerTableCFs(id, map)
     end
 
     #----------------------------------------------------------------------------------------------
     # Remove some tableCFs from the tableCFs config of the specified peer
     def remove_peer_tableCFs(id, tableCFs)
-      @replication_admin.removePeerTableCFs(id, tableCFs)
+      unless tableCFs.nil?
+        # convert tableCFs to TableName
+        map = java.util.HashMap.new
+        tableCFs.each{|key, val|
+          map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
+        }
+      end
+      @replication_admin.removePeerTableCFs(id, map)
     end
+
     #----------------------------------------------------------------------------------------------
     # Enables a table's replication switch
     def enable_tablerep(table_name)
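
The conversion block repeated in add_peer, set_peer_tableCFs, append_peer_tableCFs and remove_peer_tableCFs above does the same thing each time: rekey the Ruby hash by TableName so it matches the Java Admin API. In Java terms the equivalent is (a hypothetical helper for illustration, not part of this patch):

  // Hypothetical helper mirroring the Ruby conversion blocks above.
  static Map<TableName, List<String>> toTableCFsMap(Map<String, List<String>> tableCFs) {
    if (tableCFs == null) {
      return null;  // callers pass the map through unchanged when nil
    }
    Map<TableName, List<String>> map = new HashMap<TableName, List<String>>();
    for (Map.Entry<String, List<String>> entry : tableCFs.entrySet()) {
      map.put(TableName.valueOf(entry.getKey()), entry.getValue());
    }
    return map;
  }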

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
index cf9862a..d209a37 100644
--- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
@@ -33,7 +33,7 @@ Examples:
 
   hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
   hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
-    TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
+    TABLE_CFS => { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
 
 For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments
 are DATA and CONFIG which can be specified to set different either the peer_data or configuration
@@ -48,7 +48,7 @@ the key TABLE_CFS.
   hbase> add_peer '9', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
     DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" },
   hbase> add_peer '10', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
-    TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
+    TABLE_CFS => { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
   hbase> add_peer '11', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
     DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" },
     TABLE_CFS => { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
index 3919b20..24a9976 100644
--- a/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb
@@ -26,7 +26,7 @@ Append a replicable table-cf config for the specified peer
 Examples:
 
   # append a table / table-cf to be replicable for a peer
-  hbase> append_peer_tableCFs '2', "table4:cfA,cfB"
+  hbase> append_peer_tableCFs '2',  { "ns1:table4" => ["cfA", "cfB"] }
 
 EOF
       end

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index cc1be04..6444c79 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -33,12 +33,14 @@ EOF
         now = Time.now
         peers = replication_admin.list_peers
 
-        formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE", "TABLE_CFS"])
+        formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
+          "STATE", "TABLE_CFS"])
 
         peers.entrySet().each do |e|
           state = replication_admin.get_peer_state(e.key)
           tableCFs = replication_admin.show_peer_tableCFs(e.key)
-          formatter.row([ e.key, e.value, state, tableCFs ])
+          formatter.row([ e.key, e.value.getClusterKey,
+            e.value.getReplicationEndpointImpl, state, tableCFs ])
         end
 
         formatter.footer(now)

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
index 5b15b52..af64bda 100644
--- a/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb
@@ -26,8 +26,8 @@ Remove a table / table-cf from the table-cfs config for the specified peer
 Examples:
 
   # Remove a table / table-cf from the replicable table-cfs for a peer
-  hbase> remove_peer_tableCFs '2', "table1"
-  hbase> remove_peer_tableCFs '2', "table1:cf1"
+  hbase> remove_peer_tableCFs '2', { "ns1:table1" => [] }
+  hbase> remove_peer_tableCFs '2', { "ns1:table1" => ["cf1"] }
 
 EOF
       end

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
index 3a88dbb..5599aee 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
@@ -32,7 +32,9 @@ module Shell
     # set table / table-cf to be replicable for a peer, for a table without
     # an explicit column-family list, all replicable column-families (with
     # replication_scope == 1) will be replicated
-    hbase> set_peer_tableCFs '2', "table1; table2:cf1,cf2; table3:cfA,cfB"
+    hbase> set_peer_tableCFs '2', { "ns1:table1" => [],
+                                    "ns2:table2" => ["cf1", "cf2"],
+                                    "ns3:table3" => ["cfA", "cfB"] }
 
   EOF
       end

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
index 3f4af05..04fbc7a 100644
--- a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
+++ b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
@@ -28,7 +28,7 @@ import org.junit.experimental.categories.Category;
 
 @Category({ ClientTests.class, LargeTests.class })
 public class TestReplicationShell extends AbstractTestShell {
-  @Ignore ("Disabled because hangs on occasion.. about 10% of the time") @Test
+  @Test
   public void testRunShellTests() throws IOException {
     System.setProperty("shell.test.include", "replication_admin_test.rb");
     // Start all ruby tests

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 5b99c37..84bdf56 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -62,108 +62,142 @@ module Hbase
       assert_raise(ArgumentError) do
         replication_admin.add_peer(@peer_id, ['test'])
       end
+      assert_raise(ArgumentError) do
+        replication_admin.add_peer(@peer_id, 'test')
+      end
     end
 
-    define_test "add_peer: single zk cluster key" do
+    define_test "add_peer: single zk cluster key - peer config" do
       cluster_key = "server1.cie.com:2181:/hbase"
 
-      replication_admin.add_peer(@peer_id, cluster_key)
+      args = { CLUSTER_KEY => cluster_key }
+      replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: multiple zk cluster key" do
+    define_test "add_peer: multiple zk cluster key - peer config" do
       cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
 
-      replication_admin.add_peer(@peer_id, cluster_key)
+      args = { CLUSTER_KEY => cluster_key }
+      replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(replication_admin.list_peers.fetch(@peer_id), cluster_key)
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: multiple zk cluster key and table_cfs" do
+    define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do
       cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
-      table_cfs_str = "table1;table2:cf1;table3:cf2,cf3"
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
 
-      replication_admin.add_peer(@peer_id, cluster_key, table_cfs_str)
+      args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs }
+      replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
-      assert_equal(table_cfs_str, replication_admin.show_peer_tableCFs(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
+
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: single zk cluster key - peer config" do
-      cluster_key = "server1.cie.com:2181:/hbase"
+    def assert_tablecfs_equal(table_cfs, table_cfs_map)
+      assert_equal(table_cfs.length, table_cfs_map.length)
+      table_cfs_map.each{|key, value|
+        assert(table_cfs.has_key?(key.getNameAsString))
+        if table_cfs.fetch(key.getNameAsString).length == 0
+          assert_equal(nil, value)
+        else
+          assert_equal(table_cfs.fetch(key.getNameAsString).length, value.length)
+          value.each{|v|
+            assert(table_cfs.fetch(key.getNameAsString).include?(v))
+          }
+        end
+      }
+    end
 
-      args = { CLUSTER_KEY => cluster_key }
+    define_test "add_peer: should fail when args is a hash and peer_tableCFs provided" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      table_cfs_str = "table1;table2:cf1;table3:cf1,cf2"
+
+      assert_raise(ArgumentError) do
+        args = { CLUSTER_KEY => cluster_key }
+        replication_admin.add_peer(@peer_id, args, table_cfs_str)
+      end
+    end
+
+    define_test "set_peer_tableCFs: works with table-cfs map" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+      args = { CLUSTER_KEY => cluster_key}
       replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
+
+      table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
+      replication_admin.set_peer_tableCFs(@peer_id, table_cfs)
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: multiple zk cluster key - peer config" do
-      cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
-
+    define_test "append_peer_tableCFs: works with table-cfs map" do
+      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }
       replication_admin.add_peer(@peer_id, args)
 
       assert_equal(1, replication_admin.list_peers.length)
       assert(replication_admin.list_peers.key?(@peer_id))
-      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
+
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
+      replication_admin.append_peer_tableCFs(@peer_id, table_cfs)
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
+
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
+      replication_admin.append_peer_tableCFs(@peer_id, { "ns3:table3" => ["cf1", "cf2"] })
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do
+    define_test "remove_peer_tableCFs: works with table-cfs map" do
       cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
-      table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
-      #table_cfs_str = "default.table1;default.table3:cf1,cf2;default.table2:cf1"
-
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
       args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs }
       replication_admin.add_peer(@peer_id, args)
 
-      assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(1, replication_admin.list_peers.length)
+      assert(replication_admin.list_peers.key?(@peer_id))
+      assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id).get_cluster_key)
 
-      # Note: below assertion is dependent on the sort order of an unordered
-      # map and hence flaky depending on JVM
-      # Commenting out until HBASE-16274 is worked.
-      # assert_equal(table_cfs_str, command(:show_peer_tableCFs, @peer_id))
+      table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
+      replication_admin.remove_peer_tableCFs(@peer_id, { "ns3:table3" => ["cf1", "cf2"] })
+      table_cfs_map = replication_admin.get_peer_config(@peer_id).getTableCFsMap()
+      assert_tablecfs_equal(table_cfs, table_cfs_map)
 
       # cleanup for future tests
       replication_admin.remove_peer(@peer_id)
     end
 
-    define_test "add_peer: should fail when args is a hash and peer_tableCFs provided" do
-      cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
-      table_cfs_str = "table1;table2:cf1;table3:cf1,cf2"
-
-      assert_raise(ArgumentError) do
-        args = { CLUSTER_KEY => cluster_key }
-        replication_admin.add_peer(@peer_id, args, table_cfs_str)
-      end
-    end
-
     define_test "get_peer_config: works with simple clusterKey peer" do
       cluster_key = "localhost:2181:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }
@@ -180,8 +214,8 @@ module Hbase
       config_params = { "config1" => "value1", "config2" => "value2" }
       args = { CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => repl_impl,
                CONFIG => config_params }
-      command(:add_peer, @peer_id, args)
-      peer_config = command(:get_peer_config, @peer_id)
+      replication_admin.add_peer(@peer_id, args)
+      peer_config = replication_admin.get_peer_config(@peer_id)
       assert_equal(cluster_key, peer_config.get_cluster_key)
       assert_equal(repl_impl, peer_config.get_replication_endpoint_impl)
       assert_equal(2, peer_config.get_configuration.size)
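
For illustration, the map-based flow that these shell tests exercise looks
roughly as follows from Java client code. This is a minimal sketch, not part
of the patch; it assumes a running cluster reachable via the local
configuration and uses a hypothetical peer id "1".

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class TableCfsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        ReplicationAdmin admin = new ReplicationAdmin(conf);
        try {
          ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
          peerConfig.setClusterKey("zk4,zk5,zk6:11000:/hbase-test");
          Map<TableName, List<String>> tableCfs =
              new HashMap<TableName, List<String>>();
          tableCfs.put(TableName.valueOf("table1"), null);  // null = all CFs
          tableCfs.put(TableName.valueOf("ns2", "table2"), Arrays.asList("cf1"));
          admin.addPeer("1", peerConfig, tableCfs);

          // Grow or shrink the replicated set without re-adding the peer.
          admin.appendPeerTableCFs("1", Collections.singletonMap(
              TableName.valueOf("ns3", "table3"), Arrays.asList("cf1", "cf2")));
          admin.removePeerTableCFs("1", Collections.singletonMap(
              TableName.valueOf("ns3", "table3"), Arrays.asList("cf1", "cf2")));
        } finally {
          admin.close();
        }
      }
    }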


[3/3] hbase git commit: HBASE-16653 Backport HBASE-11393 to branches which support namespace

Posted by ch...@apache.org.
HBASE-16653 Backport HBASE-11393 to branches which support namespace

Signed-off-by: chenheng <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66941910
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66941910
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66941910

Branch: refs/heads/branch-1
Commit: 66941910bd07462fe496c5bbb591f4071f77b8fb
Parents: 6df7554
Author: Guanghao Zhang <zg...@gmail.com>
Authored: Mon Sep 26 19:33:43 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Tue Oct 18 09:12:47 2016 +0800

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |   84 +-
 .../replication/ReplicationPeerConfig.java      |   16 +-
 .../replication/ReplicationPeerZKImpl.java      |   80 +-
 .../hbase/replication/ReplicationPeers.java     |   15 +-
 .../replication/ReplicationPeersZKImpl.java     |   60 +-
 .../replication/ReplicationSerDeHelper.java     |  189 +++
 .../replication/ReplicationStateZKBase.java     |   17 +
 .../protobuf/generated/ZooKeeperProtos.java     | 1155 +++++++++++++++++-
 .../src/main/protobuf/ZooKeeper.proto           |    8 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |    8 +
 .../replication/master/TableCFsUpdater.java     |  120 ++
 .../hbase/client/TestReplicaWithCluster.java    |    5 +-
 .../replication/TestReplicationAdmin.java       |  193 +--
 .../cleaner/TestReplicationHFileCleaner.java    |    2 +-
 .../replication/TestMasterReplication.java      |    9 +-
 .../replication/TestMultiSlaveReplication.java  |    8 +-
 .../replication/TestPerTableCFReplication.java  |  153 ++-
 .../hbase/replication/TestReplicationBase.java  |    4 +-
 .../replication/TestReplicationSmallTests.java  |    4 +-
 .../replication/TestReplicationStateBasic.java  |   20 +-
 .../replication/TestReplicationSyncUpTool.java  |    4 +-
 .../TestReplicationTrackerZKImpl.java           |   10 +-
 .../replication/TestReplicationWithTags.java    |    4 +-
 .../replication/master/TestTableCFsUpdater.java |  164 +++
 .../TestReplicationSourceManager.java           |    2 +-
 ...sibilityLabelReplicationWithExpAsString.java |    5 +-
 .../TestVisibilityLabelsReplication.java        |    5 +-
 .../apache/hadoop/hbase/util/TestHBaseFsck.java |    5 +-
 .../src/main/ruby/hbase/replication_admin.rb    |   49 +-
 .../src/main/ruby/shell/commands/add_peer.rb    |    4 +-
 .../ruby/shell/commands/append_peer_tableCFs.rb |    2 +-
 .../src/main/ruby/shell/commands/list_peers.rb  |    6 +-
 .../ruby/shell/commands/remove_peer_tableCFs.rb |    4 +-
 .../ruby/shell/commands/set_peer_tableCFs.rb    |    4 +-
 .../hbase/client/TestReplicationShell.java      |    2 +-
 .../test/ruby/hbase/replication_admin_test.rb   |  118 +-
 36 files changed, 2167 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 1304396..9fca28b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationSerDeHelper;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
@@ -184,8 +185,8 @@ public class ReplicationAdmin implements Closeable {
   @Deprecated
   public void addPeer(String id, String clusterKey, String tableCFs)
     throws ReplicationException {
-    this.replicationPeers.addPeer(id,
-      new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
+    this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey),
+      parseTableCFsFromConfig(tableCFs));
   }
 
   /**
@@ -199,7 +200,19 @@ public class ReplicationAdmin implements Closeable {
    */
   public void addPeer(String id, ReplicationPeerConfig peerConfig,
       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
-    this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
+    if (tableCfs != null) {
+      peerConfig.setTableCFsMap(tableCfs);
+    }
+    this.replicationPeers.addPeer(id, peerConfig);
+  }
+
+  /**
+   * Add a new remote slave cluster for replication.
+   * @param id a short name that identifies the cluster
+   * @param peerConfig configuration for the replication slave cluster
+   */
+  public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
+    this.replicationPeers.addPeer(id, peerConfig);
   }
 
   public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
@@ -208,52 +221,7 @@ public class ReplicationAdmin implements Closeable {
   }
 
   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
-    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
-      return null;
-    }
-
-    Map<TableName, List<String>> tableCFsMap = null;
-    // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
-    // parse out (table, cf-list) pairs from tableCFsConfig
-    // format: "table1:cf1,cf2;table2:cfA,cfB"
-    String[] tables = tableCFsConfig.split(";");
-    for (String tab : tables) {
-      // 1 ignore empty table config
-      tab = tab.trim();
-      if (tab.length() == 0) {
-        continue;
-      }
-      // 2 split to "table" and "cf1,cf2"
-      //   for each table: "table:cf1,cf2" or "table"
-      String[] pair = tab.split(":");
-      String tabName = pair[0].trim();
-      if (pair.length > 2 || tabName.length() == 0) {
-        LOG.error("ignore invalid tableCFs setting: " + tab);
-        continue;
-      }
-
-      // 3 parse "cf1,cf2" part to List<cf>
-      List<String> cfs = null;
-      if (pair.length == 2) {
-        String[] cfsList = pair[1].split(",");
-        for (String cf : cfsList) {
-          String cfName = cf.trim();
-          if (cfName.length() > 0) {
-            if (cfs == null) {
-              cfs = new ArrayList<String>();
-            }
-            cfs.add(cfName);
-          }
-        }
-      }
-
-      // 4 put <table, List<cf>> to map
-      if (tableCFsMap == null) {
-        tableCFsMap = new HashMap<TableName, List<String>>();
-      }
-      tableCFsMap.put(TableName.valueOf(tabName), cfs);
-    }
-    return tableCFsMap;
+    return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
   }
 
   @VisibleForTesting
@@ -338,7 +306,7 @@ public class ReplicationAdmin implements Closeable {
    * @param id a short name that identifies the cluster
    */
   public String getPeerTableCFs(String id) throws ReplicationException {
-    return this.replicationPeers.getPeerTableCFsConfig(id);
+    return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id));
   }
 
   /**
@@ -348,7 +316,7 @@ public class ReplicationAdmin implements Closeable {
    */
   @Deprecated
   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
-    this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
+    this.setPeerTableCFs(id, parseTableCFsFromConfig(tableCFs));
   }
 
   /**
@@ -357,7 +325,7 @@ public class ReplicationAdmin implements Closeable {
    * @param tableCfs table-cfs config str
    */
   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
-    appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
+    appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
   }
 
   /**
@@ -370,7 +338,7 @@ public class ReplicationAdmin implements Closeable {
     if (tableCfs == null) {
       throw new ReplicationException("tableCfs is null");
     }
-    Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
+    Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
     if (preTableCfs == null) {
       setPeerTableCFs(id, tableCfs);
       return;
@@ -406,7 +374,7 @@ public class ReplicationAdmin implements Closeable {
    * @throws ReplicationException
    */
   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
-    removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
+    removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
   }
 
   /**
@@ -421,7 +389,7 @@ public class ReplicationAdmin implements Closeable {
       throw new ReplicationException("tableCfs is null");
     }
 
-    Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
+    Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
     if (preTableCfs == null) {
       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
     }
@@ -464,7 +432,7 @@ public class ReplicationAdmin implements Closeable {
    */
   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
       throws ReplicationException {
-    this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
+    this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
   }
 
   /**
@@ -658,8 +626,8 @@ public class ReplicationAdmin implements Closeable {
       try {
         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
         Configuration peerConf = pair.getSecond();
-        ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
-            parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
+        ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
+          peerId, pair.getFirst(), this.connection);
         listOfPeers.add(peer);
       } catch (ReplicationException e) {
         LOG.warn("Failed to get valid replication peers. "

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 043b38f..e2c7bc7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -37,7 +40,7 @@ public class ReplicationPeerConfig {
   private String replicationEndpointImpl;
   private final Map<byte[], byte[]> peerData;
   private final Map<String, String> configuration;
-
+  private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
 
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@@ -78,10 +81,21 @@ public class ReplicationPeerConfig {
     return configuration;
   }
 
+  public Map<TableName, List<String>> getTableCFsMap() {
+    return (Map<TableName, List<String>>) tableCFsMap;
+  }
+
+  public void setTableCFsMap(Map<TableName, ? extends Collection<String>> tableCFsMap) {
+    this.tableCFsMap = tableCFsMap;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
     builder.append("replicationEndpointImpl=").append(replicationEndpointImpl);
+    if (tableCFsMap != null) {
+      builder.append(tableCFsMap.toString());
+    }
     return builder.toString();
   }
 }
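
A short sketch (not part of the patch) of the new accessors: the table-CFs map
now travels inside ReplicationPeerConfig itself, so it is serialized with the
rest of the peer config and shows up in toString():

    ReplicationPeerConfig pc = new ReplicationPeerConfig();
    pc.setClusterKey("localhost:2181:/hbase-test");
    Map<TableName, List<String>> m = new HashMap<TableName, List<String>>();
    m.put(TableName.valueOf("ns2", "table2"), Arrays.asList("cf1"));
    pc.setTableCFsMap(m);
    Map<TableName, List<String>> back = pc.getTableCFsMap();
    // pc.toString() prints roughly:
    // clusterKey=localhost:2181:/hbase-test,replicationEndpointImpl=null{ns2:table2=[cf1]}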

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 6b10015..382545d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -42,7 +42,8 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 
 @InterfaceAudience.Private
-public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
+public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements ReplicationPeer,
+    Abortable, Closeable {
   private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
 
   private ReplicationPeerConfig peerConfig;
@@ -52,8 +53,8 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
   private final Configuration conf;
 
   private PeerStateTracker peerStateTracker;
-  private TableCFsTracker tableCFsTracker;
   private PeerConfigTracker peerConfigTracker;
+
   /**
    * Constructor that takes all the objects required to communicate with the specified peer, except
    * for the region server addresses.
@@ -61,39 +62,23 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
    * @param id string representation of this peer's identifier
    * @param peerConfig configuration for the replication peer
    */
-  public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
-      throws ReplicationException {
-    this.conf = conf;
-    this.peerConfig = peerConfig;
-    this.id = id;
-  }
-  
-  /**
-   * Constructor that takes all the objects required to communicate with the specified peer, except
-   * for the region server addresses.
-   * @param conf configuration object to this peer
-   * @param id string representation of this peer's identifier
-   * @param peerConfig configuration for the replication peer
-   * @param tableCFs table-cf configuration for this peer
-   */
-  public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
-      Map<TableName, List<String>> tableCFs) throws ReplicationException {
+  public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf, String id,
+      ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException {
+    super(zkWatcher, conf, abortable);
     this.conf = conf;
     this.peerConfig = peerConfig;
     this.id = id;
-    this.tableCFs = tableCFs;
   }
 
   /**
    * start a state tracker to check whether this peer is enabled or not
    *
-   * @param zookeeper zk watcher for the local cluster
    * @param peerStateNode path to zk node which stores peer state
    * @throws KeeperException
    */
-  public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+  public void startStateTracker(String peerStateNode)
       throws KeeperException {
-    ensurePeerEnabled(zookeeper, peerStateNode);
+    ensurePeerEnabled(peerStateNode);
     this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
     this.peerStateTracker.start();
     try {
@@ -112,25 +97,6 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
 
   /**
    * start a table-cfs tracker to listen the (table, cf-list) map change
-   *
-   * @param zookeeper zk watcher for the local cluster
-   * @param tableCFsNode path to zk node which stores table-cfs
-   * @throws KeeperException
-   */
-  public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
-    throws KeeperException {
-    this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
-        this);
-    this.tableCFsTracker.start();
-    this.readTableCFsZnode();
-  }
-
-  private void readTableCFsZnode() {
-    String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
-    this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs);
-  }
-  /**
-   * start a table-cfs tracker to listen the (table, cf-list) map change
    * @param zookeeper
    * @param peerConfigNode path to zk node which stores table-cfs
    * @throws KeeperException
@@ -154,6 +120,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
     }
     return this.peerConfig;
   }
+
   @Override
   public PeerState getPeerState() {
     return peerState;
@@ -192,6 +159,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
    */
   @Override
   public Map<TableName, List<String>> getTableCFs() {
+    this.tableCFs = peerConfig.getTableCFsMap();
     return this.tableCFs;
   }
 
@@ -260,7 +228,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
    * @throws NodeExistsException
    * @throws KeeperException
    */
-  private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+  private boolean ensurePeerEnabled(final String path)
       throws NodeExistsException, KeeperException {
     if (ZKUtil.checkExists(zookeeper, path) == -1) {
       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
@@ -297,32 +265,6 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
   }
 
   /**
-   * Tracker for (table, cf-list) map of this peer
-   */
-  public class TableCFsTracker extends ZooKeeperNodeTracker {
-
-    public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, tableCFsZNode, abortable);
-    }
-    
-    @Override
-    public synchronized void nodeCreated(String path) {
-      if (path.equals(node)) {
-        super.nodeCreated(path);
-        readTableCFsZnode();
-      }
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-      }
-    }
-  }
-
-  /**
    * Tracker for PeerConfigNode of this peer
    */
   public class PeerConfigTracker extends ZooKeeperNodeTracker {
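
In condensed form, the consumer-side effect of this change (a sketch, with a
ReplicationPeer handle `peer` assumed to come from ReplicationPeers): there is
no longer a separate tableCFs tracker to start; getTableCFs() is served from
the peer config, which the PeerConfigTracker refreshes when the peer znode
changes.

    Map<TableName, List<String>> cfs = peer.getTableCFs(); // == peerConfig.getTableCFsMap()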

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index b8d04b4..37d157a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -49,10 +50,8 @@ public interface ReplicationPeers {
    * Add a new remote slave cluster for replication.
    * @param peerId a short that identifies the cluster
    * @param peerConfig configuration for the replication slave cluster
-   * @param tableCFs the table and column-family list which will be replicated for this peer or null
-   *          for all table and column families
    */
-  void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
+  void addPeer(String peerId, ReplicationPeerConfig peerConfig)
       throws ReplicationException;
 
   /**
@@ -78,17 +77,19 @@ public interface ReplicationPeers {
   void disablePeer(String peerId) throws ReplicationException;
 
   /**
-   * Get the table and column-family list string of the peer from ZK.
+   * Get the table and column-family list of the peer from ZK.
    * @param peerId a short that identifies the cluster
    */
-  public String getPeerTableCFsConfig(String peerId) throws ReplicationException;
+  public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
+      throws ReplicationException;
 
   /**
-   * Set the table and column-family list string of the peer to ZK.
+   * Set the table and column-family list of the peer to ZK.
    * @param peerId a short that identifies the cluster
    * @param tableCFs the table and column-family list which will be replicated for this peer
    */
-  public void setPeerTableCFsConfig(String peerId, String tableCFs) throws ReplicationException;
+  public void setPeerTableCFsConfig(String peerId,
+      Map<TableName, ? extends Collection<String>> tableCFs) throws ReplicationException;
 
   /**
    * Get the table and column-family-list map of the peer.

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index d717b0b..bb9842b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,15 +79,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
 
   // Map of peer clusters keyed by their id
   private Map<String, ReplicationPeerZKImpl> peerClusters;
-  private final String tableCFsNodeName;
   private final ReplicationQueuesClient queuesClient;
+  private Abortable abortable;
 
   private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
 
   public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
       final ReplicationQueuesClient queuesClient, Abortable abortable) {
     super(zk, conf, abortable);
-    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
+    this.abortable = abortable;
     this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
     this.queuesClient = queuesClient;
   }
@@ -104,7 +105,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
+  public void addPeer(String id, ReplicationPeerConfig peerConfig)
       throws ReplicationException {
     try {
       if (peerExists(id)) {
@@ -129,18 +130,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
 
       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
-      ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
+      ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
         ReplicationSerDeHelper.toByteArray(peerConfig));
       // There is a race (if hbase.zookeeper.useMulti is false)
       // b/w PeerWatcher and ReplicationZookeeper#add method to create the
       // peer-state znode. This happens while adding a peer
       // The peer state data is set as "ENABLED" by default.
       ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
-      String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
-      ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
       listOfOps.add(op1);
       listOfOps.add(op2);
-      listOfOps.add(op3);
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
       // A peer is enabled by default
     } catch (KeeperException e) {
@@ -175,13 +173,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public String getPeerTableCFsConfig(String id) throws ReplicationException {
+  public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
     try {
       if (!peerExists(id)) {
         throw new IllegalArgumentException("peer " + id + " doesn't exist");
       }
       try {
-        return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
+        ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
+        if (rpc == null) {
+          throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
+        }
+        return rpc.getTableCFsMap();
       } catch (Exception e) {
         throw new ReplicationException(e);
       }
@@ -191,20 +193,22 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
+  public void setPeerTableCFsConfig(String id,
+      Map<TableName, ? extends Collection<String>> tableCFs) throws ReplicationException {
     try {
       if (!peerExists(id)) {
         throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
             + " does not exist.");
       }
-      String tableCFsZKNode = getTableCFsNode(id);
-      byte[] tableCFs = Bytes.toBytes(tableCFsStr);
-      if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
-        ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
-      } else {
-        ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
+      ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
+      if (rpc == null) {
+        throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
       }
-      LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
+      rpc.setTableCFsMap(tableCFs);
+      ZKUtil.setData(this.zookeeper, getPeerNode(id),
+          ReplicationSerDeHelper.toByteArray(rpc));
+      LOG.info("Peer tableCFs with id= " + id + " is now "
+          + ReplicationSerDeHelper.convertToString(tableCFs));
     } catch (KeeperException e) {
       throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
     }
@@ -289,7 +293,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   @Override
   public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
       throws ReplicationException {
-    String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+    String znode = getPeerNode(peerId);
     byte[] data = null;
     try {
       data = ZKUtil.getData(this.zookeeper, znode);
@@ -458,14 +462,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     return true;
   }
 
-  private String getTableCFsNode(String id) {
-    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
-  }
-
-  private String getPeerStateNode(String id) {
-    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
-  }
-
   /**
    * Update the state znode of a peer cluster.
    * @param id
@@ -506,22 +502,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     }
     Configuration peerConf = pair.getSecond();
 
-    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
+    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper, peerConf, peerId,
+        pair.getFirst(), abortable);
     try {
-      peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+      peer.startStateTracker(getPeerStateNode(peerId));
     } catch (KeeperException e) {
       throw new ReplicationException("Error starting the peer state tracker for peerId=" +
           peerId, e);
     }
 
     try {
-      peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
-          peerId, e);
-    }
-
-    try {
       peer.startPeerConfigTracker(this.zookeeper, this.getPeerNode(peerId));
     }
     catch(KeeperException e) {
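
Condensed, the new persistence path for table CFs is a read-modify-write of
the single peer znode (a sketch of the code above, error handling elided):

    ReplicationPeerConfig rpc = getReplicationPeerConfig(id); // read PB from peer znode
    rpc.setTableCFsMap(tableCFs);                             // modify in memory
    ZKUtil.setData(this.zookeeper, getPeerNode(id),
        ReplicationSerDeHelper.toByteArray(rpc));             // write PB back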

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
index 05f909d..cdb95f7f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hbase.replication;
 
 import com.google.protobuf.ByteString;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -28,8 +30,13 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 @InterfaceAudience.Private
@@ -39,6 +46,175 @@ public final class ReplicationSerDeHelper {
 
   private ReplicationSerDeHelper() {}
 
+  /** convert map to TableCFs Object */
+  public static ZooKeeperProtos.TableCF[] convert(
+      Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return null;
+    }
+    List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>();
+    ZooKeeperProtos.TableCF.Builder tableCFBuilder =  ZooKeeperProtos.TableCF.newBuilder();
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+      tableCFBuilder.clear();
+      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
+      Collection<String> v = entry.getValue();
+      if (v != null && !v.isEmpty()) {
+        for (String value : entry.getValue()) {
+          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
+        }
+      }
+      tableCFList.add(tableCFBuilder.build());
+    }
+    return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]);
+  }
+
+  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return null;
+    }
+    return convert(convert(tableCfs));
+  }
+
+  /**
+   *  Convert string to TableCFs Object.
+   *  This is only for reading TableCFs information from the TableCF node.
+   *  Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3
+   * */
+  public static ZooKeeperProtos.TableCF[] convert(String tableCFsConfig) {
+    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
+      return null;
+    }
+    List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>();
+    ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder();
+
+    String[] tables = tableCFsConfig.split(";");
+    for (String tab : tables) {
+      // 1 ignore empty table config
+      tab = tab.trim();
+      if (tab.length() == 0) {
+        continue;
+      }
+      // 2 split to "table" and "cf1,cf2"
+      //   for each table: "table:cf1,cf2" or "table"
+      String[] pair = tab.split(":");
+      String tabName = pair[0].trim();
+      if (pair.length > 2 || tabName.length() == 0) {
+        LOG.info("incorrect format:" + tableCFsConfig);
+        continue;
+      }
+
+      tableCFBuilder.clear();
+      // split namespace from tableName
+      String ns = "default";
+      String tName = tabName;
+      String[] dbs = tabName.split("\\.");
+      if (dbs != null && dbs.length == 2) {
+        ns = dbs[0];
+        tName = dbs[1];
+      }
+      tableCFBuilder.setTableName(
+        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
+
+      // 3 parse "cf1,cf2" part to List<cf>
+      if (pair.length == 2) {
+        String[] cfsList = pair[1].split(",");
+        for (String cf : cfsList) {
+          String cfName = cf.trim();
+          if (cfName.length() > 0) {
+            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
+          }
+        }
+      }
+      tableCFList.add(tableCFBuilder.build());
+    }
+    return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]);
+  }
+
+  /**
+   *  Convert TableCFs Object to String.
+   *  Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
+   * */
+  public static String convert(ZooKeeperProtos.TableCF[] tableCFs) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0, n = tableCFs.length; i < n; i++) {
+      ZooKeeperProtos.TableCF tableCF = tableCFs[i];
+      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
+      if (!Strings.isEmpty(namespace)) {
+        sb.append(namespace).append(".").
+            append(tableCF.getTableName().getQualifier().toStringUtf8())
+            .append(":");
+      } else {
+        sb.append(tableCF.getTableName().toString()).append(":");
+      }
+      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
+        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
+      }
+      sb.deleteCharAt(sb.length() - 1).append(";");
+    }
+    if (sb.length() > 0) {
+      sb.deleteCharAt(sb.length() - 1);
+    }
+    return sb.toString();
+  }
+
+  /**
+   *  Get the TableCF for the given table from TableCFs; return null if it does not exist.
+   * */
+  public static ZooKeeperProtos.TableCF getTableCF(ZooKeeperProtos.TableCF[] tableCFs,
+                                           String table) {
+    for (int i = 0, n = tableCFs.length; i < n; i++) {
+      ZooKeeperProtos.TableCF tableCF = tableCFs[i];
+      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
+        return tableCF;
+      }
+    }
+    return null;
+  }
+
+  /**
+   *  Parse bytes into TableCFs.
+   *  It is used for backward compatibility.
+   *  Old-format bytes have no PB_MAGIC header.
+   * */
+  public static ZooKeeperProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
+    if (bytes == null) {
+      return null;
+    }
+    return ReplicationSerDeHelper.convert(Bytes.toString(bytes));
+  }
+
+  /**
+   *  Convert tableCFs string into Map.
+   * */
+  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+    ZooKeeperProtos.TableCF[] tableCFs = convert(tableCFsConfig);
+    return convert2Map(tableCFs);
+  }
+
+  /**
+   *  Convert tableCFs Object to Map.
+   * */
+  public static Map<TableName, List<String>> convert2Map(ZooKeeperProtos.TableCF[] tableCFs) {
+    if (tableCFs == null || tableCFs.length == 0) {
+      return null;
+    }
+    Map<TableName, List<String>> tableCFsMap = new HashMap<TableName, List<String>>();
+    for (int i = 0, n = tableCFs.length; i < n; i++) {
+      ZooKeeperProtos.TableCF tableCF = tableCFs[i];
+      List<String> families = new ArrayList<>();
+      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
+        families.add(tableCF.getFamilies(j).toStringUtf8());
+      }
+      if (families.size() > 0) {
+        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
+      } else {
+        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
+      }
+    }
+
+    return tableCFsMap;
+  }
+
   /**
    * @param bytes Content of a peer znode.
    * @return ClusterKey parsed from the passed bytes.
@@ -82,6 +258,12 @@ public final class ReplicationSerDeHelper {
     for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
       peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
     }
+
+    Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
+      peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()]));
+    if (tableCFsMap != null) {
+      peerConfig.setTableCFsMap(tableCFsMap);
+    }
     return peerConfig;
   }
 
@@ -119,6 +301,13 @@ public final class ReplicationSerDeHelper {
           .build());
     }
 
+    ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
+    if (tableCFs != null) {
+      for (int i = 0; i < tableCFs.length; i++) {
+        builder.addTableCfs(tableCFs[i]);
+      }
+    }
+
     return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index ed9359d..d0c3513 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /**
  * This is a base class for maintaining replication state in zookeeper.
@@ -52,6 +54,9 @@ public abstract class ReplicationStateZKBase {
   protected final String hfileRefsZNode;
   /** The cluster key of the local cluster */
   protected final String ourClusterKey;
+  /** The name of the znode that contains tableCFs */
+  protected final String tableCFsNodeName;
+
   protected final ZooKeeperWatcher zookeeper;
   protected final Configuration conf;
   protected final Abortable abortable;
@@ -77,6 +82,7 @@ public abstract class ReplicationStateZKBase {
     String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
       ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
     this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
@@ -119,6 +125,17 @@ public abstract class ReplicationStateZKBase {
     return path.split("/").length == peersZNode.split("/").length + 1;
   }
 
+  @VisibleForTesting
+  protected String getTableCFsNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
+  }
+
+  @VisibleForTesting
+  protected String getPeerStateNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+  }
+
+  @VisibleForTesting
   protected String getPeerNode(String id) {
     return ZKUtil.joinZNode(this.peersZNode, id);
   }
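
For orientation, with default settings these helpers resolve to znode paths
like the following (assuming the default /hbase base znode). The tableCFs
child is the pre-HBASE-11393 location; it is kept so the new TableCFsUpdater
(see the file list above) can migrate old-style data into the peer config:

    /hbase/replication/peers/1             <- getPeerNode("1"), full peer config PB
    /hbase/replication/peers/1/peer-state  <- getPeerStateNode("1")
    /hbase/replication/peers/1/tableCFs    <- getTableCFsNode("1"), legacy location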


[2/3] hbase git commit: HBASE-16653 Backport HBASE-11393 to branches which support namespace

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 09479c4..955995f 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -5032,6 +5032,719 @@ public final class ZooKeeperProtos {
     // @@protoc_insertion_point(class_scope:hbase.pb.Table)
   }
 
+  public interface TableCFOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional .hbase.pb.TableName table_name = 1;
+    /**
+     * <code>optional .hbase.pb.TableName table_name = 1;</code>
+     */
+    boolean hasTableName();
+    /**
+     * <code>optional .hbase.pb.TableName table_name = 1;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName getTableName();
+    /**
+     * <code>optional .hbase.pb.TableName table_name = 1;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableNameOrBuilder getTableNameOrBuilder();
+
+    // repeated bytes families = 2;
+    /**
+     * <code>repeated bytes families = 2;</code>
+     */
+    java.util.List<com.google.protobuf.ByteString> getFamiliesList();
+    /**
+     * <code>repeated bytes families = 2;</code>
+     */
+    int getFamiliesCount();
+    /**
+     * <code>repeated bytes families = 2;</code>
+     */
+    com.google.protobuf.ByteString getFamilies(int index);
+  }
+  /**
+   * Protobuf type {@code hbase.pb.TableCF}
+   */
+  public static final class TableCF extends
+      com.google.protobuf.GeneratedMessage
+      implements TableCFOrBuilder {
+    // Use TableCF.newBuilder() to construct.
+    private TableCF(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private TableCF(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final TableCF defaultInstance;
+    public static TableCF getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public TableCF getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private TableCF(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 10: {
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.Builder subBuilder = null;
+              if (((bitField0_ & 0x00000001) == 0x00000001)) {
+                subBuilder = tableName_.toBuilder();
+              }
+              tableName_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.PARSER, extensionRegistry);
+              if (subBuilder != null) {
+                subBuilder.mergeFrom(tableName_);
+                tableName_ = subBuilder.buildPartial();
+              }
+              bitField0_ |= 0x00000001;
+              break;
+            }
+            case 18: {
+              if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+                families_ = new java.util.ArrayList<com.google.protobuf.ByteString>();
+                mutable_bitField0_ |= 0x00000002;
+              }
+              families_.add(input.readBytes());
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+          families_ = java.util.Collections.unmodifiableList(families_);
+        }
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_hbase_pb_TableCF_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_hbase_pb_TableCF_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.class, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<TableCF> PARSER =
+        new com.google.protobuf.AbstractParser<TableCF>() {
+      public TableCF parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new TableCF(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<TableCF> getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // optional .hbase.pb.TableName table_name = 1;
+    public static final int TABLE_NAME_FIELD_NUMBER = 1;
+    private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName tableName_;
+    /**
+     * <code>optional .hbase.pb.TableName table_name = 1;</code>
+     */
+    public boolean hasTableName() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional .hbase.pb.TableName table_name = 1;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName getTableName() {
+      return tableName_;
+    }
+    /**
+     * <code>optional .hbase.pb.TableName table_name = 1;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableNameOrBuilder getTableNameOrBuilder() {
+      return tableName_;
+    }
+
+    // repeated bytes families = 2;
+    public static final int FAMILIES_FIELD_NUMBER = 2;
+    private java.util.List<com.google.protobuf.ByteString> families_;
+    /**
+     * <code>repeated bytes families = 2;</code>
+     */
+    public java.util.List<com.google.protobuf.ByteString>
+        getFamiliesList() {
+      return families_;
+    }
+    /**
+     * <code>repeated bytes families = 2;</code>
+     */
+    public int getFamiliesCount() {
+      return families_.size();
+    }
+    /**
+     * <code>repeated bytes families = 2;</code>
+     */
+    public com.google.protobuf.ByteString getFamilies(int index) {
+      return families_.get(index);
+    }
+
+    private void initFields() {
+      tableName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.getDefaultInstance();
+      families_ = java.util.Collections.emptyList();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      if (hasTableName()) {
+        if (!getTableName().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeMessage(1, tableName_);
+      }
+      for (int i = 0; i < families_.size(); i++) {
+        output.writeBytes(2, families_.get(i));
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(1, tableName_);
+      }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < families_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(families_.get(i));
+        }
+        size += dataSize;
+        size += 1 * getFamiliesList().size();
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF)) {
+        return super.equals(obj);
+      }
+      org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF other = (org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF) obj;
+
+      boolean result = true;
+      result = result && (hasTableName() == other.hasTableName());
+      if (hasTableName()) {
+        result = result && getTableName()
+            .equals(other.getTableName());
+      }
+      result = result && getFamiliesList()
+          .equals(other.getFamiliesList());
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasTableName()) {
+        hash = (37 * hash) + TABLE_NAME_FIELD_NUMBER;
+        hash = (53 * hash) + getTableName().hashCode();
+      }
+      if (getFamiliesCount() > 0) {
+        hash = (37 * hash) + FAMILIES_FIELD_NUMBER;
+        hash = (53 * hash) + getFamiliesList().hashCode();
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code hbase.pb.TableCF}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_hbase_pb_TableCF_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_hbase_pb_TableCF_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.class, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder.class);
+      }
+
+      // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+          getTableNameFieldBuilder();
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        if (tableNameBuilder_ == null) {
+          tableName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.getDefaultInstance();
+        } else {
+          tableNameBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000001);
+        families_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_hbase_pb_TableCF_descriptor;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF getDefaultInstanceForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.getDefaultInstance();
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF build() {
+        org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF buildPartial() {
+        org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        if (tableNameBuilder_ == null) {
+          result.tableName_ = tableName_;
+        } else {
+          result.tableName_ = tableNameBuilder_.build();
+        }
+        if (((bitField0_ & 0x00000002) == 0x00000002)) {
+          families_ = java.util.Collections.unmodifiableList(families_);
+          bitField0_ = (bitField0_ & ~0x00000002);
+        }
+        result.families_ = families_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF) {
+          return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF other) {
+        if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.getDefaultInstance()) return this;
+        if (other.hasTableName()) {
+          mergeTableName(other.getTableName());
+        }
+        if (!other.families_.isEmpty()) {
+          if (families_.isEmpty()) {
+            families_ = other.families_;
+            bitField0_ = (bitField0_ & ~0x00000002);
+          } else {
+            ensureFamiliesIsMutable();
+            families_.addAll(other.families_);
+          }
+          onChanged();
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        if (hasTableName()) {
+          if (!getTableName().isInitialized()) {
+            
+            return false;
+          }
+        }
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // optional .hbase.pb.TableName table_name = 1;
+      private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName tableName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableNameOrBuilder> tableNameBuilder_;
+      /**
+       * <code>optional .hbase.pb.TableName table_name = 1;</code>
+       */
+      public boolean hasTableName() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional .hbase.pb.TableName table_name = 1;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName getTableName() {
+        if (tableNameBuilder_ == null) {
+          return tableName_;
+        } else {
+          return tableNameBuilder_.getMessage();
+        }
+      }
+      /**
+       * <code>optional .hbase.pb.TableName table_name = 1;</code>
+       */
+      public Builder setTableName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName value) {
+        if (tableNameBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          tableName_ = value;
+          onChanged();
+        } else {
+          tableNameBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TableName table_name = 1;</code>
+       */
+      public Builder setTableName(
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.Builder builderForValue) {
+        if (tableNameBuilder_ == null) {
+          tableName_ = builderForValue.build();
+          onChanged();
+        } else {
+          tableNameBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TableName table_name = 1;</code>
+       */
+      public Builder mergeTableName(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName value) {
+        if (tableNameBuilder_ == null) {
+          if (((bitField0_ & 0x00000001) == 0x00000001) &&
+              tableName_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.getDefaultInstance()) {
+            tableName_ =
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.newBuilder(tableName_).mergeFrom(value).buildPartial();
+          } else {
+            tableName_ = value;
+          }
+          onChanged();
+        } else {
+          tableNameBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TableName table_name = 1;</code>
+       */
+      public Builder clearTableName() {
+        if (tableNameBuilder_ == null) {
+          tableName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.getDefaultInstance();
+          onChanged();
+        } else {
+          tableNameBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      /**
+       * <code>optional .hbase.pb.TableName table_name = 1;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.Builder getTableNameBuilder() {
+        bitField0_ |= 0x00000001;
+        onChanged();
+        return getTableNameFieldBuilder().getBuilder();
+      }
+      /**
+       * <code>optional .hbase.pb.TableName table_name = 1;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableNameOrBuilder getTableNameOrBuilder() {
+        if (tableNameBuilder_ != null) {
+          return tableNameBuilder_.getMessageOrBuilder();
+        } else {
+          return tableName_;
+        }
+      }
+      /**
+       * <code>optional .hbase.pb.TableName table_name = 1;</code>
+       */
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableNameOrBuilder> 
+          getTableNameFieldBuilder() {
+        if (tableNameBuilder_ == null) {
+          tableNameBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableNameOrBuilder>(
+                  tableName_,
+                  getParentForChildren(),
+                  isClean());
+          tableName_ = null;
+        }
+        return tableNameBuilder_;
+      }
+
+      // repeated bytes families = 2;
+      private java.util.List<com.google.protobuf.ByteString> families_ = java.util.Collections.emptyList();
+      private void ensureFamiliesIsMutable() {
+        if (!((bitField0_ & 0x00000002) == 0x00000002)) {
+          families_ = new java.util.ArrayList<com.google.protobuf.ByteString>(families_);
+          bitField0_ |= 0x00000002;
+         }
+      }
+      /**
+       * <code>repeated bytes families = 2;</code>
+       */
+      public java.util.List<com.google.protobuf.ByteString>
+          getFamiliesList() {
+        return java.util.Collections.unmodifiableList(families_);
+      }
+      /**
+       * <code>repeated bytes families = 2;</code>
+       */
+      public int getFamiliesCount() {
+        return families_.size();
+      }
+      /**
+       * <code>repeated bytes families = 2;</code>
+       */
+      public com.google.protobuf.ByteString getFamilies(int index) {
+        return families_.get(index);
+      }
+      /**
+       * <code>repeated bytes families = 2;</code>
+       */
+      public Builder setFamilies(
+          int index, com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureFamiliesIsMutable();
+        families_.set(index, value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes families = 2;</code>
+       */
+      public Builder addFamilies(com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureFamiliesIsMutable();
+        families_.add(value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes families = 2;</code>
+       */
+      public Builder addAllFamilies(
+          java.lang.Iterable<? extends com.google.protobuf.ByteString> values) {
+        ensureFamiliesIsMutable();
+        super.addAll(values, families_);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes families = 2;</code>
+       */
+      public Builder clearFamilies() {
+        families_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000002);
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:hbase.pb.TableCF)
+    }
+
+    static {
+      defaultInstance = new TableCF(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:hbase.pb.TableCF)
+  }
+
   public interface ReplicationPeerOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
@@ -5129,6 +5842,31 @@ public final class ZooKeeperProtos {
      */
     org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
         int index);
+
+    // repeated .hbase.pb.TableCF table_cfs = 5;
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    java.util.List<org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF> 
+        getTableCfsList();
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF getTableCfs(int index);
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    int getTableCfsCount();
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder> 
+        getTableCfsOrBuilderList();
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder getTableCfsOrBuilder(
+        int index);
   }
   /**
    * Protobuf type {@code hbase.pb.ReplicationPeer}
@@ -5212,6 +5950,14 @@ public final class ZooKeeperProtos {
               configuration_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.PARSER, extensionRegistry));
               break;
             }
+            case 42: {
+              if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
+                tableCfs_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF>();
+                mutable_bitField0_ |= 0x00000010;
+              }
+              tableCfs_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.PARSER, extensionRegistry));
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5226,6 +5972,9 @@ public final class ZooKeeperProtos {
         if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
           configuration_ = java.util.Collections.unmodifiableList(configuration_);
         }
+        if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
+          tableCfs_ = java.util.Collections.unmodifiableList(tableCfs_);
+        }
         this.unknownFields = unknownFields.build();
         makeExtensionsImmutable();
       }
@@ -5431,11 +6180,48 @@ public final class ZooKeeperProtos {
       return configuration_.get(index);
     }
 
+    // repeated .hbase.pb.TableCF table_cfs = 5;
+    public static final int TABLE_CFS_FIELD_NUMBER = 5;
+    private java.util.List<org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF> tableCfs_;
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    public java.util.List<org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF> getTableCfsList() {
+      return tableCfs_;
+    }
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder> 
+        getTableCfsOrBuilderList() {
+      return tableCfs_;
+    }
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    public int getTableCfsCount() {
+      return tableCfs_.size();
+    }
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF getTableCfs(int index) {
+      return tableCfs_.get(index);
+    }
+    /**
+     * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder getTableCfsOrBuilder(
+        int index) {
+      return tableCfs_.get(index);
+    }
+
     private void initFields() {
       clusterkey_ = "";
       replicationEndpointImpl_ = "";
       data_ = java.util.Collections.emptyList();
       configuration_ = java.util.Collections.emptyList();
+      tableCfs_ = java.util.Collections.emptyList();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5458,6 +6244,12 @@ public final class ZooKeeperProtos {
           return false;
         }
       }
+      for (int i = 0; i < getTableCfsCount(); i++) {
+        if (!getTableCfs(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -5477,6 +6269,9 @@ public final class ZooKeeperProtos {
       for (int i = 0; i < configuration_.size(); i++) {
         output.writeMessage(4, configuration_.get(i));
       }
+      for (int i = 0; i < tableCfs_.size(); i++) {
+        output.writeMessage(5, tableCfs_.get(i));
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5502,6 +6297,10 @@ public final class ZooKeeperProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(4, configuration_.get(i));
       }
+      for (int i = 0; i < tableCfs_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(5, tableCfs_.get(i));
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5539,6 +6338,8 @@ public final class ZooKeeperProtos {
           .equals(other.getDataList());
       result = result && getConfigurationList()
           .equals(other.getConfigurationList());
+      result = result && getTableCfsList()
+          .equals(other.getTableCfsList());
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5568,6 +6369,10 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + CONFIGURATION_FIELD_NUMBER;
         hash = (53 * hash) + getConfigurationList().hashCode();
       }
+      if (getTableCfsCount() > 0) {
+        hash = (37 * hash) + TABLE_CFS_FIELD_NUMBER;
+        hash = (53 * hash) + getTableCfsList().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5676,6 +6481,7 @@ public final class ZooKeeperProtos {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
           getDataFieldBuilder();
           getConfigurationFieldBuilder();
+          getTableCfsFieldBuilder();
         }
       }
       private static Builder create() {
@@ -5700,6 +6506,12 @@ public final class ZooKeeperProtos {
         } else {
           configurationBuilder_.clear();
         }
+        if (tableCfsBuilder_ == null) {
+          tableCfs_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000010);
+        } else {
+          tableCfsBuilder_.clear();
+        }
         return this;
       }
 
@@ -5754,6 +6566,15 @@ public final class ZooKeeperProtos {
         } else {
           result.configuration_ = configurationBuilder_.build();
         }
+        if (tableCfsBuilder_ == null) {
+          if (((bitField0_ & 0x00000010) == 0x00000010)) {
+            tableCfs_ = java.util.Collections.unmodifiableList(tableCfs_);
+            bitField0_ = (bitField0_ & ~0x00000010);
+          }
+          result.tableCfs_ = tableCfs_;
+        } else {
+          result.tableCfs_ = tableCfsBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5832,6 +6653,32 @@ public final class ZooKeeperProtos {
             }
           }
         }
+        if (tableCfsBuilder_ == null) {
+          if (!other.tableCfs_.isEmpty()) {
+            if (tableCfs_.isEmpty()) {
+              tableCfs_ = other.tableCfs_;
+              bitField0_ = (bitField0_ & ~0x00000010);
+            } else {
+              ensureTableCfsIsMutable();
+              tableCfs_.addAll(other.tableCfs_);
+            }
+            onChanged();
+          }
+        } else {
+          if (!other.tableCfs_.isEmpty()) {
+            if (tableCfsBuilder_.isEmpty()) {
+              tableCfsBuilder_.dispose();
+              tableCfsBuilder_ = null;
+              tableCfs_ = other.tableCfs_;
+              bitField0_ = (bitField0_ & ~0x00000010);
+              tableCfsBuilder_ = 
+                com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+                   getTableCfsFieldBuilder() : null;
+            } else {
+              tableCfsBuilder_.addAllMessages(other.tableCfs_);
+            }
+          }
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5853,6 +6700,12 @@ public final class ZooKeeperProtos {
             return false;
           }
         }
+        for (int i = 0; i < getTableCfsCount(); i++) {
+          if (!getTableCfs(i).isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
 
@@ -6533,6 +7386,246 @@ public final class ZooKeeperProtos {
         return configurationBuilder_;
       }
 
+      // repeated .hbase.pb.TableCF table_cfs = 5;
+      private java.util.List<org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF> tableCfs_ =
+        java.util.Collections.emptyList();
+      private void ensureTableCfsIsMutable() {
+        if (!((bitField0_ & 0x00000010) == 0x00000010)) {
+          tableCfs_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF>(tableCfs_);
+          bitField0_ |= 0x00000010;
+         }
+      }
+
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder> tableCfsBuilder_;
+
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF> getTableCfsList() {
+        if (tableCfsBuilder_ == null) {
+          return java.util.Collections.unmodifiableList(tableCfs_);
+        } else {
+          return tableCfsBuilder_.getMessageList();
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public int getTableCfsCount() {
+        if (tableCfsBuilder_ == null) {
+          return tableCfs_.size();
+        } else {
+          return tableCfsBuilder_.getCount();
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF getTableCfs(int index) {
+        if (tableCfsBuilder_ == null) {
+          return tableCfs_.get(index);
+        } else {
+          return tableCfsBuilder_.getMessage(index);
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public Builder setTableCfs(
+          int index, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF value) {
+        if (tableCfsBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureTableCfsIsMutable();
+          tableCfs_.set(index, value);
+          onChanged();
+        } else {
+          tableCfsBuilder_.setMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public Builder setTableCfs(
+          int index, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder builderForValue) {
+        if (tableCfsBuilder_ == null) {
+          ensureTableCfsIsMutable();
+          tableCfs_.set(index, builderForValue.build());
+          onChanged();
+        } else {
+          tableCfsBuilder_.setMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public Builder addTableCfs(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF value) {
+        if (tableCfsBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureTableCfsIsMutable();
+          tableCfs_.add(value);
+          onChanged();
+        } else {
+          tableCfsBuilder_.addMessage(value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public Builder addTableCfs(
+          int index, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF value) {
+        if (tableCfsBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureTableCfsIsMutable();
+          tableCfs_.add(index, value);
+          onChanged();
+        } else {
+          tableCfsBuilder_.addMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public Builder addTableCfs(
+          org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder builderForValue) {
+        if (tableCfsBuilder_ == null) {
+          ensureTableCfsIsMutable();
+          tableCfs_.add(builderForValue.build());
+          onChanged();
+        } else {
+          tableCfsBuilder_.addMessage(builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public Builder addTableCfs(
+          int index, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder builderForValue) {
+        if (tableCfsBuilder_ == null) {
+          ensureTableCfsIsMutable();
+          tableCfs_.add(index, builderForValue.build());
+          onChanged();
+        } else {
+          tableCfsBuilder_.addMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public Builder addAllTableCfs(
+          java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF> values) {
+        if (tableCfsBuilder_ == null) {
+          ensureTableCfsIsMutable();
+          super.addAll(values, tableCfs_);
+          onChanged();
+        } else {
+          tableCfsBuilder_.addAllMessages(values);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public Builder clearTableCfs() {
+        if (tableCfsBuilder_ == null) {
+          tableCfs_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000010);
+          onChanged();
+        } else {
+          tableCfsBuilder_.clear();
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public Builder removeTableCfs(int index) {
+        if (tableCfsBuilder_ == null) {
+          ensureTableCfsIsMutable();
+          tableCfs_.remove(index);
+          onChanged();
+        } else {
+          tableCfsBuilder_.remove(index);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder getTableCfsBuilder(
+          int index) {
+        return getTableCfsFieldBuilder().getBuilder(index);
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder getTableCfsOrBuilder(
+          int index) {
+        if (tableCfsBuilder_ == null) {
+          return tableCfs_.get(index);
+        } else {
+          return tableCfsBuilder_.getMessageOrBuilder(index);
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder> 
+           getTableCfsOrBuilderList() {
+        if (tableCfsBuilder_ != null) {
+          return tableCfsBuilder_.getMessageOrBuilderList();
+        } else {
+          return java.util.Collections.unmodifiableList(tableCfs_);
+        }
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder addTableCfsBuilder() {
+        return getTableCfsFieldBuilder().addBuilder(
+            org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder addTableCfsBuilder(
+          int index) {
+        return getTableCfsFieldBuilder().addBuilder(
+            index, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .hbase.pb.TableCF table_cfs = 5;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder> 
+           getTableCfsBuilderList() {
+        return getTableCfsFieldBuilder().getBuilderList();
+      }
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder> 
+          getTableCfsFieldBuilder() {
+        if (tableCfsBuilder_ == null) {
+          tableCfsBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.Builder, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder>(
+                  tableCfs_,
+                  ((bitField0_ & 0x00000010) == 0x00000010),
+                  getParentForChildren(),
+                  isClean());
+          tableCfs_ = null;
+        }
+        return tableCfsBuilder_;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer)
     }
 
@@ -9756,6 +10849,11 @@ public final class ZooKeeperProtos {
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_hbase_pb_Table_fieldAccessorTable;
   private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_hbase_pb_TableCF_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_hbase_pb_TableCF_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
     internal_static_hbase_pb_ReplicationPeer_descriptor;
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -9815,23 +10913,26 @@ public final class ZooKeeperProtos {
       "LOG_SPLITTING\020\001\022\016\n\nLOG_REPLAY\020\002\"w\n\005Table" +
       "\022-\n\005state\030\001 \002(\0162\025.hbase.pb.Table.State:\007",
       "ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLE" +
-      "D\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"\237\001\n\017Rep" +
-      "licationPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027repl" +
-      "icationEndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132" +
-      "\030.hbase.pb.BytesBytesPair\022/\n\rconfigurati" +
-      "on\030\004 \003(\0132\030.hbase.pb.NameStringPair\"g\n\020Re" +
-      "plicationState\022/\n\005state\030\001 \002(\0162 .hbase.pb" +
-      ".ReplicationState.State\"\"\n\005State\022\013\n\007ENAB" +
-      "LED\020\000\022\014\n\010DISABLED\020\001\"+\n\027ReplicationHLogPo" +
-      "sition\022\020\n\010position\030\001 \002(\003\"%\n\017ReplicationL",
-      "ock\022\022\n\nlock_owner\030\001 \002(\t\"\252\001\n\tTableLock\022\'\n" +
-      "\ntable_name\030\001 \001(\0132\023.hbase.pb.TableName\022(" +
-      "\n\nlock_owner\030\002 \001(\0132\024.hbase.pb.ServerName" +
-      "\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017" +
-      "\n\007purpose\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013" +
-      "SwitchState\022\017\n\007enabled\030\001 \001(\010BE\n*org.apac" +
-      "he.hadoop.hbase.protobuf.generatedB\017ZooK" +
-      "eeperProtosH\001\210\001\001\240\001\001"
+      "D\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007Tabl" +
+      "eCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Table" +
+      "Name\022\020\n\010families\030\002 \003(\014\"\305\001\n\017ReplicationPe" +
+      "er\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replicationEnd" +
+      "pointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase.pb" +
+      ".BytesBytesPair\022/\n\rconfiguration\030\004 \003(\0132\030" +
+      ".hbase.pb.NameStringPair\022$\n\ttable_cfs\030\005 " +
+      "\003(\0132\021.hbase.pb.TableCF\"g\n\020ReplicationSta" +
+      "te\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replication",
+      "State.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DIS" +
+      "ABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010po" +
+      "sition\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlock_" +
+      "owner\030\001 \002(\t\"\252\001\n\tTableLock\022\'\n\ntable_name\030" +
+      "\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_owner" +
+      "\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthread_i" +
+      "d\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 " +
+      "\001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchState\022" +
+      "\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop.hb" +
+      "ase.protobuf.generatedB\017ZooKeeperProtosH",
+      "\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9874,38 +10975,44 @@ public final class ZooKeeperProtos {
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_Table_descriptor,
               new java.lang.String[] { "State", });
-          internal_static_hbase_pb_ReplicationPeer_descriptor =
+          internal_static_hbase_pb_TableCF_descriptor =
             getDescriptor().getMessageTypes().get(6);
+          internal_static_hbase_pb_TableCF_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_hbase_pb_TableCF_descriptor,
+              new java.lang.String[] { "TableName", "Families", });
+          internal_static_hbase_pb_ReplicationPeer_descriptor =
+            getDescriptor().getMessageTypes().get(7);
           internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicationPeer_descriptor,
-              new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", });
+              new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", });
           internal_static_hbase_pb_ReplicationState_descriptor =
-            getDescriptor().getMessageTypes().get(7);
+            getDescriptor().getMessageTypes().get(8);
           internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicationState_descriptor,
               new java.lang.String[] { "State", });
           internal_static_hbase_pb_ReplicationHLogPosition_descriptor =
-            getDescriptor().getMessageTypes().get(8);
+            getDescriptor().getMessageTypes().get(9);
           internal_static_hbase_pb_ReplicationHLogPosition_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicationHLogPosition_descriptor,
               new java.lang.String[] { "Position", });
           internal_static_hbase_pb_ReplicationLock_descriptor =
-            getDescriptor().getMessageTypes().get(9);
+            getDescriptor().getMessageTypes().get(10);
           internal_static_hbase_pb_ReplicationLock_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicationLock_descriptor,
               new java.lang.String[] { "LockOwner", });
           internal_static_hbase_pb_TableLock_descriptor =
-            getDescriptor().getMessageTypes().get(10);
+            getDescriptor().getMessageTypes().get(11);
           internal_static_hbase_pb_TableLock_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_TableLock_descriptor,
               new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", "CreateTime", });
           internal_static_hbase_pb_SwitchState_descriptor =
-            getDescriptor().getMessageTypes().get(11);
+            getDescriptor().getMessageTypes().get(12);
           internal_static_hbase_pb_SwitchState_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_SwitchState_descriptor,

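The generated TableCF message round-trips through the standard protobuf
byte-array API added above. A minimal sketch (illustration only, not part of
this patch; the namespace/qualifier field names come from HBaseProtos.TableName):

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
    import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;

    public class TableCFRoundTrip {
      public static void main(String[] args) throws Exception {
        // Build a TableCF for table "tab3" limited to families cf1 and cf3.
        ZooKeeperProtos.TableCF tableCF = ZooKeeperProtos.TableCF.newBuilder()
            .setTableName(HBaseProtos.TableName.newBuilder()
                .setNamespace(ByteString.copyFromUtf8("default"))
                .setQualifier(ByteString.copyFromUtf8("tab3")))
            .addFamilies(ByteString.copyFromUtf8("cf1"))
            .addFamilies(ByteString.copyFromUtf8("cf3"))
            .build();
        // Serialize, then re-parse via the parseFrom(byte[]) overload above.
        byte[] bytes = tableCF.toByteArray();
        ZooKeeperProtos.TableCF parsed = ZooKeeperProtos.TableCF.parseFrom(bytes);
        assert parsed.getFamiliesCount() == 2;
      }
    }
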
http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index b408db9..60ed229 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -119,6 +119,11 @@ message Table {
   required State state = 1 [default = ENABLED];
 }
 
+message TableCF {
+  optional TableName table_name = 1;
+  repeated bytes families = 2;
+}
+
 /**
  * Used by replication. Holds a replication peer key.
  */
@@ -129,6 +134,7 @@ message ReplicationPeer {
   optional string replicationEndpointImpl = 2;
   repeated BytesBytesPair data = 3;
   repeated NameStringPair configuration = 4;
+  repeated TableCF table_cfs = 5;
 }
 
 /**
@@ -173,4 +179,4 @@ message TableLock {
  */
 message SwitchState {
   optional bool enabled = 1;
-}
\ No newline at end of file
+}

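A peer's per-table/CF scope, previously a plain string of the form
"tab1;tab2:cf1;tab3:cf1,cf3" kept in a separate znode, is now carried as
repeated TableCF messages inside the ReplicationPeer itself. A sketch of
populating the new field through the generated builders (illustration only;
the cluster key is a placeholder):

    ZooKeeperProtos.ReplicationPeer peer = ZooKeeperProtos.ReplicationPeer.newBuilder()
        .setClusterkey("zk1:2181:/hbase")  // placeholder cluster key
        .addTableCfs(ZooKeeperProtos.TableCF.newBuilder()
            .setTableName(HBaseProtos.TableName.newBuilder()
                .setNamespace(ByteString.copyFromUtf8("default"))
                .setQualifier(ByteString.copyFromUtf8("tab2")))
            .addFamilies(ByteString.copyFromUtf8("cf1")))
        .build();
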
http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 8db8324..e079b3b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -143,6 +143,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
+import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -712,6 +713,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     status.setStatus("Initializing ZK system trackers");
     initializeZKBasedSystemTrackers();
 
+    // This is for backwards compatibility
+    // See HBASE-11393
+    status.setStatus("Update TableCFs node in ZNode");
+    TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper,
+            conf, this.clusterConnection);
+    tableCFsUpdater.update();
+
     // initialize master side coprocessors before we start handling requests
     status.setStatus("Initializing master coprocessors");
     this.cpHost = new MasterCoprocessorHost(this, this.conf);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
new file mode 100644
index 0000000..ce07868
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This class is used to upgrade TableCFs from HBase 1.x to HBase 2.x.
+ * It will be removed in HBase 3.x.  See HBASE-11393
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TableCFsUpdater extends ReplicationStateZKBase {
+
+  private static final Log LOG = LogFactory.getLog(TableCFsUpdater.class);
+
+  public TableCFsUpdater(ZooKeeperWatcher zookeeper,
+                         Configuration conf, Abortable abortable) {
+    super(zookeeper, conf, abortable);
+  }
+
+  public void update() {
+    List<String> znodes = null;
+    try {
+      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+    } catch (KeeperException e) {
+      LOG.warn("", e);
+    }
+    if (znodes != null) {
+      for (String peerId : znodes) {
+        if (!update(peerId)) {
+          LOG.error("upgrade tableCFs failed for peerId=" + peerId);
+        }
+      }
+    }
+  }
+
+  public boolean update(String peerId) {
+    String tableCFsNode = getTableCFsNode(peerId);
+    try {
+      if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
+        String peerNode = getPeerNode(peerId);
+        ReplicationPeerConfig rpc = getReplicationPeerConfig(peerNode);
+        // We only need to copy data from the tableCFs node to the peer node
+        // the first time HMaster starts.
+        if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
+          // Copy the TableCFs node into the peer node.
+          LOG.info("Copying tableCFs into peerNode: " + peerId);
+          ZooKeeperProtos.TableCF[] tableCFs =
+                  ReplicationSerDeHelper.parseTableCFs(
+                          ZKUtil.getData(this.zookeeper, tableCFsNode));
+          rpc.setTableCFsMap(ReplicationSerDeHelper.convert2Map(tableCFs));
+          ZKUtil.setData(this.zookeeper, peerNode,
+                  ReplicationSerDeHelper.toByteArray(rpc));
+        } else {
+          LOG.info("No tableCFs in peerNode:" + peerId);
+        }
+      }
+    } catch (KeeperException | InterruptedException | IOException e) {
+      LOG.warn("NOTICE!! Update of tableCFs failed for peerId=" + peerId, e);
+      return false;
+    }
+    return true;
+  }
+
+  private ReplicationPeerConfig getReplicationPeerConfig(String peerNode)
+          throws KeeperException, InterruptedException {
+    byte[] data = ZKUtil.getData(this.zookeeper, peerNode);
+    if (data == null) {
+      LOG.error("Could not get configuration for " +
+              "peer because it doesn't exist. peer=" + peerNode);
+      return null;
+    }
+    try {
+      return ReplicationSerDeHelper.parsePeerFrom(data);
+    } catch (DeserializationException e) {
+      LOG.warn("Failed to parse cluster key from peer=" + peerNode);
+      return null;
+    }
+  }
+
+}

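The updater can also be driven outside of HMaster startup, e.g. from a
one-shot migration tool. A sketch under the constructor shown above (the
watcher construction mirrors the tests; passing null for the Abortable is an
assumption):

    Configuration conf = HBaseConfiguration.create();
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TableCFsUpdater", null);
    try {
      // Copies each peer's old tableCFs znode into the peer's
      // ReplicationPeer protobuf, as HMaster now does on startup.
      new TableCFsUpdater(zkw, conf, null).update();
    } finally {
      zkw.close();
    }
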
http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index a771c21..5967a69 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
@@ -249,7 +250,9 @@ public class TestReplicaWithCluster {
     HTU2.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
 
     ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
-    admin.addPeer("2", HTU2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(HTU2.getClusterKey());
+    admin.addPeer("2", rpc);
     admin.close();
 
     Put p = new Put(row);

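The old addPeer(String, String) overload gives way to
addPeer(String, ReplicationPeerConfig), which can also carry the table/CF
scope directly. A sketch (illustration only; the map contents are examples,
and setTableCFsMap is the same setter TableCFsUpdater uses above):

    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("tab1"), null);                  // all CFs
    tableCfs.put(TableName.valueOf("tab2"), Arrays.asList("cf1")); // only cf1
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(HTU2.getClusterKey());
    rpc.setTableCFsMap(tableCfs);
    admin.addPeer("2", rpc);
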
http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 03d7aee..775a6d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client.replication;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -44,6 +46,7 @@ import com.google.common.collect.Lists;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -92,11 +95,15 @@ public class TestReplicationAdmin {
    */
   @Test
   public void testAddRemovePeer() throws Exception {
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
+    rpc2.setClusterKey(KEY_SECOND);
     // Add a valid peer
-    admin.addPeer(ID_ONE, KEY_ONE);
+    admin.addPeer(ID_ONE, rpc1);
     // try adding the same (fails)
     try {
-      admin.addPeer(ID_ONE, KEY_ONE);
+      admin.addPeer(ID_ONE, rpc1);
     } catch (IllegalArgumentException iae) {
       // OK!
     }
@@ -111,7 +118,7 @@ public class TestReplicationAdmin {
     assertEquals(1, admin.getPeersCount());
     // Add a second since multi-slave is supported
     try {
-      admin.addPeer(ID_SECOND, KEY_SECOND);
+      admin.addPeer(ID_SECOND, rpc2);
     } catch (IllegalStateException iae) {
       fail();
     }
@@ -125,6 +132,10 @@ public class TestReplicationAdmin {
   
   @Test
   public void testAddPeerWithUnDeletedQueues() throws Exception {
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
+    rpc2.setClusterKey(KEY_SECOND);
     Configuration conf = TEST_UTIL.getConfiguration();
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null);
     ReplicationQueues repQueues =
@@ -134,7 +145,7 @@ public class TestReplicationAdmin {
     // add queue for ID_ONE
     repQueues.addLog(ID_ONE, "file1");
     try {
-      admin.addPeer(ID_ONE, KEY_ONE);
+      admin.addPeer(ID_ONE, rpc1);
       fail();
     } catch (ReplicationException e) {
       // OK!
@@ -145,7 +156,7 @@ public class TestReplicationAdmin {
     // add recovered queue for ID_ONE
     repQueues.addLog(ID_ONE + "-server2", "file1");
     try {
-      admin.addPeer(ID_ONE, KEY_ONE);
+      admin.addPeer(ID_ONE, rpc1);
       fail();
     } catch (ReplicationException e) {
       // OK!
@@ -182,7 +193,9 @@ public class TestReplicationAdmin {
    */
   @Test
   public void testEnableDisable() throws Exception {
-    admin.addPeer(ID_ONE, KEY_ONE);
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    admin.addPeer(ID_ONE, rpc1);
     assertEquals(1, admin.getPeersCount());
     assertTrue(admin.getPeerState(ID_ONE));
     admin.disablePeer(ID_ONE);
@@ -197,100 +210,140 @@ public class TestReplicationAdmin {
   }
 
   @Test
-  public void testGetTableCfsStr() {
-    // opposite of TestPerTableCFReplication#testParseTableCFsFromConfig()
-
-    Map<TableName, List<String>> tabCFsMap = null;
-
-    // 1. null or empty string, result should be null
-    assertEquals(null, ReplicationAdmin.getTableCfsStr(tabCFsMap));
-
-
-    // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
-    tabCFsMap = new TreeMap<TableName, List<String>>();
-    tabCFsMap.put(TableName.valueOf("tab1"), null);   // its table name is "tab1"
-    assertEquals("tab1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
-
-    tabCFsMap = new TreeMap<TableName, List<String>>();
-    tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1"));
-    assertEquals("tab1:cf1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
-
-    tabCFsMap = new TreeMap<TableName, List<String>>();
-    tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1", "cf3"));
-    assertEquals("tab1:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
-
-    // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = new TreeMap<TableName, List<String>>();
-    tabCFsMap.put(TableName.valueOf("tab1"), null);
-    tabCFsMap.put(TableName.valueOf("tab2"), Lists.newArrayList("cf1"));
-    tabCFsMap.put(TableName.valueOf("tab3"), Lists.newArrayList("cf1", "cf3"));
-    assertEquals("tab1;tab2:cf1;tab3:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
-  }
-
-  @Test
   public void testAppendPeerTableCFs() throws Exception {
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    TableName tab1 = TableName.valueOf("t1");
+    TableName tab2 = TableName.valueOf("t2");
+    TableName tab3 = TableName.valueOf("t3");
+    TableName tab4 = TableName.valueOf("t4");
+
     // Add a valid peer
-    admin.addPeer(ID_ONE, KEY_ONE);
+    admin.addPeer(ID_ONE, rpc1);
 
-    admin.appendPeerTableCFs(ID_ONE, "t1");
-    assertEquals("t1", admin.getPeerTableCFs(ID_ONE));
+    Map<TableName, List<String>> tableCFs = new HashMap<>();
 
-    // append table t2 to replication
-    admin.appendPeerTableCFs(ID_ONE, "t2");
-    String peerTablesOne = admin.getPeerTableCFs(ID_ONE);
+    tableCFs.put(tab1, null);
+    admin.appendPeerTableCFs(ID_ONE, tableCFs);
+    Map<TableName, List<String>> result = admin.getPeerConfig(ID_ONE).getTableCFsMap();
+    assertEquals(1, result.size());
+    assertEquals(true, result.containsKey(tab1));
+    assertNull(result.get(tab1));
 
-    // Different jdk's return different sort order for the tables. ( Not sure on why exactly )
-    //
-    // So instead of asserting that the string is exactly we
-    // assert that the string contains all tables and the needed separator.
-    assertTrue("Should contain t1", peerTablesOne.contains("t1"));
-    assertTrue("Should contain t2", peerTablesOne.contains("t2"));
-    assertTrue("Should contain ; as the seperator", peerTablesOne.contains(";"));
+    // append table t2 to replication
+    tableCFs.clear();
+    tableCFs.put(tab2, null);
+    admin.appendPeerTableCFs(ID_ONE, tableCFs);
+    result = admin.getPeerConfig(ID_ONE).getTableCFsMap();
+    assertEquals(2, result.size());
+    assertTrue("Should contain t1", result.containsKey(tab1));
+    assertTrue("Should contain t2", result.containsKey(tab2));
+    assertNull(result.get(tab1));
+    assertNull(result.get(tab2));
 
     // append table column family: f1 of t3 to replication
-    admin.appendPeerTableCFs(ID_ONE, "t3:f1");
-    String peerTablesTwo = admin.getPeerTableCFs(ID_ONE);
-    assertTrue("Should contain t1", peerTablesTwo.contains("t1"));
-    assertTrue("Should contain t2", peerTablesTwo.contains("t2"));
-    assertTrue("Should contain t3:f1", peerTablesTwo.contains("t3:f1"));
-    assertTrue("Should contain ; as the seperator", peerTablesTwo.contains(";"));
+    tableCFs.clear();
+    tableCFs.put(tab3, new ArrayList<String>());
+    tableCFs.get(tab3).add("f1");
+    admin.appendPeerTableCFs(ID_ONE, tableCFs);
+    result = admin.getPeerConfig(ID_ONE).getTableCFsMap();
+    assertEquals(3, result.size());
+    assertTrue("Should contain t1", result.containsKey(tab1));
+    assertTrue("Should contain t2", result.containsKey(tab2));
+    assertTrue("Should contain t3", result.containsKey(tab3));
+    assertNull(result.get(tab1));
+    assertNull(result.get(tab2));
+    assertEquals(1, result.get(tab3).size());
+    assertEquals("f1", result.get(tab3).get(0));
+
+    tableCFs.clear();
+    tableCFs.put(tab4, new ArrayList<String>());
+    tableCFs.get(tab4).add("f1");
+    tableCFs.get(tab4).add("f2");
+    admin.appendPeerTableCFs(ID_ONE, tableCFs);
+    result = admin.getPeerConfig(ID_ONE).getTableCFsMap();
+    assertEquals(4, result.size());
+    assertTrue("Should contain t1", result.containsKey(tab1));
+    assertTrue("Should contain t2", result.containsKey(tab2));
+    assertTrue("Should contain t3", result.containsKey(tab3));
+    assertTrue("Should contain t4", result.containsKey(tab4));
+    assertNull(result.get(tab1));
+    assertNull(result.get(tab2));
+    assertEquals(1, result.get(tab3).size());
+    assertEquals("f1", result.get(tab3).get(0));
+    assertEquals(2, result.get(tab4).size());
+    assertEquals("f1", result.get(tab4).get(0));
+    assertEquals("f2", result.get(tab4).get(1));
+
     admin.removePeer(ID_ONE);
   }
 
   @Test
   public void testRemovePeerTableCFs() throws Exception {
+    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
+    rpc1.setClusterKey(KEY_ONE);
+    TableName tab1 = TableName.valueOf("t1");
+    TableName tab2 = TableName.valueOf("t2");
+    TableName tab3 = TableName.valueOf("t3");
     // Add a valid peer
-    admin.addPeer(ID_ONE, KEY_ONE);
+    admin.addPeer(ID_ONE, rpc1);
+    Map<TableName, List<String>> tableCFs = new HashMap<>();
     try {
-      admin.removePeerTableCFs(ID_ONE, "t3");
+      tableCFs.put(tab3, null);
+      admin.removePeerTableCFs(ID_ONE, tableCFs);
       assertTrue(false);
     } catch (ReplicationException e) {
     }
-    assertEquals("", admin.getPeerTableCFs(ID_ONE));
+    assertNull(admin.getPeerConfig(ID_ONE).getTableCFsMap());
 
-    admin.setPeerTableCFs(ID_ONE, "t1;t2:cf1");
+    tableCFs.clear();
+    tableCFs.put(tab1, null);
+    tableCFs.put(tab2, new ArrayList<String>());
+    tableCFs.get(tab2).add("cf1");
+    admin.setPeerTableCFs(ID_ONE, tableCFs);
     try {
-      admin.removePeerTableCFs(ID_ONE, "t3");
+      tableCFs.clear();
+      tableCFs.put(tab3, null);
+      admin.removePeerTableCFs(ID_ONE, tableCFs);
       assertTrue(false);
     } catch (ReplicationException e) {
     }
-    assertEquals("t1;t2:cf1", admin.getPeerTableCFs(ID_ONE));
-
+    Map<TableName, List<String>> result = admin.getPeerConfig(ID_ONE).getTableCFsMap();
+    assertEquals(2, result.size());
+    assertTrue("Should contain t1", result.containsKey(tab1));
+    assertTrue("Should contain t2", result.containsKey(tab2));
+    assertNull(result.get(tab1));
+    assertEquals(1, result.get(tab2).size());
+    assertEquals("cf1", result.get(tab2).get(0));
+
+    tableCFs.clear();
+    tableCFs.put(tab1, new ArrayList<String>());
+    tableCFs.get(tab1).add("f1");
     try {
-      admin.removePeerTableCFs(ID_ONE, "t1:f1");
+      admin.removePeerTableCFs(ID_ONE, tableCFs);
       assertTrue(false);
     } catch (ReplicationException e) {
     }
-    admin.removePeerTableCFs(ID_ONE, "t1");
-    assertEquals("t2:cf1", admin.getPeerTableCFs(ID_ONE));
-
+    tableCFs.clear();
+    tableCFs.put(tab1, null);
+    admin.removePeerTableCFs(ID_ONE, tableCFs);
+    result = admin.getPeerConfig(ID_ONE).getTableCFsMap();
+    assertEquals(1, result.size());
+    assertEquals(1, result.get(tab2).size());
+    assertEquals("cf1", result.get(tab2).get(0));
+
+    tableCFs.clear();
+    tableCFs.put(tab2, null);
     try {
-      admin.removePeerTableCFs(ID_ONE, "t2");
+      admin.removePeerTableCFs(ID_ONE, tableCFs);
       assertTrue(false);
     } catch (ReplicationException e) {
     }
-    admin.removePeerTableCFs(ID_ONE, "t2:cf1");
-    assertEquals("", admin.getPeerTableCFs(ID_ONE));
+    tableCFs.clear();
+    tableCFs.put(tab2, new ArrayList<String>());
+    tableCFs.get(tab2).add("cf1");
+    admin.removePeerTableCFs(ID_ONE, tableCFs);
+    assertNull(admin.getPeerConfig(ID_ONE).getTableCFsMap());
     admin.removePeer(ID_ONE);
   }
 }
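
The net effect of the hunks above: per-peer table CFs now travel as a Map<TableName, List<String>> carried by ReplicationPeerConfig, rather than as a ';'-delimited string on ReplicationAdmin. A minimal client-side sketch of the new flow, assuming a running cluster; the class name, peer id, and cluster key below are placeholders, not part of this patch:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

public class TableCfsMapExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin admin = new ReplicationAdmin(conf);
    try {
      // Peers now take a ReplicationPeerConfig instead of a bare cluster key.
      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
      rpc.setClusterKey("slave-zk:2181:/hbase");   // placeholder key

      // Table CFs travel as a Map; a null CF list means "all column families".
      Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
      tableCfs.put(TableName.valueOf("t1"), null);          // whole table t1
      List<String> cfs = new ArrayList<String>();
      cfs.add("cf1");
      tableCfs.put(TableName.valueOf("t2"), cfs);           // only cf1 of t2

      admin.addPeer("1", rpc, tableCfs);

      // The map round-trips through the peer config, replacing getPeerTableCFs.
      Map<TableName, List<String>> current = admin.getPeerConfig("1").getTableCFsMap();
      System.out.println(current);

      // Growing the set also goes through a map now.
      Map<TableName, List<String>> more = new HashMap<TableName, List<String>>();
      more.put(TableName.valueOf("t3"), null);
      admin.appendPeerTableCFs("1", more);

      admin.removePeer("1");
    } finally {
      admin.close();
    }
  }
}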

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index eb793dc..ad3f3c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -110,7 +110,7 @@ public class TestReplicationHFileCleaner {
   @Before
   public void setup() throws ReplicationException, IOException {
     root = TEST_UTIL.getDataTestDirOnTestFS();
-    rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null);
+    rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
     rq.addPeerToHFileRefs(peerId);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index ffa3e42..2a1ef6a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -508,8 +508,9 @@ public class TestMasterReplication {
     try {
       replicationAdmin = new ReplicationAdmin(
           configurations[masterClusterNumber]);
-      replicationAdmin.addPeer(id,
-          utilities[slaveClusterNumber].getClusterKey());
+      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+      rpc.setClusterKey(utilities[slaveClusterNumber].getClusterKey());
+      replicationAdmin.addPeer(id, rpc);
     } finally {
       close(replicationAdmin);
     }
@@ -520,7 +521,9 @@ public class TestMasterReplication {
     ReplicationAdmin replicationAdmin = null;
     try {
       replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
-      replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey(), tableCfs);
+      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+      rpc.setClusterKey(utilities[slaveClusterNumber].getClusterKey());
+      replicationAdmin.addPeer(id, rpc, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
     } finally {
       close(replicationAdmin);
     }
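
ReplicationSerDeHelper.parseTableCFsFromConfig is the bridge for callers that still hold the old string form: it turns 'table[:cf[,cf...]]' groups separated by ';' into the map the new addPeer expects, and returns null for null or blank input. A short sketch of that behavior, assuming ReplicationSerDeHelper is importable from org.apache.hadoop.hbase.client.replication (the example class name is a placeholder):

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;

public class ParseTableCfsExample {
  public static void main(String[] args) {
    // Tables are separated by ';', a table from its CFs by ':', CFs by ','.
    Map<TableName, List<String>> map =
        ReplicationSerDeHelper.parseTableCFsFromConfig("t1;t2:cf1;t3:cf1,cf3");
    System.out.println(map.get(TableName.valueOf("t1")));  // null => all CFs of t1
    System.out.println(map.get(TableName.valueOf("t2")));  // [cf1]
    System.out.println(map.get(TableName.valueOf("t3")));  // [cf1, cf3]

    // Null, empty, or blank input yields a null map.
    System.out.println(ReplicationSerDeHelper.parseTableCFsFromConfig("   ")); // null
  }
}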

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index 4480dd2..8a59661 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -152,7 +152,9 @@ public class TestMultiSlaveReplication {
     Table htable3 = new HTable(conf3, tableName);
     htable3.setWriteBufferSize(1024);
 
-    admin1.addPeer("1", utility2.getClusterKey());
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin1.addPeer("1", rpc);
 
     // put "row" and wait 'til it got around, then delete
     putAndWait(row, famName, htable1, htable2);
@@ -168,7 +170,9 @@ public class TestMultiSlaveReplication {
     // after the log was rolled put a new row
     putAndWait(row3, famName, htable1, htable2);
 
-    admin1.addPeer("2", utility3.getClusterKey());
+    rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility3.getClusterKey());
+    admin1.addPeer("2", rpc);
 
     // put a row, check it was replicated to all clusters
     putAndWait(row1, famName, htable1, htable2, htable3);