You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/07/15 01:23:53 UTC

[3/3] git commit: HBASE-11367 Pluggable replication endpoint

HBASE-11367 Pluggable replication endpoint


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/463d52d8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/463d52d8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/463d52d8

Branch: refs/heads/master
Commit: 463d52d8cf2a87e1f11eb6fabcd0164584e29fbb
Parents: 4824b0d
Author: Enis Soztutar <en...@apache.org>
Authored: Mon Jul 14 16:21:55 2014 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Mon Jul 14 16:22:26 2014 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    | 109 ++-
 .../hbase/replication/ReplicationPeer.java      | 353 +------
 .../replication/ReplicationPeerConfig.java      |  87 ++
 .../replication/ReplicationPeerZKImpl.java      | 320 ++++++
 .../hbase/replication/ReplicationPeers.java     |  89 +-
 .../replication/ReplicationPeersZKImpl.java     | 348 +++----
 .../hadoop/hbase/zookeeper/ZKClusterId.java     |   3 +-
 .../hadoop/hbase/HBaseInterfaceAudience.java    |   1 +
 .../protobuf/generated/ZooKeeperProtos.java     | 963 ++++++++++++++++++-
 .../src/main/protobuf/ZooKeeper.proto           |   3 +
 .../replication/VerifyReplication.java          |  13 +-
 .../hadoop/hbase/regionserver/wal/HLog.java     |   9 +-
 .../hadoop/hbase/regionserver/wal/HLogKey.java  |  11 +-
 .../hadoop/hbase/regionserver/wal/WALEdit.java  |   5 +-
 .../replication/BaseReplicationEndpoint.java    |  77 ++
 .../hbase/replication/ChainWALEntryFilter.java  |  68 ++
 .../replication/HBaseReplicationEndpoint.java   | 217 +++++
 .../hbase/replication/ReplicationEndpoint.java  | 163 ++++
 .../hbase/replication/ScopeWALEntryFilter.java  |  58 ++
 .../replication/SystemTableWALEntryFilter.java  |  36 +
 .../replication/TableCfWALEntryFilter.java      |  74 ++
 .../hbase/replication/WALEntryFilter.java       |  41 +
 .../master/ReplicationLogCleaner.java           |   9 +-
 .../HBaseInterClusterReplicationEndpoint.java   | 225 +++++
 .../replication/regionserver/MetricsSource.java |   5 +-
 .../regionserver/ReplicationSinkManager.java    |  17 +-
 .../regionserver/ReplicationSource.java         | 279 +++---
 .../ReplicationSourceInterface.java             |   4 +-
 .../regionserver/ReplicationSourceManager.java  |  68 +-
 .../replication/TestReplicationAdmin.java       |  37 +
 .../replication/ReplicationSourceDummy.java     |   4 +-
 .../replication/TestPerTableCFReplication.java  |  20 +-
 .../replication/TestReplicationEndpoint.java    | 272 ++++++
 .../replication/TestReplicationStateBasic.java  |  58 +-
 .../TestReplicationTrackerZKImpl.java           |   4 +-
 .../TestReplicationWALEntryFilters.java         | 277 ++++++
 .../TestReplicationSinkManager.java             |  17 +-
 37 files changed, 3464 insertions(+), 880 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 72e3fbb..4028d87 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -21,10 +21,12 @@ package org.apache.hadoop.hbase.client.replication;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import java.util.Map.Entry;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,14 +36,18 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * <p>
  * This class provides the administrative interface to HBase cluster
@@ -80,6 +86,8 @@ public class ReplicationAdmin implements Closeable {
       .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
 
   private final HConnection connection;
+  // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
+  // be moved to hbase-server. Resolve it in HBASE-11392.
   private final ReplicationQueuesClient replicationQueuesClient;
   private final ReplicationPeers replicationPeers;
 
@@ -126,27 +134,65 @@ public class ReplicationAdmin implements Closeable {
     });
   }
 
-
   /**
    * Add a new peer cluster to replicate to.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    * @param clusterKey the concatenation of the slave cluster's
    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
    * @throws IllegalStateException if there's already one slave since
    * multi-slave isn't supported yet.
+   * @deprecated Use {@link #addPeer(String, ReplicationPeerConfig, Map)} instead.
    */
+  @Deprecated
   public void addPeer(String id, String clusterKey) throws ReplicationException {
-    this.replicationPeers.addPeer(id, clusterKey);
+    this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
   }
 
