You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2016/10/18 01:29:15 UTC

[3/3] hbase git commit: HBASE-16653 Backport HBASE-11393 to branches which support namespace

HBASE-16653 Backport HBASE-11393 to branches which support namespace

Signed-off-by: chenheng <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66941910
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66941910
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66941910

Branch: refs/heads/branch-1
Commit: 66941910bd07462fe496c5bbb591f4071f77b8fb
Parents: 6df7554
Author: Guanghao Zhang <zg...@gmail.com>
Authored: Mon Sep 26 19:33:43 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Tue Oct 18 09:12:47 2016 +0800

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |   84 +-
 .../replication/ReplicationPeerConfig.java      |   16 +-
 .../replication/ReplicationPeerZKImpl.java      |   80 +-
 .../hbase/replication/ReplicationPeers.java     |   15 +-
 .../replication/ReplicationPeersZKImpl.java     |   60 +-
 .../replication/ReplicationSerDeHelper.java     |  189 +++
 .../replication/ReplicationStateZKBase.java     |   17 +
 .../protobuf/generated/ZooKeeperProtos.java     | 1155 +++++++++++++++++-
 .../src/main/protobuf/ZooKeeper.proto           |    8 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |    8 +
 .../replication/master/TableCFsUpdater.java     |  120 ++
 .../hbase/client/TestReplicaWithCluster.java    |    5 +-
 .../replication/TestReplicationAdmin.java       |  193 +--
 .../cleaner/TestReplicationHFileCleaner.java    |    2 +-
 .../replication/TestMasterReplication.java      |    9 +-
 .../replication/TestMultiSlaveReplication.java  |    8 +-
 .../replication/TestPerTableCFReplication.java  |  153 ++-
 .../hbase/replication/TestReplicationBase.java  |    4 +-
 .../replication/TestReplicationSmallTests.java  |    4 +-
 .../replication/TestReplicationStateBasic.java  |   20 +-
 .../replication/TestReplicationSyncUpTool.java  |    4 +-
 .../TestReplicationTrackerZKImpl.java           |   10 +-
 .../replication/TestReplicationWithTags.java    |    4 +-
 .../replication/master/TestTableCFsUpdater.java |  164 +++
 .../TestReplicationSourceManager.java           |    2 +-
 ...sibilityLabelReplicationWithExpAsString.java |    5 +-
 .../TestVisibilityLabelsReplication.java        |    5 +-
 .../apache/hadoop/hbase/util/TestHBaseFsck.java |    5 +-
 .../src/main/ruby/hbase/replication_admin.rb    |   49 +-
 .../src/main/ruby/shell/commands/add_peer.rb    |    4 +-
 .../ruby/shell/commands/append_peer_tableCFs.rb |    2 +-
 .../src/main/ruby/shell/commands/list_peers.rb  |    6 +-
 .../ruby/shell/commands/remove_peer_tableCFs.rb |    4 +-
 .../ruby/shell/commands/set_peer_tableCFs.rb    |    4 +-
 .../hbase/client/TestReplicationShell.java      |    2 +-
 .../test/ruby/hbase/replication_admin_test.rb   |  118 +-
 36 files changed, 2167 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 1304396..9fca28b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationSerDeHelper;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
@@ -184,8 +185,8 @@ public class ReplicationAdmin implements Closeable {
   @Deprecated
   public void addPeer(String id, String clusterKey, String tableCFs)
     throws ReplicationException {
-    this.replicationPeers.addPeer(id,
-      new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
+    this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey),
+      parseTableCFsFromConfig(tableCFs));
   }
 
   /**
@@ -199,7 +200,19 @@ public class ReplicationAdmin implements Closeable {
    */
   public void addPeer(String id, ReplicationPeerConfig peerConfig,
       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
-    this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
+    if (tableCfs != null) {
+      peerConfig.setTableCFsMap(tableCfs);
+    }
+    this.replicationPeers.addPeer(id, peerConfig);
+  }
+
+  /**
+   * Add a new remote slave cluster for replication.
+   * @param id a short name that identifies the cluster
+   * @param peerConfig configuration for the replication slave cluster
+   */
+  public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
+    this.replicationPeers.addPeer(id, peerConfig);
   }
 
   public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