+  @Deprecated
   public void addPeer(String id, String clusterKey, String tableCFs)
     throws ReplicationException {
-    this.replicationPeers.addPeer(id, clusterKey, tableCFs);
+    this.replicationPeers.addPeer(id,
+      new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
+  }
+
+  /**
+   * Add a new remote slave cluster for replication.
+   * @param id a short name that identifies the cluster
+   * @param peerConfig configuration for the replication slave cluster
+   * @param tableCfs the table and column-family list which will be replicated for this peer.
+   * A map from tableName to column family names. An empty collection can be passed
+   * to indicate replicating all column families. Pass null to replicate all tables and column
+   * families.
+   */
+  public void addPeer(String id, ReplicationPeerConfig peerConfig,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
+    this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
+  }
+
+  @VisibleForTesting
+  static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
+    String tableCfsStr = null;
+    if (tableCfs != null) {
+      // Format: table1:cf1,cf2;table2:cfA,cfB;table3
+      StringBuilder builder = new StringBuilder();
+      for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+        if (builder.length() > 0) {
+          builder.append(";");
+        }
+        builder.append(entry.getKey());
+        if (entry.getValue() != null && !entry.getValue().isEmpty()) {
+          builder.append(":");
+          builder.append(StringUtils.join(entry.getValue(), ","));
+        }
+      }
+      tableCfsStr = builder.toString();
+    }
+    return tableCfsStr;
   }
 
   /**
    * Removes a peer cluster and stops the replication to it.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    */
   public void removePeer(String id) throws ReplicationException {
     this.replicationPeers.removePeer(id);
@@ -154,7 +200,7 @@ public class ReplicationAdmin implements Closeable {
 
   /**
    * Restart the replication stream to the specified peer.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    */
   public void enablePeer(String id) throws ReplicationException {
     this.replicationPeers.enablePeer(id);
@@ -162,7 +208,7 @@ public class ReplicationAdmin implements Closeable {
 
   /**
    * Stop the replication stream to the specified peer.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    */
   public void disablePeer(String id) throws ReplicationException {
     this.replicationPeers.disablePeer(id);
@@ -179,14 +225,30 @@ public class ReplicationAdmin implements Closeable {
   /**
    * Map of this cluster's peers for display.
    * @return A map of peer ids to peer cluster keys
+   * @deprecated use {@link #listPeerConfigs()}
    */
+  @Deprecated
   public Map<String, String> listPeers() {
-    return this.replicationPeers.getAllPeerClusterKeys();
+    Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
+    Map<String, String> ret = new HashMap<String, String>(peers.size());
+
+    for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
+      ret.put(entry.getKey(), entry.getValue().getClusterKey());
+    }
+    return ret;
+  }
+
+  public Map<String, ReplicationPeerConfig> listPeerConfigs() {
+    return this.replicationPeers.getAllPeerConfigs();
+  }
+
+  public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
+    return this.replicationPeers.getReplicationPeerConfig(id);
   }
 
   /**
    * Get the replicable table-cf config of the specified peer.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    */
   public String getPeerTableCFs(String id) throws ReplicationException {
     return this.replicationPeers.getPeerTableCFsConfig(id);
@@ -194,16 +256,31 @@ public class ReplicationAdmin implements Closeable {
 
   /**
    * Set the replicable table-cf config of the specified peer
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
+   * @deprecated use {@link #setPeerTableCFs(String, Map)}
    */
+  @Deprecated
   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
   }
 
   /**
+   * Set the replicable table-cf config of the specified peer
+   * @param id a short name that identifies the cluster
+   * @param tableCfs the table and column-family list which will be replicated for this peer.
+   * A map from tableName to column family names. An empty collection can be passed
+   * to indicate replicating all column families. Pass null to replicate all tables and column
+   * families.
+   */
+  public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
+      throws ReplicationException {
+    this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
+  }
+
+  /**
    * Get the state of the specified peer cluster
-   * @param id String format of the Short that identifies the peer, an IllegalArgumentException
-   *           is thrown if it doesn't exist
+   * @param id a short name that identifies the peer; an IllegalArgumentException
+   * is thrown if it doesn't exist
    * @return true if replication is enabled to that peer, false if it isn't
    */
   public boolean getPeerState(String id) throws ReplicationException {
@@ -217,7 +294,7 @@ public class ReplicationAdmin implements Closeable {
     }
   }
 
-  
+
   /**
    * Find all column families that are replicated from this cluster
    * @return the full list of the replicated column families of this cluster as:
@@ -227,7 +304,7 @@ public class ReplicationAdmin implements Closeable {
    * types may be extended here. For example
    *  1) the replication may only apply to selected peers instead of all peers
    *  2) the replicationType may indicate the host Cluster servers as Slave
-   *     for the table:columnFam.         
+   *     for the table:columnFam.
    */
   public List<HashMap<String, String>> listReplicated() throws IOException {
     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
@@ -249,5 +326,5 @@ public class ReplicationAdmin implements Closeable {
     }
 
     return replicationColFams;
-  } 
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 1b14dab..c116674 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,362 +17,56 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 
 /**
- * This class acts as a wrapper for all the objects used to identify and
- * communicate with remote peers and is responsible for answering to expired
- * sessions and re-establishing the ZK connections.
+ * ReplicationPeer manages enabled / disabled state for the peer.
  */
-@InterfaceAudience.Private
-public class ReplicationPeer implements Abortable, Closeable {
-  private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
-
-  private final String clusterKey;
-  private final String id;
-  private List<ServerName> regionServers = new ArrayList<ServerName>(0);
-  private final AtomicBoolean peerEnabled = new AtomicBoolean();
-  private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
-  // Cannot be final since a new object needs to be recreated when session fails
-  private ZooKeeperWatcher zkw;
-  private final Configuration conf;
-  private long lastRegionserverUpdate;
-
-  private PeerStateTracker peerStateTracker;
-  private TableCFsTracker tableCFsTracker;
-
-  /**
-   * Constructor that takes all the objects required to communicate with the
-   * specified peer, except for the region server addresses.
-   * @param conf configuration object to this peer
-   * @param key cluster key used to locate the peer
-   * @param id string representation of this peer's identifier
-   */
-  public ReplicationPeer(Configuration conf, String key, String id) throws ReplicationException {
-    this.conf = conf;
-    this.clusterKey = key;
-    this.id = id;
-    try {
-      this.reloadZkWatcher();
-    } catch (IOException e) {
-      throw new ReplicationException("Error connecting to peer cluster with peerId=" + id, e);
-    }
-  }
-
-  /**
-   * start a state tracker to check whether this peer is enabled or not
-   *
-   * @param zookeeper zk watcher for the local cluster
-   * @param peerStateNode path to zk node which stores peer state
-   * @throws KeeperException
-   */
-  public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
-      throws KeeperException {
-    ensurePeerEnabled(zookeeper, peerStateNode);
-    this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
-    this.peerStateTracker.start();
-    try {
-      this.readPeerStateZnode();
-    } catch (DeserializationException e) {
-      throw ZKUtil.convert(e);
-    }
-  }
-
-  private void readPeerStateZnode() throws DeserializationException {
-    this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false)));
-  }
-
-  /**
-   * start a table-cfs tracker to listen the (table, cf-list) map change
-   *
-   * @param zookeeper zk watcher for the local cluster
-   * @param tableCFsNode path to zk node which stores table-cfs
-   * @throws KeeperException
-   */
-  public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
-    throws KeeperException {
-    this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
-        this);
-    this.tableCFsTracker.start();
-    this.readTableCFsZnode();
-  }
-
-  static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
-    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
-      return null;
-    }
-
-    Map<String, List<String>> tableCFsMap = null;
-
-    // parse out (table, cf-list) pairs from tableCFsConfig
-    // format: "table1:cf1,cf2;table2:cfA,cfB"
-    String[] tables = tableCFsConfig.split(";");
-    for (String tab : tables) {
-      // 1 ignore empty table config
-      tab = tab.trim();
-      if (tab.length() == 0) {
-        continue;
-      }
-      // 2 split to "table" and "cf1,cf2"
-      //   for each table: "table:cf1,cf2" or "table"
-      String[] pair = tab.split(":");
-      String tabName = pair[0].trim();
-      if (pair.length > 2 || tabName.length() == 0) {
-        LOG.error("ignore invalid tableCFs setting: " + tab);
-        continue;
-      }
-
-      // 3 parse "cf1,cf2" part to List<cf>
-      List<String> cfs = null;
-      if (pair.length == 2) {
-        String[] cfsList = pair[1].split(",");
-        for (String cf : cfsList) {
-          String cfName = cf.trim();
-          if (cfName.length() > 0) {
-            if (cfs == null) {
-              cfs = new ArrayList<String>();
-            }
-            cfs.add(cfName);
-          }
-        }
-      }
-
-      // 4 put <table, List<cf>> to map
-      if (tableCFsMap == null) {
-        tableCFsMap = new HashMap<String, List<String>>();
-      }
-      tableCFsMap.put(tabName, cfs);
-    }
-
-    return tableCFsMap;
-  }
-
-  private void readTableCFsZnode() {
-    String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
-    this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
-  }
-
-  /**
-   * Get the cluster key of that peer
-   * @return string consisting of zk ensemble addresses, client port
-   * and root znode
-   */
-  public String getClusterKey() {
-    return clusterKey;
-  }
-
-  /**
-   * Get the state of this peer
-   * @return atomic boolean that holds the status
-   */
-  public AtomicBoolean getPeerEnabled() {
-    return peerEnabled;
-  }
-
-  /**
-   * Get replicable (table, cf-list) map of this peer
-   * @return the replicable (table, cf-list) map
-   */
-  public Map<String, List<String>> getTableCFs() {
-    return this.tableCFs;
-  }
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeer {
 
   /**
-   * Get a list of all the addresses of all the region servers
-   * for this peer cluster
-   * @return list of addresses
+   * State of the peer, whether it is enabled or not
    */
-  public List<ServerName> getRegionServers() {
-    return regionServers;
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+  enum PeerState {
+    ENABLED,
+    DISABLED
   }
 
   /**
-   * Set the list of region servers for that peer
-   * @param regionServers list of addresses for the region servers
+   * Get the identifier of this peer
+   * @return string representation of the id
    */
-  public void setRegionServers(List<ServerName> regionServers) {
-    this.regionServers = regionServers;
-    lastRegionserverUpdate = System.currentTimeMillis();
-  }
+  String getId();
 
   /**
-   * Get the ZK connection to this peer
-   * @return zk connection
+   * Get the peer config object
+   * @return the ReplicationPeerConfig for this peer
    */
-  public ZooKeeperWatcher getZkw() {
-    return zkw;
-  }
+  public ReplicationPeerConfig getPeerConfig();
 
   /**
-   * Get the timestamp at which the last change occurred to the list of region servers to replicate
-   * to.
-   * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+   * Returns the state of the peer
+   * @return the enabled state
    */
-  public long getLastRegionserverUpdate() {
-    return lastRegionserverUpdate;
-  }
-
-  /**
-   * Get the identifier of this peer
-   * @return string representation of the id (short)
-   */
-  public String getId() {
-    return id;
-  }
+  PeerState getPeerState();
 
   /**
    * Get the configuration object required to communicate with this peer
    * @return configuration object
    */
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  @Override
-  public void abort(String why, Throwable e) {
-    LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey
-        + " was aborted for the following reason(s):" + why, e);
-  }
-
-  /**
-   * Closes the current ZKW (if not null) and creates a new one
-   * @throws IOException If anything goes wrong connecting
-   */
-  public void reloadZkWatcher() throws IOException {
-    if (zkw != null) zkw.close();
-    zkw = new ZooKeeperWatcher(conf,
-        "connection to cluster: " + id, this);
-  }
-
-  @Override
-  public boolean isAborted() {
-    // Currently the replication peer is never "Aborted", we just log when the
-    // abort method is called.
-    return false;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (zkw != null){
-      zkw.close();
-    }
-  }
-
-  /**
-   * Parse the raw data from ZK to get a peer's state
-   * @param bytes raw ZK data
-   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
-   * @throws DeserializationException
-   */
-  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
-    ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
-    return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
-  }
-
-  /**
-   * @param bytes Content of a state znode.
-   * @return State parsed from the passed bytes.
-   * @throws DeserializationException
-   */
-  private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
-      throws DeserializationException {
-    ProtobufUtil.expectPBMagicPrefix(bytes);
-    int pblen = ProtobufUtil.lengthOfPBMagic();
-    ZooKeeperProtos.ReplicationState.Builder builder =
-        ZooKeeperProtos.ReplicationState.newBuilder();
-    ZooKeeperProtos.ReplicationState state;
-    try {
-      state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
-      return state.getState();
-    } catch (InvalidProtocolBufferException e) {
-      throw new DeserializationException(e);
-    }
-  }
-
-  /**
-   * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
-   * @param zookeeper
-   * @param path Path to znode to check
-   * @return True if we created the znode.
-   * @throws NodeExistsException
-   * @throws KeeperException
-   */
-  private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
-      throws NodeExistsException, KeeperException {
-    if (ZKUtil.checkExists(zookeeper, path) == -1) {
-      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
-      // peer-state znode. This happens while adding a peer.
-      // The peer state data is set as "ENABLED" by default.
-      ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
-        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
-      return true;
-    }
-    return false;
-  }
+  public Configuration getConfiguration();
 
   /**
-   * Tracker for state of this peer
-   */
-  public class PeerStateTracker extends ZooKeeperNodeTracker {
-
-    public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, peerStateZNode, abortable);
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-        try {
-          readPeerStateZnode();
-        } catch (DeserializationException e) {
-          LOG.warn("Failed deserializing the content of " + path, e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Tracker for (table, cf-list) map of this peer
+   * Get replicable (table, cf-list) map of this peer
+   * @return the replicable (table, cf-list) map
    */
-  public class TableCFsTracker extends ZooKeeperNodeTracker {
+  public Map<String, List<String>> getTableCFs();
 
-    public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, tableCFsZNode, abortable);
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-        readTableCFsZnode();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
new file mode 100644
index 0000000..8b8bab7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A configuration for the replication peer cluster.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ReplicationPeerConfig {
+
+  private String clusterKey;
+  private String replicationEndpointImpl;
+  private final Map<byte[], byte[]> peerData;
+  private final Map<String, String> configuration;
+
+
+  public ReplicationPeerConfig() {
+    this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+    this.configuration = new HashMap<String, String>(0);
+  }
+
+  /**
+   * Set the clusterKey which is the concatenation of the slave cluster's:
+   *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+   */
+  public ReplicationPeerConfig setClusterKey(String clusterKey) {
+    this.clusterKey = clusterKey;
+    return this;
+  }
+
+  /**
+   * Sets the ReplicationEndpoint plugin class for this peer.
+   * @param replicationEndpointImpl a class implementing ReplicationEndpoint
+   */
+  public ReplicationPeerConfig setReplicationEndpointImpl(String replicationEndpointImpl) {
+    this.replicationEndpointImpl = replicationEndpointImpl;
+    return this;
+  }
+
+  public String getClusterKey() {
+    return clusterKey;
+  }
+
+  public String getReplicationEndpointImpl() {
+    return replicationEndpointImpl;
+  }
+
+  public Map<byte[], byte[]> getPeerData() {
+    return peerData;
+  }
+
+  public Map<String, String> getConfiguration() {
+    return configuration;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
+    builder.append("replicationEndpointImpl=").append(replicationEndpointImpl);
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
new file mode 100644
index 0000000..a39392c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -0,0 +1,320 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+@InterfaceAudience.Private
+public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
+  private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
+
+  private final ReplicationPeerConfig peerConfig;
+  private final String id;
+  private volatile PeerState peerState;
+  private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
+  private final Configuration conf;
+
+  private PeerStateTracker peerStateTracker;
+  private TableCFsTracker tableCFsTracker;
+
+  /**
+   * Constructor that takes all the objects required to communicate with the
+   * specified peer, except for the region server addresses.
+   * @param conf configuration object to this peer
+   * @param id string representation of this peer's identifier
+   * @param peerConfig configuration for the replication peer
+   */
+  public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
+      throws ReplicationException {
+    this.conf = conf;
+    this.peerConfig = peerConfig;
+    this.id = id;
+  }
+
+  /**
+   * Starts a state tracker to check whether this peer is enabled or not. The peer-state znode
+   * is first created with the ENABLED state if it does not exist yet.
+   * @param zookeeper zk watcher for the local cluster
+   * @param peerStateNode path to zk node which stores peer state
+   * @throws KeeperException on a ZooKeeper failure, or if the znode content cannot be parsed
+   */
+  public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+      throws KeeperException {
+    ensurePeerEnabled(zookeeper, peerStateNode);
+    this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
+    this.peerStateTracker.start();
+    try {
+      this.readPeerStateZnode();
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  private void readPeerStateZnode() throws DeserializationException {
+    // Recompute peerState from the data returned by the state tracker.
+    byte[] stateBytes = this.peerStateTracker.getData(false);
+    boolean enabled = isStateEnabled(stateBytes);
+    this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
+  }
+
+  /**
+   * Starts a table-cfs tracker that listens for changes to the (table, cf-list) map
+   * @param zookeeper zk watcher for the local cluster
+   * @param tableCFsNode path to zk node which stores table-cfs
+   * @throws KeeperException
+   */
+  public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
+    throws KeeperException {
+    this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
+        this);
+    this.tableCFsTracker.start();
+    // Read the current value once; the tracker re-reads it in nodeDataChanged afterwards.
+    this.readTableCFsZnode();
+  }
+
+  /**
+   * Parses a tableCFs string such as "table1:cf1,cf2;table2:cfA,cfB;table3" into a map of
+   * table name to cf-list; the list is null when an entry names no column family.
+   * @param tableCFsConfig raw tableCFs string, may be null or blank
+   * @return the parsed map, or null if no valid (table, cf-list) entry was found
+   */
+  static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
+      return null;
+    }
+    Map<String, List<String>> tableCFsMap = null;
+    // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
+    String[] tables = tableCFsConfig.split(";");
+    for (String tab : tables) {
+      // 1 ignore empty table config
+      tab = tab.trim();
+      if (tab.length() == 0) {
+        continue;
+      }
+      // 2 split "table:cf1,cf2" or "table" into the table name and the "cf1,cf2" part
+      String[] pair = tab.split(":");
+      // split() drops trailing empty strings, so ":" gives a zero-length array: guard pair[0]
+      String tabName = pair.length == 0 ? "" : pair[0].trim();
+      if (pair.length > 2 || tabName.length() == 0) {
+        LOG.error("ignore invalid tableCFs setting: " + tab);
+        continue;
+      }
+      // 3 parse "cf1,cf2" part to List<cf>
+      List<String> cfs = null;
+      if (pair.length == 2) {
+        String[] cfsList = pair[1].split(",");
+        for (String cf : cfsList) {
+          String cfName = cf.trim();
+          if (cfName.length() > 0) {
+            if (cfs == null) {
+              cfs = new ArrayList<String>();
+            }
+            cfs.add(cfName);
+          }
+        }
+      }
+      // 4 put <table, List<cf>> to map
+      if (tableCFsMap == null) {
+        tableCFsMap = new HashMap<String, List<String>>();
+      }
+      tableCFsMap.put(tabName, cfs);
+    }
+    return tableCFsMap;
+  }
+
+  private void readTableCFsZnode() {
+    byte[] rawTableCFs = tableCFsTracker.getData(false);
+    this.tableCFs = parseTableCFsFromConfig(Bytes.toString(rawTableCFs));
+  }
+
+  @Override
+  public PeerState getPeerState() {
+    return peerState;
+  }
+
+  /**
+   * Get the identifier of this peer
+   * @return string representation of the id (short)
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Get the peer config object
+   * @return the ReplicationPeerConfig for this peer
+   */
+  @Override
+  public ReplicationPeerConfig getPeerConfig() {
+    return peerConfig;
+  }
+
+  /**
+   * Get the configuration object required to communicate with this peer
+   * @return configuration object
+   */
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Get replicable (table, cf-list) map of this peer
+   * @return the replicable (table, cf-list) map
+   */
+  @Override
+  public Map<String, List<String>> getTableCFs() {
+    return this.tableCFs;
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
+        + " was aborted for the following reason(s):" + why, e);
+  }
+
+  @Override
+  public boolean isAborted() {
+    // Currently the replication peer is never "Aborted", we just log when the
+    // abort method is called.
+    return false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // TODO: stop zkw?
+  }
+
+  /**
+   * Parse the raw data from ZK to get a peer's state
+   * @param bytes raw ZK data
+   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
+   * @throws DeserializationException
+   */
+  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+    ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+    return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+  }
+
+  /**
+   * @param bytes Content of a state znode; checked for the PB magic prefix before parsing.
+   * @return State parsed from the passed bytes.
+   * @throws DeserializationException if the bytes cannot be parsed as a ReplicationState
+   */
+  private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(bytes);
+    // The serialized message is read from right after the PB magic prefix to the end.
+    final int offset = ProtobufUtil.lengthOfPBMagic();
+    final int length = bytes.length - offset;
+    try {
+      ZooKeeperProtos.ReplicationState.Builder builder =
+          ZooKeeperProtos.ReplicationState.newBuilder();
+      return builder.mergeFrom(bytes, offset, length).build().getState();
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
+   * @param zookeeper zk watcher used to check for and create the znode
+   * @param path Path to znode to check
+   * @return True only if this call created the znode; false if it already existed.
+   * @throws NodeExistsException
+   * @throws KeeperException
+   */
+  private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+      throws NodeExistsException, KeeperException {
+    if (ZKUtil.checkExists(zookeeper, path) == -1) {
+      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+      // peer-state znode. This happens while adding a peer, so the other side may win between
+      // the check above and the create below; report whether this call created the node.
+      // The peer state data is set as "ENABLED" by default.
+      return ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
+        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    }
+    return false;
+  }
+
+  /**
+   * Node tracker that refreshes this peer's state when the tracked znode's data changes
+   */
+  public class PeerStateTracker extends ZooKeeperNodeTracker {
+    public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
+        Abortable abortable) {
+      super(watcher, peerStateZNode, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      // Events for any other znode are ignored.
+      if (!path.equals(node)) {
+        return;
+      }
+      super.nodeDataChanged(path);
+      try {
+        readPeerStateZnode();
+      } catch (DeserializationException e) {
+        LOG.warn("Failed deserializing the content of " + path, e);
+      }
+    }
+  }
+
+  /**
+   * Node tracker that re-reads this peer's (table, cf-list) map when the znode's data changes
+   */
+  public class TableCFsTracker extends ZooKeeperNodeTracker {
+    public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
+        Abortable abortable) {
+      super(watcher, tableCFsZNode, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      // Events for any other znode are ignored.
+      if (!path.equals(node)) {
+        return;
+      }
+      super.nodeDataChanged(path);
+      readTableCFsZnode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 4922f70..b1c3b49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -21,11 +21,10 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
@@ -44,22 +43,16 @@ public interface ReplicationPeers {
    * Initialize the ReplicationPeers interface.
    */
   void init() throws ReplicationException;
-  /**
-   * Add a new remote slave cluster for replication.
-   * @param peerId a short that identifies the cluster
-   * @param clusterKey the concatenation of the slave cluster's:
-   *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-   */
-  void addPeer(String peerId, String clusterKey) throws ReplicationException;
 
   /**
    * Add a new remote slave cluster for replication.
    * @param peerId a short that identifies the cluster
-   * @param clusterKey the concatenation of the slave cluster's:
-   *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-   * @param tableCFs the table and column-family list which will be replicated for this peer
+   * @param peerConfig configuration for the replication slave cluster
+   * @param tableCFs the table and column-family list which will be replicated for this peer or null
+   * for all table and column families
    */
-  void addPeer(String peerId, String clusterKey, String tableCFs) throws ReplicationException;
+  void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
+      throws ReplicationException;
 
   /**
    * Removes a remote slave cluster and stops the replication to it.
@@ -67,6 +60,10 @@ public interface ReplicationPeers {
    */
   void removePeer(String peerId) throws ReplicationException;
 
+  boolean peerAdded(String peerId) throws ReplicationException;
+
+  void peerRemoved(String peerId);
+
   /**
    * Restart the replication to the specified remote slave cluster.
    * @param peerId a short that identifies the cluster
@@ -100,6 +97,19 @@ public interface ReplicationPeers {
   public Map<String, List<String>> getTableCFs(String peerId);
 
   /**
+   * Returns the ReplicationPeer
+   * @param peerId id for the peer
+   * @return ReplicationPeer object
+   */
+  ReplicationPeer getPeer(String peerId);
+
+  /**
+   * Returns the set of peerIds defined
+   * @return a Set of Strings for peerIds
+   */
+  public Set<String> getPeerIds();
+
+  /**
    * Get the replication status for the specified connected remote slave cluster.
    * The value might be read from cache, so it is recommended to
    * use {@link #getStatusOfPeerFromBackingStore(String)}
@@ -107,7 +117,7 @@ public interface ReplicationPeers {
    * @param peerId a short that identifies the cluster
    * @return true if replication is enabled, false otherwise.
    */
-  boolean getStatusOfConnectedPeer(String peerId);
+  boolean getStatusOfPeer(String peerId);
 
   /**
    * Get the replication status for the specified remote slave cluster, which doesn't
@@ -119,17 +129,11 @@ public interface ReplicationPeers {
   boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
 
   /**
-   * Get a set of all connected remote slave clusters.
-   * @return set of peer ids
-   */
-  Set<String> getConnectedPeers();
-
-  /**
-   * List the cluster keys of all remote slave clusters (whether they are enabled/disabled or
-   * connected/disconnected).
+   * List the cluster replication configs of all remote slave clusters (whether they are
+   * enabled/disabled or connected/disconnected).
    * @return A map of peer ids to peer cluster keys
    */
-  Map<String, String> getAllPeerClusterKeys();
+  Map<String, ReplicationPeerConfig> getAllPeerConfigs();
 
   /**
    * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
@@ -139,45 +143,16 @@ public interface ReplicationPeers {
   List<String> getAllPeerIds();
 
   /**
-   * Attempt to connect to a new remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   * @return true if a new connection was made, false if no new connection was made.
-   */
-  boolean connectToPeer(String peerId) throws ReplicationException;
-
-  /**
-   * Disconnect from a remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   */
-  void disconnectFromPeer(String peerId);
-
-  /**
-   * Returns all region servers from given connected remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   * @return addresses of all region servers in the peer cluster. Returns an empty list if the peer
-   *         cluster is unavailable or there are no region servers in the cluster.
-   */
-  List<ServerName> getRegionServersOfConnectedPeer(String peerId);
-
-  /**
-   * Get the timestamp of the last change in composition of a given peer cluster.
-   * @param peerId identifier of the peer cluster for which the timestamp is requested
-   * @return the timestamp (in milliseconds) of the last change to the composition of
-   *         the peer cluster
-   */
-  long getTimestampOfLastChangeToPeer(String peerId);
-
-  /**
-   * Returns the UUID of the provided peer id.
-   * @param peerId the peer's ID that will be converted into a UUID
-   * @return a UUID or null if the peer cluster does not exist or is not connected.
+   * Returns the configured ReplicationPeerConfig for this peerId
+   * @param peerId a short name that identifies the cluster
+   * @return ReplicationPeerConfig for the peer
    */
-  UUID getPeerUUID(String peerId);
+  ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
 
   /**
    * Returns the configuration needed to talk to the remote slave cluster.
    * @param peerId a short that identifies the cluster
    * @return the configuration for the peer cluster, null if it was unable to get the configuration
    */
-  Configuration getPeerConf(String peerId) throws ReplicationException;
+  Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index fb09102..488d37a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -19,33 +19,29 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.AuthFailedException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-
+import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
@@ -77,7 +73,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
 
   // Map of peer clusters keyed by their id
-  private Map<String, ReplicationPeer> peerClusters;
+  private Map<String, ReplicationPeerZKImpl> peerClusters;
   private final String tableCFsNodeName;
 
   private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
@@ -86,7 +82,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       Abortable abortable) {
     super(zk, conf, abortable);
     this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
-    this.peerClusters = new HashMap<String, ReplicationPeer>();
+    this.peerClusters = new HashMap<String, ReplicationPeerZKImpl>();
   }
 
   @Override
@@ -98,16 +94,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     } catch (KeeperException e) {
       throw new ReplicationException("Could not initialize replication peers", e);
     }
-    connectExistingPeers();
-  }
-
-  @Override
-  public void addPeer(String id, String clusterKey) throws ReplicationException {
-    addPeer(id, clusterKey, null);
+    addExistingPeers();
   }
 
   @Override
-  public void addPeer(String id, String clusterKey, String tableCFs) throws ReplicationException {
+  public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
+      throws ReplicationException {
     try {
       if (peerExists(id)) {
         throw new IllegalArgumentException("Cannot add a peer with id=" + id
@@ -115,7 +107,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       }
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
-        toByteArray(clusterKey));
+        toByteArray(peerConfig));
       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
       // peer-state znode. This happens while adding a peer.
       // The peer state data is set as "ENABLED" by default.
@@ -128,7 +120,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
                     Bytes.toBytes(tableCFsStr));
     } catch (KeeperException e) {
       throw new ReplicationException("Could not add peer with id=" + id
-          + ", clusterKey=" + clusterKey, e);
+          + ", peerConfif=>" + peerConfig, e);
     }
   }
 
@@ -202,11 +194,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public boolean getStatusOfConnectedPeer(String id) {
+  public boolean getStatusOfPeer(String id) {
     if (!this.peerClusters.containsKey(id)) {
       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
     }
-    return this.peerClusters.get(id).getPeerEnabled().get();
+    return this.peerClusters.get(id).getPeerState() == PeerState.ENABLED;
   }
 
   @Override
@@ -217,7 +209,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       }
       String peerStateZNode = getPeerStateNode(id);
       try {
-        return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
+        return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
       } catch (KeeperException e) {
         throw new ReplicationException(e);
       } catch (DeserializationException e) {
@@ -232,140 +224,98 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public boolean connectToPeer(String peerId) throws ReplicationException {
-    if (peerClusters == null) {
-      return false;
-    }
-    if (this.peerClusters.containsKey(peerId)) {
-      return false;
-    }
-    ReplicationPeer peer = null;
-    try {
-      peer = getPeer(peerId);
-    } catch (Exception e) {
-      throw new ReplicationException("Error connecting to peer with id=" + peerId, e);
-    }
-    if (peer == null) {
-      return false;
-    }
-    this.peerClusters.put(peerId, peer);
-    LOG.info("Added new peer cluster " + peer.getClusterKey());
-    return true;
-  }
-
-  @Override
-  public void disconnectFromPeer(String peerId) {
-    ReplicationPeer rp = this.peerClusters.get(peerId);
-    if (rp != null) {
-      rp.getZkw().close();
-      this.peerClusters.remove(peerId);
-    }
-  }
-
-  @Override
-  public Map<String, String> getAllPeerClusterKeys() {
-    Map<String, String> peers = new TreeMap<String, String>();
+  public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
+    Map<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
     List<String> ids = null;
     try {
       ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
       for (String id : ids) {
-        byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
-        String clusterKey = null;
-        try {
-          clusterKey = parsePeerFrom(bytes);
-        } catch (DeserializationException de) {
-          LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
+        ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
+        if (peerConfig == null) {
+          LOG.warn("Failed to get replication peer configuration of clusterid=" + id
+            + " znode content, continuing.");
           continue;
         }
-        peers.put(id, clusterKey);
+        peers.put(id, peerConfig);
       }
     } catch (KeeperException e) {
       this.abortable.abort("Cannot get the list of peers ", e);
-    } catch (InterruptedException e) {
+    } catch (ReplicationException e) {
       this.abortable.abort("Cannot get the list of peers ", e);
     }
     return peers;
   }
 
   @Override
-  public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
-    if (this.peerClusters.size() == 0) {
-      return Collections.emptyList();
-    }
-    ReplicationPeer peer = this.peerClusters.get(peerId);
-    if (peer == null) {
-      return Collections.emptyList();
-    }
-    List<ServerName> addresses;
-    try {
-      addresses = fetchSlavesAddresses(peer.getZkw());
-    } catch (KeeperException ke) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Fetch salves addresses failed.", ke);
-      }
-      reconnectPeer(ke, peer);
-      addresses = Collections.emptyList();
-    }
-    peer.setRegionServers(addresses);
-    return peer.getRegionServers();
+  public ReplicationPeer getPeer(String peerId) {
+    return peerClusters.get(peerId);
   }
 
   @Override