@@ -208,52 +221,7 @@ public class ReplicationAdmin implements Closeable {
   }
 
   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
-    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
-      return null;
-    }
-
-    Map<TableName, List<String>> tableCFsMap = null;
-    // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
-    // parse out (table, cf-list) pairs from tableCFsConfig
-    // format: "table1:cf1,cf2;table2:cfA,cfB"
-    String[] tables = tableCFsConfig.split(";");
-    for (String tab : tables) {
-      // 1 ignore empty table config
-      tab = tab.trim();
-      if (tab.length() == 0) {
-        continue;
-      }
-      // 2 split to "table" and "cf1,cf2"
-      //   for each table: "table:cf1,cf2" or "table"
-      String[] pair = tab.split(":");
-      String tabName = pair[0].trim();
-      if (pair.length > 2 || tabName.length() == 0) {
-        LOG.error("ignore invalid tableCFs setting: " + tab);
-        continue;
-      }
-
-      // 3 parse "cf1,cf2" part to List<cf>
-      List<String> cfs = null;
-      if (pair.length == 2) {
-        String[] cfsList = pair[1].split(",");
-        for (String cf : cfsList) {
-          String cfName = cf.trim();
-          if (cfName.length() > 0) {
-            if (cfs == null) {
-              cfs = new ArrayList<String>();
-            }
-            cfs.add(cfName);
-          }
-        }
-      }
-
-      // 4 put <table, List<cf>> to map
-      if (tableCFsMap == null) {
-        tableCFsMap = new HashMap<TableName, List<String>>();
-      }
-      tableCFsMap.put(TableName.valueOf(tabName), cfs);
-    }
-    return tableCFsMap;
+    return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
   }
 
   @VisibleForTesting
@@ -338,7 +306,7 @@ public class ReplicationAdmin implements Closeable {
    * @param id a short name that identifies the cluster
    */
   public String getPeerTableCFs(String id) throws ReplicationException {
-    return this.replicationPeers.getPeerTableCFsConfig(id);
+    return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id));
   }
 
   /**
@@ -348,7 +316,7 @@ public class ReplicationAdmin implements Closeable {
    */
   @Deprecated
   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
-    this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
+    this.setPeerTableCFs(id, parseTableCFsFromConfig(tableCFs));
   }
 
   /**
@@ -357,7 +325,7 @@ public class ReplicationAdmin implements Closeable {
    * @param tableCfs table-cfs config str
    */
   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
-    appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
+    appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
   }
 
   /**
@@ -370,7 +338,7 @@ public class ReplicationAdmin implements Closeable {
     if (tableCfs == null) {
       throw new ReplicationException("tableCfs is null");
     }
-    Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
+    Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
     if (preTableCfs == null) {
       setPeerTableCFs(id, tableCfs);
       return;
@@ -406,7 +374,7 @@ public class ReplicationAdmin implements Closeable {
    * @throws ReplicationException
    */
   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
-    removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
+    removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
   }
 
   /**
@@ -421,7 +389,7 @@ public class ReplicationAdmin implements Closeable {
       throw new ReplicationException("tableCfs is null");
     }
 
-    Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
+    Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
     if (preTableCfs == null) {
       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
     }
@@ -464,7 +432,7 @@ public class ReplicationAdmin implements Closeable {
    */
   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
       throws ReplicationException {
-    this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
+    this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
   }
 
   /**
@@ -658,8 +626,8 @@ public class ReplicationAdmin implements Closeable {
       try {
         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
         Configuration peerConf = pair.getSecond();
-        ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
-            parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
+        ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
+          peerId, pair.getFirst(), this.connection);
         listOfPeers.add(peer);
       } catch (ReplicationException e) {
         LOG.warn("Failed to get valid replication peers. "

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 043b38f..e2c7bc7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -37,7 +40,7 @@ public class ReplicationPeerConfig {
   private String replicationEndpointImpl;
   private final Map<byte[], byte[]> peerData;
   private final Map<String, String> configuration;
-
+  private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
 
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@@ -78,10 +81,21 @@ public class ReplicationPeerConfig {
     return configuration;
   }
 
+  public Map<TableName, List<String>> getTableCFsMap() {
+    return (Map<TableName, List<String>>) tableCFsMap;
+  }
+
+  public void setTableCFsMap(Map<TableName, ? extends Collection<String>> tableCFsMap) {
+    this.tableCFsMap = tableCFsMap;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
     builder.append("replicationEndpointImpl=").append(replicationEndpointImpl);
+    if (tableCFsMap != null) {
+      builder.append(tableCFsMap.toString());
+    }
     return builder.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 6b10015..382545d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -42,7 +42,8 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 
 @InterfaceAudience.Private
-public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
+public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements ReplicationPeer,
+    Abortable, Closeable {
   private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
 
   private ReplicationPeerConfig peerConfig;
@@ -52,8 +53,8 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
   private final Configuration conf;
 
   private PeerStateTracker peerStateTracker;
-  private TableCFsTracker tableCFsTracker;
   private PeerConfigTracker peerConfigTracker;
+
   /**
    * Constructor that takes all the objects required to communicate with the specified peer, except
    * for the region server addresses.
@@ -61,39 +62,23 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
    * @param id string representation of this peer's identifier
    * @param peerConfig configuration for the replication peer
    */