-  public UUID getPeerUUID(String peerId) {
-    ReplicationPeer peer = this.peerClusters.get(peerId);
-    if (peer == null) {
-      return null;
-    }
-    UUID peerUUID = null;
-    try {
-      peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
-    } catch (KeeperException ke) {
-      reconnectPeer(ke, peer);
-    }
-    return peerUUID;
-  }
-
-  @Override
-  public Set<String> getConnectedPeers() {
-    return this.peerClusters.keySet();
+  public Set<String> getPeerIds() {
+    return peerClusters.keySet(); // this is not thread-safe
   }
 
+  /**
+   * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
+   */
   @Override
-  public Configuration getPeerConf(String peerId) throws ReplicationException {
+  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
+      throws ReplicationException {
     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
     byte[] data = null;
     try {
       data = ZKUtil.getData(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error getting configuration for peer with id="
-          + peerId, e);
     } catch (InterruptedException e) {
       LOG.warn("Could not get configuration for peer because the thread " +
           "was interrupted. peerId=" + peerId);
       Thread.currentThread().interrupt();
       return null;
+    } catch (KeeperException e) {
+      throw new ReplicationException("Error getting configuration for peer with id="
+          + peerId, e);
     }
     if (data == null) {
       LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
       return null;
     }
-    String otherClusterKey = "";
+
     try {
-      otherClusterKey = parsePeerFrom(data);
+      return parsePeerFrom(data);
     } catch (DeserializationException e) {
       LOG.warn("Failed to parse cluster key from peerId=" + peerId
           + ", specifically the content from the following znode: " + znode);
       return null;
     }
+  }
+
+  @Override
+  public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
+      throws ReplicationException {
+    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
+
+    if (peerConfig == null) {
+      return null;
+    }
 
     Configuration otherConf = new Configuration(this.conf);
     try {
-      ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
+      if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
+        ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
+      }
     } catch (IOException e) {
       LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
       return null;
     }
-    return otherConf;
+
+    if (!peerConfig.getConfiguration().isEmpty()) {
+      CompoundConfiguration compound = new CompoundConfiguration();
+      compound.add(otherConf);
+      compound.addStringMap(peerConfig.getConfiguration());
+      return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
+    }
+
+    return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
   }
 
   /**
@@ -382,19 +332,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     return ids;
   }
 
-  @Override
-  public long getTimestampOfLastChangeToPeer(String peerId) {
-    if (!peerClusters.containsKey(peerId)) {
-      throw new IllegalArgumentException("Unknown peer id: " + peerId);
-    }
-    return peerClusters.get(peerId).getLastRegionserverUpdate();
-  }
-
   /**
-   * A private method used during initialization. This method attempts to connect to all registered
+   * A private method used during initialization. This method attempts to add all registered
    * peer clusters. This method does not set a watch on the peer cluster znodes.
    */
-  private void connectExistingPeers() throws ReplicationException {
+  private void addExistingPeers() throws ReplicationException {
     List<String> znodes = null;
     try {
       znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
@@ -403,45 +345,49 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     }
     if (znodes != null) {
       for (String z : znodes) {
-        connectToPeer(z);
+        createAndAddPeer(z);
       }
     }
   }
 
-  /**
-   * A private method used to re-establish a zookeeper session with a peer cluster.
-   * @param ke
-   * @param peer
-   */
-  private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
-    if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
-        || ke instanceof AuthFailedException) {
-      LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
-      try {
-        peer.reloadZkWatcher();
-        peer.getZkw().registerListener(new PeerRegionServerListener(peer));
-      } catch (IOException io) {
-        LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
-      }
+  @Override
+  public boolean peerAdded(String peerId) throws ReplicationException {
+    return createAndAddPeer(peerId);
+  }
+
+  @Override
+  public void peerRemoved(String peerId) {
+    ReplicationPeer rp = this.peerClusters.get(peerId);
+    if (rp != null) {
+      this.peerClusters.remove(peerId);
     }
   }
 
   /**
-   * Get the list of all the region servers from the specified peer
-   * @param zkw zk connection to use
-   * @return list of region server addresses or an empty list if the slave is unavailable
+   * Create the peer with the given id and register it in the local peer map.
+   * @param peerId id of the peer cluster to add
+   * @return true if a new peer was added, false if it was already known or could not be created.
    */
-  private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
-      throws KeeperException {
-    List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
-    if (children == null) {
-      return Collections.emptyList();
+  public boolean createAndAddPeer(String peerId) throws ReplicationException {
+    if (peerClusters == null) {
+      return false;
     }
-    List<ServerName> addresses = new ArrayList<ServerName>(children.size());
-    for (String child : children) {
-      addresses.add(ServerName.parseServerName(child));
+    if (this.peerClusters.containsKey(peerId)) {
+      return false;
+    }
+
+    ReplicationPeerZKImpl peer = null;
+    try {
+      peer = createPeer(peerId);
+    } catch (Exception e) {
+      throw new ReplicationException("Error adding peer with id=" + peerId, e);
+    }
+    if (peer == null) {
+      return false;
     }
-    return addresses;
+    this.peerClusters.put(peerId, peer);
+    LOG.info("Added new peer cluster " + peer.getPeerConfig().getClusterKey());
+    return true;
   }
 
   private String getTableCFsNode(String id) {
@@ -485,18 +431,14 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
    * @return object representing the peer
    * @throws ReplicationException
    */
-  private ReplicationPeer getPeer(String peerId) throws ReplicationException {
-    Configuration peerConf = getPeerConf(peerId);
-    if (peerConf == null) {
-      return null;
-    }
-    if (this.ourClusterKey.equals(ZKUtil.getZooKeeperClusterKey(peerConf))) {
-      LOG.debug("Not connecting to " + peerId + " because it's us");
+  private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
+    Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
+    if (pair == null) {
       return null;
     }
+    Configuration peerConf = pair.getSecond();
 
-    ReplicationPeer peer =
-        new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
+    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
     try {
       peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
     } catch (KeeperException e) {
@@ -511,7 +453,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
           peerId, e);
     }
 
-    peer.getZkw().registerListener(new PeerRegionServerListener(peer));
     return peer;
   }
 
@@ -520,7 +461,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
    * @return ClusterKey parsed from the passed bytes.
    * @throws DeserializationException
    */
-  private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
+  private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
+      throws DeserializationException {
     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
       int pblen = ProtobufUtil.lengthOfPBMagic();
       ZooKeeperProtos.ReplicationPeer.Builder builder =
@@ -531,58 +473,70 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       } catch (InvalidProtocolBufferException e) {
         throw new DeserializationException(e);
       }
-      return peer.getClusterkey();
+      return convert(peer);
     } else {
       if (bytes.length > 0) {
-        return Bytes.toString(bytes);
+        return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
       }
-      return "";
+      return new ReplicationPeerConfig().setClusterKey("");
     }
   }
 
-  /**
-   * @param clusterKey
-   * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix prepended suitable
-   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
-   *         /hbase/replication/peers/PEER_ID
-   */
-  private static byte[] toByteArray(final String clusterKey) {
-    byte[] bytes =
-        ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
-            .toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
+  private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
+    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+    if (peer.hasClusterkey()) {
+      peerConfig.setClusterKey(peer.getClusterkey());
+    }
+    if (peer.hasReplicationEndpointImpl()) {
+      peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
+    }
 
-  /**
-   * Tracks changes to the list of region servers in a peer's cluster.
-   */
-  public static class PeerRegionServerListener extends ZooKeeperListener {
+    for (BytesBytesPair pair : peer.getDataList()) {
+      peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
+    }
 
-    private ReplicationPeer peer;
-    private String regionServerListNode;
+    for (NameStringPair pair : peer.getConfigurationList()) {
+      peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
+    }
+    return peerConfig;
+  }
 
-    public PeerRegionServerListener(ReplicationPeer replicationPeer) {
-      super(replicationPeer.getZkw());
-      this.peer = replicationPeer;
-      this.regionServerListNode = peer.getZkw().rsZNode;
+  private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig  peerConfig) {
+    ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
+    if (peerConfig.getClusterKey() != null) {
+      builder.setClusterkey(peerConfig.getClusterKey());
+    }
+    if (peerConfig.getReplicationEndpointImpl() != null) {
+      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
     }
 
-    public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
-      super(zkw);
-      this.regionServerListNode = regionServerListNode;
+    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
+      builder.addData(BytesBytesPair.newBuilder()
+        .setFirst(ByteString.copyFrom(entry.getKey()))
+        .setSecond(ByteString.copyFrom(entry.getValue()))
+          .build());
     }
 
-    @Override
-    public synchronized void nodeChildrenChanged(String path) {
-      if (path.equals(regionServerListNode)) {
-        try {
-          LOG.info("Detected change to peer regionservers, fetching updated list");
-          peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
-        } catch (KeeperException e) {
-          LOG.fatal("Error reading slave addresses", e);
-        }
-      }
+    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
+      builder.addConfiguration(NameStringPair.newBuilder()
+        .setName(entry.getKey())
+        .setValue(entry.getValue())
+        .build());
     }
 
+    return builder.build();
   }
+
+  /**
+   * @param peerConfig the peer configuration to serialize
+   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended, suitable
+   *         for use as the content of a PEER_ID znode under this.peersZNode; i.e. the znode
+   *         /hbase/replication/peers/PEER_ID
+   */
+  private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
+    byte[] bytes = convert(peerConfig).toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
index e0fb7cd..118d2bf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
@@ -93,6 +93,7 @@ public class ZKClusterId {
    * @throws KeeperException
    */
   public static UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
-    return UUID.fromString(readClusterIdZNode(zkw));
+    String uuid = readClusterIdZNode(zkw);
+    return uuid == null ? null : UUID.fromString(uuid);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
index e7ce8d5..4010dc0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
@@ -27,5 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public class HBaseInterfaceAudience {
   public static final String COPROC = "Coprocesssor";
+  public static final String REPLICATION = "Replication";
   public static final String PHOENIX = "Phoenix";
 }