-  public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
-      throws ReplicationException {
-    this.conf = conf;
-    this.peerConfig = peerConfig;
-    this.id = id;
-  }
-  
-  /**
-   * Constructor that takes all the objects required to communicate with the specified peer, except
-   * for the region server addresses.
-   * @param conf configuration object to this peer
-   * @param id string representation of this peer's identifier
-   * @param peerConfig configuration for the replication peer
-   * @param tableCFs table-cf configuration for this peer
-   */
-  public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
-      Map<TableName, List<String>> tableCFs) throws ReplicationException {
+  public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf, String id,
+      ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException {
+    super(zkWatcher, conf, abortable);
     this.conf = conf;
     this.peerConfig = peerConfig;
     this.id = id;
-    this.tableCFs = tableCFs;
   }
 
   /**
    * start a state tracker to check whether this peer is enabled or not
    *
-   * @param zookeeper zk watcher for the local cluster
    * @param peerStateNode path to zk node which stores peer state
    * @throws KeeperException
    */
-  public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+  public void startStateTracker(String peerStateNode)
       throws KeeperException {
-    ensurePeerEnabled(zookeeper, peerStateNode);
+    ensurePeerEnabled(peerStateNode);
     this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
     this.peerStateTracker.start();
     try {
@@ -112,25 +97,6 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
 
   /**
    * start a table-cfs tracker to listen the (table, cf-list) map change
-   *
-   * @param zookeeper zk watcher for the local cluster
-   * @param tableCFsNode path to zk node which stores table-cfs
-   * @throws KeeperException
-   */
-  public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
-    throws KeeperException {
-    this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
-        this);
-    this.tableCFsTracker.start();
-    this.readTableCFsZnode();
-  }
-
-  private void readTableCFsZnode() {
-    String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
-    this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs);
-  }
-  /**
-   * start a table-cfs tracker to listen the (table, cf-list) map change
    * @param zookeeper
    * @param peerConfigNode path to zk node which stores table-cfs
    * @throws KeeperException
@@ -154,6 +120,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
     }
     return this.peerConfig;
   }
+
   @Override
   public PeerState getPeerState() {
     return peerState;
@@ -192,6 +159,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
    */
   @Override
   public Map<TableName, List<String>> getTableCFs() {
+    this.tableCFs = peerConfig.getTableCFsMap();
     return this.tableCFs;
   }
 
@@ -260,7 +228,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
    * @throws NodeExistsException
    * @throws KeeperException
    */
-  private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+  private boolean ensurePeerEnabled(final String path)
       throws NodeExistsException, KeeperException {
     if (ZKUtil.checkExists(zookeeper, path) == -1) {
       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
@@ -297,32 +265,6 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
   }
 
   /**
-   * Tracker for (table, cf-list) map of this peer
-   */
-  public class TableCFsTracker extends ZooKeeperNodeTracker {
-
-    public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, tableCFsZNode, abortable);
-    }
-    
-    @Override
-    public synchronized void nodeCreated(String path) {
-      if (path.equals(node)) {
-        super.nodeCreated(path);
-        readTableCFsZnode();
-      }
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-      }
-    }
-  }
-
-  /**
    * Tracker for PeerConfigNode of this peer
    */
   public class PeerConfigTracker extends ZooKeeperNodeTracker {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index b8d04b4..37d157a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -49,10 +50,8 @@ public interface ReplicationPeers {
    * Add a new remote slave cluster for replication.
    * @param peerId a short that identifies the cluster
    * @param peerConfig configuration for the replication slave cluster
-   * @param tableCFs the table and column-family list which will be replicated for this peer or null
-   *          for all table and column families
    */
-  void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
+  void addPeer(String peerId, ReplicationPeerConfig peerConfig)
       throws ReplicationException;
 
   /**
@@ -78,17 +77,19 @@ public interface ReplicationPeers {
   void disablePeer(String peerId) throws ReplicationException;
 
   /**
-   * Get the table and column-family list string of the peer from ZK.
+   * Get the table and column-family list of the peer from ZK.
    * @param peerId a short that identifies the cluster
    */
-  public String getPeerTableCFsConfig(String peerId) throws ReplicationException;
+  public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
+      throws ReplicationException;
 
   /**
-   * Set the table and column-family list string of the peer to ZK.
+   * Set the table and column-family list of the peer to ZK.
    * @param peerId a short that identifies the cluster
    * @param tableCFs the table and column-family list which will be replicated for this peer
    */
-  public void setPeerTableCFsConfig(String peerId, String tableCFs) throws ReplicationException;
+  public void setPeerTableCFsConfig(String peerId,
+      Map<TableName, ? extends Collection<String>> tableCFs) throws ReplicationException;
 
   /**
    * Get the table and column-family-list map of the peer.

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index d717b0b..bb9842b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,15 +79,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
 
   // Map of peer clusters keyed by their id
   private Map<String, ReplicationPeerZKImpl> peerClusters;
-  private final String tableCFsNodeName;
   private final ReplicationQueuesClient queuesClient;
+  private Abortable abortable;
 
   private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
 
   public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
       final ReplicationQueuesClient queuesClient, Abortable abortable) {
     super(zk, conf, abortable);
-    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
+    this.abortable = abortable;
     this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
     this.queuesClient = queuesClient;
   }
@@ -104,7 +105,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
+  public void addPeer(String id, ReplicationPeerConfig peerConfig)
       throws ReplicationException {
     try {
       if (peerExists(id)) {
@@ -129,18 +130,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
 
       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
-      ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
+      ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
         ReplicationSerDeHelper.toByteArray(peerConfig));
       // There is a race (if hbase.zookeeper.useMulti is false)
       // b/w PeerWatcher and ReplicationZookeeper#add method to create the
       // peer-state znode. This happens while adding a peer
       // The peer state data is set as "ENABLED" by default.
       ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
-      String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
-      ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
       listOfOps.add(op1);
       listOfOps.add(op2);
-      listOfOps.add(op3);
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
       // A peer is enabled by default
     } catch (KeeperException e) {
@@ -175,13 +173,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public String getPeerTableCFsConfig(String id) throws ReplicationException {
+  public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
     try {
       if (!peerExists(id)) {
         throw new IllegalArgumentException("peer " + id + " doesn't exist");
       }
       try {
-        return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
+        ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
+        if (rpc == null) {
+          throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
+        }
+        return rpc.getTableCFsMap();
       } catch (Exception e) {
         throw new ReplicationException(e);
       }
@@ -191,20 +193,22 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
+  public void setPeerTableCFsConfig(String id,
+      Map<TableName, ? extends Collection<String>> tableCFs) throws ReplicationException {
     try {
       if (!peerExists(id)) {
         throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
             + " does not exist.");
       }
-      String tableCFsZKNode = getTableCFsNode(id);
-      byte[] tableCFs = Bytes.toBytes(tableCFsStr);
-      if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
-        ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
-      } else {
-        ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
+      ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
+      if (rpc == null) {
+        throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
       }
-      LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
+      rpc.setTableCFsMap(tableCFs);
+      ZKUtil.setData(this.zookeeper, getPeerNode(id),
+          ReplicationSerDeHelper.toByteArray(rpc));
+      LOG.info("Peer tableCFs with id= " + id + " is now "
+          + ReplicationSerDeHelper.convertToString(tableCFs));
     } catch (KeeperException e) {
       throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
     }
@@ -289,7 +293,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   @Override
   public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
       throws ReplicationException {
-    String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+    String znode = getPeerNode(peerId);
     byte[] data = null;
     try {
       data = ZKUtil.getData(this.zookeeper, znode);
@@ -458,14 +462,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     return true;
   }
 
-  private String getTableCFsNode(String id) {
-    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
-  }
-
-  private String getPeerStateNode(String id) {
-    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
-  }
-
   /**
    * Update the state znode of a peer cluster.
    * @param id
@@ -506,22 +502,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     }
     Configuration peerConf = pair.getSecond();
 
-    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
+    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper, peerConf, peerId,
+        pair.getFirst(), abortable);
     try {
-      peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+      peer.startStateTracker(getPeerStateNode(peerId));
     } catch (KeeperException e) {
       throw new ReplicationException("Error starting the peer state tracker for peerId=" +
           peerId, e);
     }
 
     try {
-      peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
-          peerId, e);
-    }
-
-    try {
       peer.startPeerConfigTracker(this.zookeeper, this.getPeerNode(peerId));
     }
     catch(KeeperException e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
index 05f909d..cdb95f7f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hbase.replication;
 
 import com.google.protobuf.ByteString;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -28,8 +30,13 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 @InterfaceAudience.Private
@@ -39,6 +46,175 @@ public final class ReplicationSerDeHelper {
 
   private ReplicationSerDeHelper() {}
 
+  /** Convert a map of table name to column families into TableCF objects; null in, null out. */
+  public static ZooKeeperProtos.TableCF[] convert(
+      Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return null;
+    }
+    List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>();
+    ZooKeeperProtos.TableCF.Builder tableCFBuilder =  ZooKeeperProtos.TableCF.newBuilder();
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+      tableCFBuilder.clear();
+      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
+      Collection<String> v = entry.getValue();
+      if (v != null && !v.isEmpty()) {
+        for (String value : entry.getValue()) {
+          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
+        }
+      }
+      tableCFList.add(tableCFBuilder.build());
+    }
+    return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]);
+  }
+
+  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
+    if (tableCfs == null) {
+      return null;
+    }
+    return convert(convert(tableCfs));
+  }
+
+  /**
+   *  Convert string to TableCFs Object; returns null for a null or blank string.
+   *  This is only used to read TableCFs information from the TableCF node.
+   *  Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3
+   * */
+  public static ZooKeeperProtos.TableCF[] convert(String tableCFsConfig) {
+    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
+      return null;
+    }
+    List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>();
+    ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder();
+
+    String[] tables = tableCFsConfig.split(";");
+    for (String tab : tables) {
+      // 1 ignore empty table config
+      tab = tab.trim();
+      if (tab.length() == 0) {
+        continue;
+      }
+      // 2 split "table:cf1,cf2" (or just "table") into "table" and "cf1,cf2"; split() drops
+      //   trailing empty strings, so an entry like ":" yields an empty array
+      String[] pair = tab.split(":");
+      String tabName = pair.length > 0 ? pair[0].trim() : "";
+      if (pair.length > 2 || tabName.length() == 0) {
+        LOG.info("incorrect format:" + tableCFsConfig);
+        continue;
+      }
+
+      tableCFBuilder.clear();
+      // split namespace from tableName
+      String ns = "default";
+      String tName = tabName;
+      String[] dbs = tabName.split("\\.");
+      if (dbs != null && dbs.length == 2) {
+        ns = dbs[0];
+        tName = dbs[1];
+      }
+      tableCFBuilder.setTableName(
+        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
+
+      // 3 parse "cf1,cf2" part to List<cf>
+      if (pair.length == 2) {
+        String[] cfsList = pair[1].split(",");
+        for (String cf : cfsList) {
+          String cfName = cf.trim();
+          if (cfName.length() > 0) {
+            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
+          }
+        }
+      }
+      tableCFList.add(tableCFBuilder.build());
+    }
+    return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]);
+  }
+
+  /**
+   *  Convert TableCFs Object to String.
+   *  Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
+   * */
+  public static String convert(ZooKeeperProtos.TableCF[] tableCFs) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0, n = tableCFs.length; i < n; i++) {
+      ZooKeeperProtos.TableCF tableCF = tableCFs[i];
+      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
+      if (!Strings.isEmpty(namespace)) {
+        sb.append(namespace).append(".").
+            append(tableCF.getTableName().getQualifier().toStringUtf8())
+            .append(":");
+      } else {
+        sb.append(tableCF.getTableName().getQualifier().toStringUtf8()).append(":");
+      }
+      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
+        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
+      }
+      sb.deleteCharAt(sb.length() - 1).append(";"); // drop the trailing ',' (or ':' if no cf)
+    }
+    if (sb.length() > 0) {
+      sb.deleteCharAt(sb.length() - 1);
+    }
+    return sb.toString();
+  }
+
+  /**
+   *  Get the TableCF whose qualifier equals table (namespace is ignored), or null if none.
+   * */
+  public static ZooKeeperProtos.TableCF getTableCF(ZooKeeperProtos.TableCF[] tableCFs,
+                                           String table) {
+    for (int i = 0, n = tableCFs.length; i < n; i++) {
+      ZooKeeperProtos.TableCF tableCF = tableCFs[i];
+      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
+        return tableCF;
+      }
+    }
+    return null;
+  }
+
+  /**
+   *  Parse bytes into TableCFs.
+   *  It is used for backward compatibility.
+   *  Old format bytes have no PB_MAGIC Header
+   * */
+  public static ZooKeeperProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
+    if (bytes == null) {
+      return null;
+    }
+    return ReplicationSerDeHelper.convert(Bytes.toString(bytes));
+  }
+
+  /**
+   *  Convert tableCFs string into Map.
+   * */
+  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+    ZooKeeperProtos.TableCF[] tableCFs = convert(tableCFsConfig);
+    return convert2Map(tableCFs);
+  }
+
+  /**
+   *  Convert TableCF array to Map (null if null/empty); a table without families maps to null.
+   * */
+  public static Map<TableName, List<String>> convert2Map(ZooKeeperProtos.TableCF[] tableCFs) {
+    if (tableCFs == null || tableCFs.length == 0) {
+      return null;
+    }
+    Map<TableName, List<String>> tableCFsMap = new HashMap<TableName, List<String>>();
+    for (int i = 0, n = tableCFs.length; i < n; i++) {
+      ZooKeeperProtos.TableCF tableCF = tableCFs[i];
+      List<String> families = new ArrayList<>();
+      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
+        families.add(tableCF.getFamilies(j).toStringUtf8());
+      }
+      if (families.size() > 0) {
+        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
+      } else {
+        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
+      }
+    }
+
+    return tableCFsMap;
+  }
+
   /**
    * @param bytes Content of a peer znode.
    * @return ClusterKey parsed from the passed bytes.
@@ -82,6 +258,12 @@ public final class ReplicationSerDeHelper {
     for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
       peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
     }
+
+    Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
+      peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()]));
+    if (tableCFsMap != null) {
+      peerConfig.setTableCFsMap(tableCFsMap);
+    }
     return peerConfig;
   }
 
@@ -119,6 +301,13 @@ public final class ReplicationSerDeHelper {
           .build());
     }
 
+    ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
+    if (tableCFs != null) {
+      for (int i = 0; i < tableCFs.length; i++) {
+        builder.addTableCfs(tableCFs[i]);
+      }
+    }
+
     return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index ed9359d..d0c3513 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /**
  * This is a base class for maintaining replication state in zookeeper.
@@ -52,6 +54,9 @@ public abstract class ReplicationStateZKBase {
   protected final String hfileRefsZNode;
   /** The cluster key of the local cluster */
   protected final String ourClusterKey;
+  /** The name of the znode that contains tableCFs */
+  protected final String tableCFsNodeName;
+
   protected final ZooKeeperWatcher zookeeper;
   protected final Configuration conf;
   protected final Abortable abortable;
@@ -77,6 +82,7 @@ public abstract class ReplicationStateZKBase {
     String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
       ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
     this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
@@ -119,6 +125,17 @@ public abstract class ReplicationStateZKBase {
     return path.split("/").length == peersZNode.split("/").length + 1;
   }
 
+  @VisibleForTesting
+  protected String getTableCFsNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
+  }
+
+  @VisibleForTesting
+  protected String getPeerStateNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+  }
+
+  @VisibleForTesting
   protected String getPeerNode(String id) {
     return ZKUtil.joinZNode(this.peersZNode, id);
   }