Posted to commits@hbase.apache.org by en...@apache.org on 2014/07/15 01:23:51 UTC

[1/3] HBASE-11367 Pluggable replication endpoint

Repository: hbase
Updated Branches:
  refs/heads/master 4824b0dea -> 463d52d8c


http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
new file mode 100644
index 0000000..e4ec0bc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * A {@link ReplicationEndpoint} implementation for replicating to another HBase cluster.
+ * For the slave cluster, it selects a random subset of region servers
+ * using a replication ratio. For example, if the replication ratio is 0.1
+ * and the slave cluster has 100 region servers, 10 will be selected.
+ * <p/>
+ * A stream is considered down when we cannot contact a region server on the
+ * peer cluster for more than 55 seconds by default.
+ */
+@InterfaceAudience.Private
+public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
+
+  private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
+  private HConnection conn;
+
+  private Configuration conf;
+
+  // How long should we sleep for each retry
+  private long sleepForRetries;
+
+  // Maximum number of retries before taking bold actions
+  private int maxRetriesMultiplier;
+  // Socket timeouts require even bolder actions since we don't want to DDOS
+  private int socketTimeoutMultiplier;
+  // Metrics for this source
+  private MetricsSource metrics;
+  // Handles connecting to peer region servers
+  private ReplicationSinkManager replicationSinkMgr;
+  private boolean peersSelected = false;
+
+  @Override
+  public void init(Context context) throws IOException {
+    super.init(context);
+    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+    decorateConf();
+    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
+    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
+        maxRetriesMultiplier * maxRetriesMultiplier);
+    // TODO: This connection should be made replication-specific, so that
+    // replication settings such as compression, or the codec to use when
+    // passing Cells, can be applied to it.
+    this.conn = HConnectionManager.createConnection(this.conf);
+    this.sleepForRetries =
+        this.conf.getLong("replication.source.sleepforretries", 1000);
+    this.metrics = context.getMetrics();
+    // ctx.getPeerId() identifies the peer cluster this endpoint replicates to
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+  }
+
+  private void decorateConf() {
+    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
+    if (StringUtils.isNotEmpty(replicationCodec)) {
+      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
+    }
+  }
+
+  private void connectToPeers() {
+    getRegionServers();
+
+    int sleepMultiplier = 1;
+
+    // Connect to peer cluster first, unless we have to stop
+    while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+      replicationSinkMgr.chooseSinks();
+      if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
+        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
+   */
+  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      }
+      Thread.sleep(this.sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
+
+  /**
+   * Do the shipping logic
+   */
+  @Override
+  public boolean replicate(ReplicateContext replicateContext) {
+    List<HLog.Entry> entries = replicateContext.getEntries();
+    int sleepMultiplier = 1;
+    while (this.isRunning()) {
+      if (!peersSelected) {
+        connectToPeers();
+        peersSelected = true;
+      }
+
+      if (!isPeerEnabled()) {
+        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+      SinkPeer sinkPeer = null;
+      try {
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        BlockingInterface rrs = sinkPeer.getRegionServer();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Replicating " + entries.size() +
+              " entries of total size " + replicateContext.getSize());
+        }
+        ReplicationProtbufUtil.replicateWALEntry(rrs,
+            entries.toArray(new HLog.Entry[entries.size()]));
+
+        // update metrics
+        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
+        return true;
+
+      } catch (IOException ioe) {
+        // Didn't ship anything, but must still age the last time we did
+        this.metrics.refreshAgeOfLastShippedOp();
+        if (ioe instanceof RemoteException) {
+          ioe = ((RemoteException) ioe).unwrapRemoteException();
+          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
+          if (ioe instanceof TableNotFoundException) {
+            if (sleepForRetries("A table is missing in the peer cluster. "
+                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
+              sleepMultiplier++;
+            }
+          }
+        } else {
+          if (ioe instanceof SocketTimeoutException) {
+            // This exception means we waited for more than 60s and nothing
+            // happened; the cluster is alive, and calling it right away,
+            // even just to test, makes things worse.
+            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
+              "call to the remote cluster timed out, which is usually " +
+              "caused by a machine failure or a massive slowdown",
+              this.socketTimeoutMultiplier);
+          } else if (ioe instanceof ConnectException) {
+            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
+            replicationSinkMgr.chooseSinks();
+          } else {
+            LOG.warn("Can't replicate because of a local or network error: ", ioe);
+          }
+        }
+
+        if (sinkPeer != null) {
+          replicationSinkMgr.reportBadSink(sinkPeer);
+        }
+        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+    return false; // in case we exited before replicating
+  }
+
+  protected boolean isPeerEnabled() {
+    return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
+  }
+
+  @Override
+  protected void doStop() {
+    disconnect(); //don't call super.doStop()
+    if (this.conn != null) {
+      try {
+        this.conn.close();
+        this.conn = null;
+      } catch (IOException e) {
+        LOG.warn("Failed to close the connection");
+      }
+    }
+    notifyStopped();
+  }
+}
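
To make the new contract concrete: a pluggable endpoint only has to honor
replicate(ReplicateContext), returning true once the batch of WAL entries has
been durably shipped. Below is a minimal sketch of a custom endpoint written
against the classes in this patch; the class LoggingReplicationEndpoint and its
body are hypothetical, and a real implementation may also override the
lifecycle hooks (doStart/doStop) the way HBaseInterClusterReplicationEndpoint
does above.

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.hbase.regionserver.wal.HLog;
  import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;

  public class LoggingReplicationEndpoint extends HBaseReplicationEndpoint {
    private static final Log LOG = LogFactory.getLog(LoggingReplicationEndpoint.class);

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // A real endpoint would ship these entries to its target system and
      // return true only once they are acknowledged; returning false (or
      // throwing) makes ReplicationSource retry the same batch.
      for (HLog.Entry entry : replicateContext.getEntries()) {
        LOG.info("Would replicate entry for table " + entry.getKey().getTablename());
      }
      return true;
    }
  }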

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b38a0c8..94dec7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -22,13 +22,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * This class is for maintaining the various replication statistics for a source and publishing them
  * through the metrics interfaces.
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class MetricsSource {
 
   public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
@@ -152,7 +153,7 @@ public class MetricsSource {
     rms.incCounters(shippedKBsKey, sizeInKB);
     rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB);
   }
-  
+
   /** increase the byte number read by source from log file */
   public void incrLogReadInBytes(long readInBytes) {
     rms.incCounters(logReadInBytesKey, readInBytes);
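
Widening MetricsSource from Private to LimitedPrivate(REPLICATION) is what lets
pluggable endpoints outside this package report progress: an endpoint receives
a MetricsSource through its Context (context.getMetrics() in
HBaseInterClusterReplicationEndpoint above) and can drive the same counters the
default endpoint does. For instance, mirroring the calls visible in this patch:

  // after a successful ship of a batch of entries:
  metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime());
  // after a failed attempt, still age the last shipped op:
  metrics.refreshAgeOfLastShippedOp();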

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index 839db9b..2104268 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -29,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -61,7 +60,7 @@ public class ReplicationSinkManager {
 
   private final String peerClusterId;
 
-  private final ReplicationPeers replicationPeers;
+  private final HBaseReplicationEndpoint endpoint;
 
   // Count of "bad replication sink" reports per peer sink
   private final Map<ServerName, Integer> badReportCounts;
@@ -85,15 +84,15 @@ public class ReplicationSinkManager {
    * Instantiate for a single replication peer cluster.
    * @param conn connection to the peer cluster
    * @param peerClusterId identifier of the peer cluster
-   * @param replicationPeers manages peer clusters being replicated to
+   * @param endpoint replication endpoint for inter cluster replication
    * @param conf HBase configuration, used for determining replication source ratio and bad peer
    *          threshold
    */
   public ReplicationSinkManager(HConnection conn, String peerClusterId,
-      ReplicationPeers replicationPeers, Configuration conf) {
+      HBaseReplicationEndpoint endpoint, Configuration conf) {
     this.conn = conn;
     this.peerClusterId = peerClusterId;
-    this.replicationPeers = replicationPeers;
+    this.endpoint = endpoint;
     this.badReportCounts = Maps.newHashMap();
     this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
     this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
@@ -107,8 +106,7 @@ public class ReplicationSinkManager {
    * @return a replication sink to replicate to
    */
   public SinkPeer getReplicationSink() throws IOException {
-    if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
-                                                        > this.lastUpdateToPeers) {
+    if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers) {
       LOG.info("Current list of sinks is out of date, updating");
       chooseSinks();
     }
@@ -143,8 +141,7 @@ public class ReplicationSinkManager {
   }
 
   void chooseSinks() {
-    List<ServerName> slaveAddresses =
-                        replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
+    List<ServerName> slaveAddresses = endpoint.getRegionServers();
     Collections.shuffle(slaveAddresses, random);
     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
     sinks = slaveAddresses.subList(0, numSinks);
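
Only the source of the server list changes here, from ReplicationPeers to the
endpoint; the selection math is untouched and matches the ratio example in the
endpoint's javadoc: shuffle the peer's region servers, then keep the first
ceil(n * ratio) of them. As a worked instance, assuming the default
replication.source.ratio of 0.1:

  int n = 100;                                 // region servers in the peer cluster
  float ratio = 0.1f;                          // replication.source.ratio
  int numSinks = (int) Math.ceil(n * ratio);   // 10 sinks survive the shuffle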

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4e2106d..87cbcc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -21,13 +21,9 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
 import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -41,27 +37,25 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.ipc.RemoteException;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
 
 /**
  * Class that handles the source of a replication stream.
@@ -82,9 +76,9 @@ public class ReplicationSource extends Thread
   public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
   // Queue of logs to process
   private PriorityBlockingQueue<Path> queue;
-  private HConnection conn;
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
+
   private Configuration conf;
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
@@ -118,8 +112,6 @@ public class ReplicationSource extends Thread
   private String peerClusterZnode;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
-  // Socket timeouts require even bolder actions since we don't want to DDOS
-  private int socketTimeoutMultiplier;
   // Current number of operations (Put/Delete) that we need to replicate
   private int currentNbOperations = 0;
   // Current size of data we need to replicate
@@ -130,10 +122,14 @@ public class ReplicationSource extends Thread
   private MetricsSource metrics;
   // Handle on the log reader helper
   private ReplicationHLogReaderManager repLogReader;
-  // Handles connecting to peer region servers
-  private ReplicationSinkManager replicationSinkMgr;
   //WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
+  // ReplicationEndpoint which will handle the actual replication
+  private ReplicationEndpoint replicationEndpoint;
+  // A filter (or a chain of filters) for the WAL entries.
+  private WALEntryFilter walEntryFilter;
+  // Context for ReplicationEndpoint#replicate()
+  private ReplicationEndpoint.ReplicateContext replicateContext;
   // throttler
   private ReplicationThrottler throttler;
 
@@ -145,30 +141,30 @@ public class ReplicationSource extends Thread
    * @param manager replication manager to ping to
    * @param stopper     the atomic boolean to use to stop the regionserver
    * @param peerClusterZnode the name of our znode
+   * @param clusterId unique UUID for the cluster
+   * @param replicationEndpoint the replication endpoint implementation
+   * @param metrics metrics for replication source
    * @throws IOException
    */
+  @Override
   public void init(final Configuration conf, final FileSystem fs,
       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId) throws IOException {
+      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      final MetricsSource metrics)
+          throws IOException {
     this.stopper = stopper;
-    this.conf = HBaseConfiguration.create(conf);
+    this.conf = conf;
     decorateConf();
     this.replicationQueueSizeCapacity =
         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
     this.replicationQueueNbCapacity =
         this.conf.getInt("replication.source.nb.capacity", 25000);
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
-    this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
-        maxRetriesMultiplier * maxRetriesMultiplier);
     this.queue =
         new PriorityBlockingQueue<Path>(
             this.conf.getInt("hbase.regionserver.maxlogs", 32),
             new LogsComparator());
-    // TODO: This connection is replication specific or we should make it particular to
-    // replication and make replication specific settings such as compression or codec to use
-    // passing Cells.
-    this.conn = HConnectionManager.getConnection(this.conf);
     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
     this.replicationQueues = replicationQueues;
@@ -177,7 +173,7 @@ public class ReplicationSource extends Thread
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
-    this.metrics = new MetricsSource(peerClusterZnode);
+    this.metrics = metrics;
     this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
     this.clusterId = clusterId;
 
@@ -185,8 +181,10 @@ public class ReplicationSource extends Thread
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
-    this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
+    this.replicationEndpoint = replicationEndpoint;
+
+    this.replicateContext = new ReplicationEndpoint.ReplicateContext();
   }
 
   private void decorateConf() {
@@ -209,30 +207,48 @@ public class ReplicationSource extends Thread
   }
 
   private void uninitialize() {
-    if (this.conn != null) {
-      try {
-        this.conn.close();
-      } catch (IOException e) {
-        LOG.debug("Attempt to close connection failed", e);
-      }
-    }
     LOG.debug("Source exiting " + this.peerId);
     metrics.clear();
+    if (replicationEndpoint.state() == Service.State.STARTING
+        || replicationEndpoint.state() == Service.State.RUNNING) {
+      replicationEndpoint.stopAndWait();
+    }
   }
 
   @Override
   public void run() {
-    connectToPeers();
     // We were stopped while looping to connect to sinks, just abort
     if (!this.isActive()) {
       uninitialize();
       return;
     }
 
+    try {
+      // start the endpoint, connect to the cluster
+      Service.State state = replicationEndpoint.start().get();
+      if (state != Service.State.RUNNING) {
+        LOG.warn("ReplicationEndpoint was not started. Exiting");
+        uninitialize();
+        return;
+      }
+    } catch (Exception ex) {
+      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
+      throw new RuntimeException(ex);
+    }
+
+    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
+    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
+      (WALEntryFilter)new SystemTableWALEntryFilter());
+    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
+    if (filterFromEndpoint != null) {
+      filters.add(filterFromEndpoint);
+    }
+    this.walEntryFilter = new ChainWALEntryFilter(filters);
+
     int sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
     while (this.isActive() && this.peerClusterId == null) {
-      this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
+      this.peerClusterId = replicationEndpoint.getPeerUUID();
       if (this.isActive() && this.peerClusterId == null) {
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
@@ -250,9 +266,10 @@ public class ReplicationSource extends Thread
 
     // In rare case, zookeeper setting may be messed up. That leads to the incorrect
     // peerClusterId value, which is the same as the source clusterId
-    if (clusterId.equals(peerClusterId)) {
+    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
-          + peerClusterId);
+          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
+          + replicationEndpoint.getClass().getName(), null, false);
     }
     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
 
@@ -397,8 +414,8 @@ public class ReplicationSource extends Thread
    * entries
    * @throws IOException
    */
-  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries)
-      throws IOException{
+  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
+      List<HLog.Entry> entries) throws IOException{
     long seenEntries = 0;
     if (LOG.isTraceEnabled()) {
       LOG.trace("Seeking in " + this.currentPath + " at position "
@@ -409,18 +426,22 @@ public class ReplicationSource extends Thread
     HLog.Entry entry =
         this.repLogReader.readNextAndSetPosition();
     while (entry != null) {
-      WALEdit edit = entry.getEdit();
       this.metrics.incrLogEditsRead();
       seenEntries++;
-      // Remove all KVs that should not be replicated
-      HLogKey logKey = entry.getKey();
+
       // don't replicate if the log entries have already been consumed by the cluster
-      if (!logKey.getClusterIds().contains(peerClusterId)) {
-        removeNonReplicableEdits(entry);
-        // Don't replicate catalog entries, if the WALEdit wasn't
-        // containing anything to replicate and if we're currently not set to replicate
-        if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
-            edit.size() != 0) {
+      if (replicationEndpoint.canReplicateToSameCluster()
+          || !entry.getKey().getClusterIds().contains(peerClusterId)) {
+        // Remove all KVs that should not be replicated
+        entry = walEntryFilter.filter(entry);
+        WALEdit edit = null;
+        HLogKey logKey = null;
+        if (entry != null) {
+          edit = entry.getEdit();
+          logKey = entry.getKey();
+        }
+
+        if (edit != null && edit.size() != 0) {
           //Mark that the current cluster has the change
           logKey.addClusterId(clusterId);
           currentNbOperations += countDistinctRowKeys(edit);
@@ -451,20 +472,6 @@ public class ReplicationSource extends Thread
     return seenEntries == 0 && processEndOfFile();
   }
 
-  private void connectToPeers() {
-    int sleepMultiplier = 1;
-
-    // Connect to peer cluster first, unless we have to stop
-    while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
-      replicationSinkMgr.chooseSinks();
-      if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
-        if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-    }
-  }
-
   /**
    * Poll for the next path
    * @return true if a path was obtained, false if not
@@ -594,8 +601,8 @@ public class ReplicationSource extends Thread
   /*
    * Checks whether the current log file is empty, and it is not a recovered queue. This is to
    * handle scenario when in an idle cluster, there is no entry in the current log and we keep on
-   * trying to read the log file and get EOFEception. In case of a recovered queue the last log file
-   * may be empty, and we don't want to retry that.
+   * trying to read the log file and get EOFException. In case of a recovered queue the last log
+   * file may be empty, and we don't want to retry that.
    */
   private boolean isCurrentLogEmpty() {
     return (this.repLogReader.getPosition() == 0 &&
@@ -622,47 +629,6 @@ public class ReplicationSource extends Thread
   }
 
   /**
-   * We only want KVs that are scoped other than local
-   * @param entry The entry to check for replication
-   */
-  protected void removeNonReplicableEdits(HLog.Entry entry) {
-    String tabName = entry.getKey().getTablename().getNameAsString();
-    ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
-    Map<String, List<String>> tableCFs = null;
-    try {
-      tableCFs = this.replicationPeers.getTableCFs(peerId);
-    } catch (IllegalArgumentException e) {
-      LOG.error("should not happen: can't get tableCFs for peer " + peerId +
-          ", degenerate as if it's not configured by keeping tableCFs==null");
-    }
-    int size = kvs.size();
-
-    // clear kvs(prevent replicating) if logKey's table isn't in this peer's
-    // replicable table list (empty tableCFs means all table are replicable)
-    if (tableCFs != null && !tableCFs.containsKey(tabName)) {
-      kvs.clear();
-    } else {
-      NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
-      List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
-      for (int i = size - 1; i >= 0; i--) {
-        KeyValue kv = kvs.get(i);
-        // The scope will be null or empty if
-        // there's nothing to replicate in that WALEdit
-        // ignore(remove) kv if its cf isn't in the replicable cf list
-        // (empty cfs means all cfs of this table are replicable)
-        if (scopes == null || !scopes.containsKey(kv.getFamily()) ||
-            (cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
-          kvs.remove(i);
-        }
-      }
-    }
-
-    if (kvs.size() < size/2) {
-      kvs.trimToSize();
-    }
-  }
-
-  /**
    * Count the number of different row keys in the given edit because of
    * mini-batching. We assume that there's at least one KV in the WALEdit.
    * @param edit edit to count row keys from
@@ -692,13 +658,6 @@ public class ReplicationSource extends Thread
       return;
     }
     while (this.isActive()) {
-      if (!isPeerEnabled()) {
-        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-      SinkPeer sinkPeer = null;
       try {
         if (this.throttler.isEnabled()) {
           long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
@@ -719,14 +678,15 @@ public class ReplicationSource extends Thread
             this.throttler.resetStartTick();
           }
         }
-        sinkPeer = replicationSinkMgr.getReplicationSink();
-        BlockingInterface rrs = sinkPeer.getRegionServer();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicating " + entries.size() +
-              " entries of total size " + currentSize);
+        replicateContext.setEntries(entries).setSize(currentSize);
+
+        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
+        boolean replicated = replicationEndpoint.replicate(replicateContext);
+
+        if (!replicated) {
+          continue;
         }
-        ReplicationProtbufUtil.replicateWALEntry(rrs,
-            entries.toArray(new HLog.Entry[entries.size()]));
+
         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
               this.peerClusterZnode, this.repLogReader.getPosition(),
@@ -745,50 +705,9 @@ public class ReplicationSource extends Thread
               + this.totalReplicatedOperations + " operations");
         }
         break;
-
-      } catch (IOException ioe) {
-        // Didn't ship anything, but must still age the last time we did
-        this.metrics.refreshAgeOfLastShippedOp();
-        if (ioe instanceof RemoteException) {
-          ioe = ((RemoteException) ioe).unwrapRemoteException();
-          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
-          if (ioe instanceof TableNotFoundException) {
-            if (sleepForRetries("A table is missing in the peer cluster. "
-                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
-              sleepMultiplier++;
-            }
-            // current thread might be interrupted to terminate
-            // directly go back to while() for confirm this
-            if (isInterrupted()) {
-              continue;
-            }
-          }
-        } else {
-          if (ioe instanceof SocketTimeoutException) {
-            // This exception means we waited for more than 60s and nothing
-            // happened, the cluster is alive and calling it right away
-            // even for a test just makes things worse.
-            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
-              "call to the remote cluster timed out, which is usually " +
-              "caused by a machine failure or a massive slowdown",
-              this.socketTimeoutMultiplier);
-            // current thread might be interrupted to terminate
-            // directly go back to while() for confirm this
-            if (isInterrupted()) {
-              continue;
-            }
-          } else if (ioe instanceof ConnectException) {
-            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
-            replicationSinkMgr.chooseSinks();
-          } else {
-            LOG.warn("Can't replicate because of a local or network error: ", ioe);
-          }
-        }
-
-        if (sinkPeer != null) {
-          replicationSinkMgr.reportBadSink(sinkPeer);
-        }
-        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+      } catch (Exception ex) {
+        LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:", ex);
+        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
           sleepMultiplier++;
         }
       }
@@ -801,7 +720,7 @@ public class ReplicationSource extends Thread
    * @return true if the peer is enabled, otherwise false
    */
   protected boolean isPeerEnabled() {
-    return this.replicationPeers.getStatusOfConnectedPeer(this.peerId);
+    return this.replicationPeers.getStatusOfPeer(this.peerId);
   }
 
   /**
@@ -835,10 +754,12 @@ public class ReplicationSource extends Thread
     return false;
   }
 
+  @Override
   public void startup() {
     String n = Thread.currentThread().getName();
     Thread.UncaughtExceptionHandler handler =
         new Thread.UncaughtExceptionHandler() {
+          @Override
           public void uncaughtException(final Thread t, final Throwable e) {
             LOG.error("Unexpected exception in ReplicationSource," +
               " currentPath=" + currentPath, e);
@@ -849,11 +770,17 @@ public class ReplicationSource extends Thread
         this.peerClusterZnode, handler);
   }
 
+  @Override
   public void terminate(String reason) {
     terminate(reason, null);
   }
 
+  @Override
   public void terminate(String reason, Exception cause) {
+    terminate(reason, cause, true);
+  }
+
+  public void terminate(String reason, Exception cause, boolean join) {
     if (cause == null) {
       LOG.info("Closing source "
           + this.peerClusterZnode + " because: " + reason);
@@ -864,17 +791,33 @@ public class ReplicationSource extends Thread
     }
     this.running = false;
     this.interrupt();
-    Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
+    ListenableFuture<Service.State> future = null;
+    if (this.replicationEndpoint != null) {
+      future = this.replicationEndpoint.stop();
+    }
+    if (join) {
+      Threads.shutdown(this, this.sleepForRetries);
+      if (future != null) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          LOG.warn("Got exception:" + e);
+        }
+      }
+    }
   }
 
+  @Override
   public String getPeerClusterZnode() {
     return this.peerClusterZnode;
   }
 
+  @Override
   public String getPeerClusterId() {
     return this.peerId;
   }
 
+  @Override
   public Path getCurrentPath() {
     return this.currentPath;
   }
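
The net effect of this refactor on ReplicationSource is that all peer-cluster
I/O now funnels through the endpoint. Condensed from the shipping hunk above,
the core of the new loop is just:

  replicateContext.setEntries(entries).setSize(currentSize);
  // blocks until the endpoint has shipped and acknowledged the batch
  boolean replicated = replicationEndpoint.replicate(replicateContext);
  if (!replicated) {
    continue;  // retry the same batch; the log position only advances on success
  }

so the sink selection, RPC, and remote-exception handling that used to live in
this class are now the endpoint's responsibility.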

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index df599f0..6388d9b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 
@@ -50,7 +51,8 @@ public interface ReplicationSourceInterface {
   public void init(final Configuration conf, final FileSystem fs,
       final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId) throws IOException;
+      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      final MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 7b4cd83..db9c505 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -43,9 +43,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 
@@ -115,7 +119,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
       final Path oldLogDir, final UUID clusterId) {
     //CopyOnWriteArrayList is thread-safe.
-    //Generally, reading is more than modifying. 
+    //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
@@ -194,7 +198,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * old region server hlog queues
    */
   protected void init() throws IOException, ReplicationException {
-    for (String id : this.replicationPeers.getConnectedPeers()) {
+    for (String id : this.replicationPeers.getPeerIds()) {
       addSource(id);
     }
     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
@@ -221,9 +225,12 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   protected ReplicationSourceInterface addSource(String id) throws IOException,
       ReplicationException {
+    ReplicationPeerConfig peerConfig
+      = replicationPeers.getReplicationPeerConfig(id);
+    ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
-          this.replicationPeers, stopper, id, this.clusterId);
+          this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer);
     synchronized (this.hlogsById) {
       this.sources.add(src);
       this.hlogsById.put(id, new TreeSet<String>());
@@ -254,7 +261,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void deleteSource(String peerId, boolean closeConnection) {
     this.replicationQueues.removeQueue(peerId);
     if (closeConnection) {
-      this.replicationPeers.disconnectFromPeer(peerId);
+      this.replicationPeers.peerRemoved(peerId);
     }
   }
 
@@ -340,7 +347,9 @@ public class ReplicationSourceManager implements ReplicationListener {
   protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
       final FileSystem fs, final ReplicationSourceManager manager,
       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
-      final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
+      final Stoppable stopper, final String peerId, final UUID clusterId,
+      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
+          throws IOException {
     ReplicationSourceInterface src;
     try {
       @SuppressWarnings("rawtypes")
@@ -351,9 +360,32 @@ public class ReplicationSourceManager implements ReplicationListener {
       LOG.warn("Passed replication source implementation throws errors, " +
           "defaulting to ReplicationSource", e);
       src = new ReplicationSource();
+    }
 
+    ReplicationEndpoint replicationEndpoint = null;
+    try {
+      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+      if (replicationEndpointImpl == null) {
+        // Default to HBase inter-cluster replication endpoint
+        replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
+      }
+      @SuppressWarnings("rawtypes")
+      Class c = Class.forName(replicationEndpointImpl);
+      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
+    } catch (Exception e) {
+      LOG.warn("Passed replication endpoint implementation throws errors", e);
+      throw new IOException(e);
     }
-    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
+
+    MetricsSource metrics = new MetricsSource(peerId);
+    // init replication source
+    src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId,
+      clusterId, replicationEndpoint, metrics);
+
+    // init replication endpoint
+    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
+      fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
+
     return src;
   }
 
@@ -441,7 +473,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void peerListChanged(List<String> peerIds) {
     for (String id : peerIds) {
       try {
-        boolean added = this.replicationPeers.connectToPeer(id);
+        boolean added = this.replicationPeers.peerAdded(id);
         if (added) {
           addSource(id);
         }
@@ -507,10 +539,26 @@ public class ReplicationSourceManager implements ReplicationListener {
       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
         String peerId = entry.getKey();
         try {
+          // peerId here is the recovered queue's znode name, which need not
+          // correspond to an actual defined peer; parse the real peer id out of it.
+          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+          String actualPeerId = replicationQueueInfo.getPeerId();
+          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+          ReplicationPeerConfig peerConfig = null;
+          try {
+            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
+          } catch (ReplicationException ex) {
+            LOG.warn("Received exception while getting replication peer config, skipping replay"
+                + ex);
+          }
+          if (peer == null || peerConfig == null) {
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
+            continue;
+          }
+
           ReplicationSourceInterface src =
               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
-                stopper, peerId, this.clusterId);
-          if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
+                stopper, peerId, this.clusterId, peerConfig, peer);
+          if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
             src.terminate("Recovered queue doesn't belong to any current peer");
             break;
           }
@@ -561,7 +609,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       stats.append(source.getStats() + "\n");
     }
     for (ReplicationSourceInterface oldSource : oldsources) {
-      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
+      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
       stats.append(oldSource.getStats()+ "\n");
     }
     return stats.toString();
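
getReplicationSource() above resolves the endpoint class from the peer's
configuration and falls back to HBaseInterClusterReplicationEndpoint, so
existing peers keep today's behavior. Installing a custom endpoint then comes
down to naming its class on the ReplicationPeerConfig when the peer is added.
A sketch, given a ReplicationAdmin handle admin and assuming a
setReplicationEndpointImpl setter paired with the getReplicationEndpointImpl
getter used above (the test at the end of this message adds a peer through the
same ReplicationPeerConfig overload):

  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
      .setClusterKey(ZKUtil.getZooKeeperClusterKey(conf))
      .setReplicationEndpointImpl(LoggingReplicationEndpoint.class.getName());
  admin.addPeer("customPeer", peerConfig);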

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index fa744ff..77bc64e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.client.replication;
 
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -25,6 +29,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+
 import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -117,5 +123,36 @@ public class TestReplicationAdmin {
     admin.removePeer(ID_ONE);
   }
 
+  @Test
+  public void testGetTableCfsStr() {
+    // opposite of TestPerTableCFReplication#testParseTableCFsFromConfig()
+
+    Map<TableName, List<String>> tabCFsMap = null;
+
+    // 1. null or empty string, result should be null
+    assertEquals(null, ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+
+    // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
+    tabCFsMap = new TreeMap<TableName, List<String>>();
+    tabCFsMap.put(TableName.valueOf("tab1"), null);   // its table name is "tab1"
+    assertEquals("tab1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+    tabCFsMap = new TreeMap<TableName, List<String>>();
+    tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1"));
+    assertEquals("tab1:cf1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+    tabCFsMap = new TreeMap<TableName, List<String>>();
+    tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1", "cf3"));
+    assertEquals("tab1:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+
+    // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
+    tabCFsMap = new TreeMap<TableName, List<String>>();
+    tabCFsMap.put(TableName.valueOf("tab1"), null);
+    tabCFsMap.put(TableName.valueOf("tab2"), Lists.newArrayList("cf1"));
+    tabCFsMap.put(TableName.valueOf("tab3"), Lists.newArrayList("cf1", "cf3"));
+    assertEquals("tab1;tab2:cf1;tab3:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index fa3dda6..13a18ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 
@@ -40,7 +41,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
       ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
-      UUID clusterId) throws IOException {
+      UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
+          throws IOException {
 
     this.manager = manager;
     this.peerClusterId = peerClusterId;

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index ee102fc..ff77a94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -175,30 +175,30 @@ public class TestPerTableCFReplication {
     Map<String, List<String>> tabCFsMap = null;
 
     // 1. null or empty string, result should be null
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(null);
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(null);
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("");
     assertEquals(null, tabCFsMap);
 
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("   ");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("   ");
     assertEquals(null, tabCFsMap);
 
     // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey("tab1"));   // its table name is "tab1"
     assertFalse(tabCFsMap.containsKey("tab2"));  // not other table
     assertEquals(null, tabCFsMap.get("tab1"));   // null cf-list,
 
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab2:cf1");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab2:cf1");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey("tab2"));   // its table name is "tab2"
     assertFalse(tabCFsMap.containsKey("tab1"));  // not other table
     assertEquals(1, tabCFsMap.get("tab2").size());   // cf-list contains only 1 cf
     assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1"
 
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab3 : cf1 , cf3");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab3 : cf1 , cf3");
     assertEquals(1, tabCFsMap.size()); // only one table
     assertTrue(tabCFsMap.containsKey("tab3"));   // its table name is "tab2"
     assertFalse(tabCFsMap.containsKey("tab1"));  // not other table
@@ -207,7 +207,7 @@ public class TestPerTableCFReplication {
     assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3"
 
     // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
     // 3.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
     assertTrue(tabCFsMap.containsKey("tab1"));
@@ -225,7 +225,8 @@ public class TestPerTableCFReplication {
 
     // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
     // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
+      "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
     // 4.1 contains 3 tables : "tab1", "tab2" and "tab3"
     assertEquals(3, tabCFsMap.size());
     assertTrue(tabCFsMap.containsKey("tab1"));
@@ -243,7 +244,8 @@ public class TestPerTableCFReplication {
 
     // 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"
     //    "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally
-    tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
+    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
+      "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
     // 5.1 no "tab1" and "tab2", only "tab3"
     assertEquals(1, tabCFsMap.size()); // only one table
     assertFalse(tabCFsMap.containsKey("tab1"));

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
new file mode 100644
index 0000000..38aaa7a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests ReplicationSource and ReplicationEndpoint interactions
+ */
+@Category(MediumTests.class)
+public class TestReplicationEndpoint extends TestReplicationBase {
+
+  static int numRegionServers;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestReplicationBase.setUpBeforeClass();
+    utility2.shutdownMiniCluster(); // we don't need the second cluster
+    admin.removePeer("2");
+    numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+    // check stop is called
+    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
+  }
+
+  @Before
+  public void setup() throws FailedLogCloseException, IOException {
+    ReplicationEndpointForTest.constructedCount.set(0);
+    ReplicationEndpointForTest.startedCount.set(0);
+    ReplicationEndpointForTest.replicateCount.set(0);
+    ReplicationEndpointForTest.lastEntries = null;
+    for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
+      utility1.getHBaseAdmin().rollHLogWriter(rs.getRegionServer().getServerName().toString());
+    }
+  }
+
+  @Test
+  public void testCustomReplicationEndpoint() throws Exception {
+    // test installing a custom replication endpoint other than the default one.
+    admin.addPeer("testCustomReplicationEndpoint",
+      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+
+    // check whether the class has been constructed and started
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointForTest.constructedCount.get() >= numRegionServers;
+      }
+    });
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
+      }
+    });
+
+    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
+
+    // now replicate some data.
+    doPut(Bytes.toBytes("row42"));
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointForTest.replicateCount.get() >= 1;
+      }
+    });
+
+    doAssert(Bytes.toBytes("row42"));
+
+    admin.removePeer("testCustomReplicationEndpoint");
+  }
+
+  @Test
+  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
+    admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate",
+      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
+    // now replicate some data.
+    doPut(row);
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointReturningFalse.replicated.get();
+      }
+    });
+    if (ReplicationEndpointReturningFalse.ex.get() != null) {
+      throw ReplicationEndpointReturningFalse.ex.get();
+    }
+
+    admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
+  }
+
+  @Test
+  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
+    admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
+      new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
+    // now replicate some data.
+    doPut(Bytes.toBytes("row1"));
+    doPut(row);
+    doPut(Bytes.toBytes("row2"));
+
+    Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return ReplicationEndpointForTest.replicateCount.get() >= 1;
+      }
+    });
+
+    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
+    admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
+  }
+
+
+  private void doPut(byte[] row) throws IOException {
+    Put put = new Put(row);
+    put.add(famName, row, row);
+    htable1 = new HTable(conf1, tableName);
+    htable1.put(put);
+    htable1.close();
+  }
+
+  private static void doAssert(byte[] row) throws Exception {
+    if (ReplicationEndpointForTest.lastEntries == null) {
+      return; // first call
+    }
+    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
+    List<KeyValue> kvs = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getKeyValues();
+    Assert.assertEquals(1, kvs.size());
+    Assert.assertTrue(Bytes.equals(kvs.get(0).getRowArray(), kvs.get(0).getRowOffset(),
+      kvs.get(0).getRowLength(), row, 0, row.length));
+  }
+
+  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
+    static UUID uuid = UUID.randomUUID();
+    static AtomicInteger constructedCount = new AtomicInteger();
+    static AtomicInteger startedCount = new AtomicInteger();
+    static AtomicInteger stoppedCount = new AtomicInteger();
+    static AtomicInteger replicateCount = new AtomicInteger();
+    static volatile List<HLog.Entry> lastEntries = null;
+
+    public ReplicationEndpointForTest() {
+      constructedCount.incrementAndGet();
+    }
+
+    @Override
+    public UUID getPeerUUID() {
+      return uuid;
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      replicateCount.incrementAndGet();
+      lastEntries = replicateContext.entries;
+      return true;
+    }
+
+    @Override
+    protected void doStart() {
+      startedCount.incrementAndGet();
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      stoppedCount.incrementAndGet();
+      notifyStopped();
+    }
+  }
+
+  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
+    static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
+    static AtomicBoolean replicated = new AtomicBoolean(false);
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        // check row
+        doAssert(row);
+      } catch (Exception e) {
+        ex.set(e);
+      }
+
+      super.replicate(replicateContext);
+
+      replicated.set(replicateCount.get() > 10); // false for the first 10 calls, true afterwards
+      return replicated.get();
+    }
+  }
+
+  // Returns a WALEntryFilter that accepts only "row" and drops all other rows
+  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
+    static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        super.replicate(replicateContext);
+        doAssert(row);
+      } catch (Exception e) {
+        ex.set(e);
+      }
+      return true;
+    }
+
+    @Override
+    public WALEntryFilter getWALEntryfilter() {
+      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
+        @Override
+        public Entry filter(Entry entry) {
+          ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+          int size = kvs.size();
+          for (int i = size-1; i >= 0; i--) {
+            KeyValue kv = kvs.get(i);
+            if (!Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+              row, 0, row.length)) {
+              kvs.remove(i);
+            }
+          }
+          return entry;
+        }
+      });
+    }
+  }
+}

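A note on testReplicationEndpointReturnsFalseOnReplicate: a false return from
replicate() means the batch was not shipped, and the source re-offers the same
entries until the endpoint accepts them, which is why "replicated" only flips
once replicateCount passes 10. A standalone sketch of that contract follows
(hypothetical types; this is not the actual ReplicationSource shipping loop).

    import java.util.List;

    // Hypothetical illustration of the replicate() retry contract: the caller
    // keeps re-offering the same batch until the endpoint reports success.
    public class RetryContractSketch {
      interface Endpoint {
        boolean replicate(List<String> batch);  // false = not shipped, retry
      }

      static void ship(Endpoint endpoint, List<String> batch) throws InterruptedException {
        while (!endpoint.replicate(batch)) {
          Thread.sleep(100);                    // back off, then retry the same entries
        }
      }
    }
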
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fd003ad..e560620 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -179,50 +179,48 @@ public abstract class TestReplicationStateBasic {
     } catch (IllegalArgumentException e) {
     }
     try {
-      rp.getStatusOfConnectedPeer("bogus");
+      rp.getStatusOfPeer("bogus");
       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
     } catch (IllegalArgumentException e) {
     }
-    assertFalse(rp.connectToPeer("bogus"));
-    rp.disconnectFromPeer("bogus");
-    assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
-    assertNull(rp.getPeerUUID("bogus"));
+    assertFalse(rp.peerAdded("bogus"));
+    rp.peerRemoved("bogus");
+
     assertNull(rp.getPeerConf("bogus"));
-    assertNumberOfPeers(0, 0);
+    assertNumberOfPeers(0);
 
     // Add some peers
-    rp.addPeer(ID_ONE, KEY_ONE);
-    assertNumberOfPeers(0, 1);
-    rp.addPeer(ID_TWO, KEY_TWO);
-    assertNumberOfPeers(0, 2);
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    assertNumberOfPeers(1);
+    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+    assertNumberOfPeers(2);
 
     // Test methods with a peer that is added but not connected
     try {
-      rp.getStatusOfConnectedPeer(ID_ONE);
+      rp.getStatusOfPeer(ID_ONE);
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
-    assertNull(rp.getPeerUUID(ID_ONE));
-    assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
-    rp.disconnectFromPeer(ID_ONE);
-    assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
-
-    // Connect to one peer
-    rp.connectToPeer(ID_ONE);
-    assertNumberOfPeers(1, 2);
-    assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
+    assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
+    rp.removePeer(ID_ONE);
+    rp.peerRemoved(ID_ONE);
+    assertNumberOfPeers(1);
+
+    // Add one peer
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rp.peerAdded(ID_ONE);
+    assertNumberOfPeers(2);
+    assertTrue(rp.getStatusOfPeer(ID_ONE));
     rp.disablePeer(ID_ONE);
     assertConnectedPeerStatus(false, ID_ONE);
     rp.enablePeer(ID_ONE);
     assertConnectedPeerStatus(true, ID_ONE);
-    assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
-    assertNotNull(rp.getPeerUUID(ID_ONE).toString());
 
     // Disconnect peer
-    rp.disconnectFromPeer(ID_ONE);
-    assertNumberOfPeers(0, 2);
+    rp.peerRemoved(ID_ONE);
+    assertNumberOfPeers(2);
     try {
-      rp.getStatusOfConnectedPeer(ID_ONE);
+      rp.getStatusOfPeer(ID_ONE);
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
@@ -234,7 +232,7 @@ public abstract class TestReplicationStateBasic {
       fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
     }
     while (true) {
-      if (status == rp.getStatusOfConnectedPeer(peerId)) {
+      if (status == rp.getStatusOfPeer(peerId)) {
         return;
       }
       if (zkTimeoutCount < ZK_MAX_COUNT) {
@@ -247,9 +245,9 @@ public abstract class TestReplicationStateBasic {
     }
   }
 
-  protected void assertNumberOfPeers(int connected, int total) {
-    assertEquals(total, rp.getAllPeerClusterKeys().size());
-    assertEquals(connected, rp.getConnectedPeers().size());
+  protected void assertNumberOfPeers(int total) {
+    assertEquals(total, rp.getAllPeerConfigs().size());
     assertEquals(total, rp.getAllPeerIds().size());
   }
 
@@ -269,7 +267,7 @@ public abstract class TestReplicationStateBasic {
         rq3.addLog("qId" + i, "filename" + j);
       }
       //Add peers for the corresponding queues so they are not orphans
-      rp.addPeer("qId" + i, "bogus" + i);
+      rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
     }
   }
 }

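The hunk above tracks the API change from addPeer(id, clusterKey) to
addPeer(id, ReplicationPeerConfig, tableCFs). For reference, a typical call
against the new signature might look like this (cluster key and endpoint
class name are placeholders):

    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
        .setClusterKey("zk1,zk2,zk3:2181:/hbase")               // slave cluster key
        .setReplicationEndpointImpl("org.example.MyEndpoint");  // optional custom endpoint
    rp.addPeer("1", peerConfig, null);  // null tableCFs = replicate all tables/CFs
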
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index 5273fe3..1c3de71 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -145,7 +145,7 @@ public class TestReplicationTrackerZKImpl {
 
   @Ignore ("Flakey") @Test(timeout = 30000)
   public void testPeerRemovedEvent() throws Exception {
-    rp.addPeer("5", utility.getClusterKey());
+    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
     rt.registerListener(new DummyReplicationListener());
     rp.removePeer("5");
     // wait for event
@@ -158,7 +158,7 @@ public class TestReplicationTrackerZKImpl {
   @Ignore ("Flakey") @Test(timeout = 30000)
   public void testPeerListChangedEvent() throws Exception {
     // add a peer
-    rp.addPeer("5", utility.getClusterKey());
+    rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
     zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
     rt.registerListener(new DummyReplicationListener());
     rp.disablePeer("5");

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
new file mode 100644
index 0000000..d4b412e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestReplicationWALEntryFilters {
+
+  static byte[] a = new byte[] {'a'};
+  static byte[] b = new byte[] {'b'};
+  static byte[] c = new byte[] {'c'};
+  static byte[] d = new byte[] {'d'};
+
+  @Test
+  public void testSystemTableWALEntryFilter() {
+    SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
+
+    // meta
+    HLogKey key1 = new HLogKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+      HTableDescriptor.META_TABLEDESC.getTableName());
+    HLog.Entry metaEntry = new Entry(key1, null);
+
+    assertNull(filter.filter(metaEntry));
+
+    // ns table
+    HLogKey key2 = new HLogKey(new byte[] {}, HTableDescriptor.NAMESPACE_TABLEDESC.getTableName());
+    HLog.Entry nsEntry = new Entry(key2, null);
+    assertNull(filter.filter(nsEntry));
+
+    // user table
+
+    HLogKey key3 = new HLogKey(new byte[] {}, TableName.valueOf("foo"));
+    HLog.Entry userEntry = new Entry(key3, null);
+
+    assertEquals(userEntry, filter.filter(userEntry));
+  }
+
+  @Test
+  public void testScopeWALEntryFilter() {
+    ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
+
+    HLog.Entry userEntry = createEntry(a, b);
+    HLog.Entry userEntryA = createEntry(a);
+    HLog.Entry userEntryB = createEntry(b);
+    HLog.Entry userEntryEmpty = createEntry();
+
+    // no scopes
+    assertEquals(null, filter.filter(userEntry));
+
+    // empty scopes
+    TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(null, filter.filter(userEntry));
+
+    // different scope
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    // all kvs should be filtered
+    assertEquals(userEntryEmpty, filter.filter(userEntry));
+
+    // local scope
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryEmpty, filter.filter(userEntry));
+    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryEmpty, filter.filter(userEntry));
+
+    // only scope a
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryA, filter.filter(userEntry));
+    scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryA, filter.filter(userEntry));
+
+    // only scope b
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(userEntryB, filter.filter(userEntry));
+    scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
+    assertEquals(userEntryB, filter.filter(userEntry));
+
+    // scope a and b both global: nothing is filtered
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
+    scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+    userEntry = createEntry(a, b);
+    userEntry.getKey().setScopes(scopes);
+    assertEquals(createEntry(a, b), filter.filter(userEntry));
+  }
+
+  WALEntryFilter nullFilter = new WALEntryFilter() {
+    @Override
+    public Entry filter(Entry entry) {
+      return null;
+    }
+  };
+
+  WALEntryFilter passFilter = new WALEntryFilter() {
+    @Override
+    public Entry filter(Entry entry) {
+      return entry;
+    }
+  };
+
+  @Test
+  public void testChainWALEntryFilter() {
+    HLog.Entry userEntry = createEntry(a, b, c);
+
+    ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(passFilter, passFilter);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter, passFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(passFilter, nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    filter = new ChainWALEntryFilter(nullFilter, nullFilter);
+    assertEquals(null, filter.filter(userEntry));
+
+    // flatten
+    filter =
+        new ChainWALEntryFilter(
+          new ChainWALEntryFilter(passFilter,
+            new ChainWALEntryFilter(passFilter, passFilter),
+            new ChainWALEntryFilter(passFilter),
+            new ChainWALEntryFilter(passFilter)),
+          new ChainWALEntryFilter(passFilter));
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+
+    filter =
+        new ChainWALEntryFilter(
+          new ChainWALEntryFilter(passFilter,
+            new ChainWALEntryFilter(passFilter,
+              new ChainWALEntryFilter(nullFilter))),
+          new ChainWALEntryFilter(passFilter));
+    assertEquals(null, filter.filter(userEntry));
+  }
+
+  @Test
+  public void testTableCfWALEntryFilter() {
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+
+    when(peer.getTableCFs()).thenReturn(null);
+    HLog.Entry userEntry = createEntry(a, b, c);
+    TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
+    assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+
+    // empty map
+    userEntry = createEntry(a, b, c);
+    Map<String, List<String>> tableCfs = new HashMap<String, List<String>>();
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(null, filter.filter(userEntry));
+
+    // table bar
+    userEntry = createEntry(a, b, c);
+    tableCfs = new HashMap<String, List<String>>();
+    tableCfs.put("bar", null);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(null, filter.filter(userEntry));
+
+    // table foo:a
+    userEntry = createEntry(a, b, c);
+    tableCfs = new HashMap<String, List<String>>();
+    tableCfs.put("foo", Lists.newArrayList("a"));
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(createEntry(a), filter.filter(userEntry));
+
+    // table foo:a,c
+    userEntry = createEntry(a, b, c, d);
+    tableCfs = new HashMap<String, List<String>>();
+    tableCfs.put("foo", Lists.newArrayList("a", "c"));
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    filter = new TableCfWALEntryFilter(peer);
+    assertEquals(createEntry(a,c), filter.filter(userEntry));
+  }
+
+  private HLog.Entry createEntry(byte[]... kvs) {
+    HLogKey key1 = new HLogKey(new byte[] {}, TableName.valueOf("foo"));
+    WALEdit edit1 = new WALEdit();
+
+    for (byte[] kv : kvs) {
+      edit1.add(new KeyValue(kv, kv, kv));
+    }
+    return new HLog.Entry(key1, edit1);
+  }
+
+
+  private void assertEquals(HLog.Entry e1, HLog.Entry e2) {
+    Assert.assertEquals(e1 == null, e2 == null);
+    if (e1 == null) {
+      return;
+    }
+
+    // do not compare HLogKeys
+
+    // compare kvs
+    Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
+    if (e1.getEdit() == null) {
+      return;
+    }
+    List<KeyValue> kvs1 = e1.getEdit().getKeyValues();
+    List<KeyValue> kvs2 = e2.getEdit().getKeyValues();
+    Assert.assertEquals(kvs1.size(), kvs2.size());
+    for (int i = 0; i < kvs1.size(); i++) {
+      Assert.assertEquals(0, KeyValue.COMPARATOR.compare(kvs1.get(i), kvs2.get(i)));
+    }
+  }
+
+
+}

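The chain tests above pin down the evaluation rule: filters run in order and the
chain yields null as soon as any member drops the entry. Below is a minimal
re-implementation of just that rule (illustrative; the shipped ChainWALEntryFilter
additionally flattens nested chains, which the "flatten" cases exercise).

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hbase.regionserver.wal.HLog;

    // Minimal chain filter matching the semantics asserted above.
    public class SimpleChainFilter implements WALEntryFilter {
      private final List<WALEntryFilter> filters;

      public SimpleChainFilter(WALEntryFilter... filters) {
        this.filters = Arrays.asList(filters);
      }

      @Override
      public HLog.Entry filter(HLog.Entry entry) {
        for (WALEntryFilter f : filters) {
          if (entry == null) {
            return null;       // short-circuit once any filter drops the entry
          }
          entry = f.filter(entry);
        }
        return entry;
      }
    }
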
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index 296f953..9175192 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.junit.Before;
@@ -42,13 +43,15 @@ public class TestReplicationSinkManager {
   private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
 
   private ReplicationPeers replicationPeers;
+  private HBaseReplicationEndpoint replicationEndpoint;
   private ReplicationSinkManager sinkManager;
 
   @Before
   public void setUp() {
     replicationPeers = mock(ReplicationPeers.class);
+    replicationEndpoint = mock(HBaseReplicationEndpoint.class);
     sinkManager = new ReplicationSinkManager(mock(HConnection.class),
-                      PEER_CLUSTER_ID, replicationPeers, new Configuration());
+                      PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
   }
 
   @Test
@@ -58,7 +61,7 @@ public class TestReplicationSinkManager {
       serverNames.add(mock(ServerName.class));
     }
 
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
           .thenReturn(serverNames);
 
     sinkManager.chooseSinks();
@@ -72,7 +75,7 @@ public class TestReplicationSinkManager {
     List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
       mock(ServerName.class));
 
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
           .thenReturn(serverNames);
 
     sinkManager.chooseSinks();
@@ -84,8 +87,8 @@ public class TestReplicationSinkManager {
   public void testReportBadSink() {
     ServerName serverNameA = mock(ServerName.class);
     ServerName serverNameB = mock(ServerName.class);
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(
-      Lists.newArrayList(serverNameA, serverNameB));
+    when(replicationEndpoint.getRegionServers())
+      .thenReturn(Lists.newArrayList(serverNameA, serverNameB));
 
     sinkManager.chooseSinks();
     // Sanity check
@@ -110,7 +113,7 @@ public class TestReplicationSinkManager {
     for (int i = 0; i < 20; i++) {
       serverNames.add(mock(ServerName.class));
     }
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
           .thenReturn(serverNames);
 
 
@@ -137,7 +140,7 @@ public class TestReplicationSinkManager {
     for (int i = 0; i < 20; i++) {
       serverNames.add(mock(ServerName.class));
     }
-    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+    when(replicationEndpoint.getRegionServers())
           .thenReturn(serverNames);
 
 

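These tests pin down two behaviors of the reworked sink manager: chooseSinks()
now draws candidates from the endpoint's getRegionServers() rather than from
ReplicationPeers, and reportBadSink() eventually demotes a repeatedly failing
sink. A sketch under those assumptions (the threshold is invented for
illustration; this is not the shipped ReplicationSinkManager):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;

    // Sketch of the behavior the tests above imply; not the shipped class.
    public class SinkManagerSketch {
      static final int BAD_SINK_THRESHOLD = 3;  // invented for illustration
      private final HBaseReplicationEndpoint endpoint;
      private final Map<ServerName, Integer> badReports =
          new HashMap<ServerName, Integer>();
      private List<ServerName> sinks = new ArrayList<ServerName>();

      public SinkManagerSketch(HBaseReplicationEndpoint endpoint) {
        this.endpoint = endpoint;
      }

      void chooseSinks() {
        // Refresh the sink set from the endpoint's view of the peer cluster.
        sinks = new ArrayList<ServerName>(endpoint.getRegionServers());
        badReports.clear();
      }

      void reportBadSink(ServerName sink) {
        Integer count = badReports.get(sink);
        int next = (count == null) ? 1 : count + 1;
        badReports.put(sink, next);
        if (next >= BAD_SINK_THRESHOLD) {
          sinks.remove(sink);  // stop routing edits to this sink
        }
      }
    }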

[2/3] HBASE-11367 Pluggable replication endpoint

Posted by en...@apache.org.
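
Most of this part is regenerated protobuf code for the ReplicationPeer znode
message. The generated accessors below correspond to a message extended roughly
as follows (reconstructed from the generated field comments; the exact
ZooKeeper.proto text may differ):

    message ReplicationPeer {
      required string clusterkey = 1;
      // fields added by this patch:
      optional string replicationEndpointImpl = 2;
      repeated BytesBytesPair data = 3;
      repeated NameStringPair configuration = 4;
    }
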
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 9d037f5..a1caf87 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -4828,6 +4828,71 @@ public final class ZooKeeperProtos {
      */
     com.google.protobuf.ByteString
         getClusterkeyBytes();
+
+    // optional string replicationEndpointImpl = 2;
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    boolean hasReplicationEndpointImpl();
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    java.lang.String getReplicationEndpointImpl();
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getReplicationEndpointImplBytes();
+
+    // repeated .BytesBytesPair data = 3;
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> 
+        getDataList();
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair getData(int index);
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    int getDataCount();
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> 
+        getDataOrBuilderList();
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder getDataOrBuilder(
+        int index);
+
+    // repeated .NameStringPair configuration = 4;
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> 
+        getConfigurationList();
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair getConfiguration(int index);
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    int getConfigurationCount();
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> 
+        getConfigurationOrBuilderList();
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
+        int index);
   }
   /**
    * Protobuf type {@code ReplicationPeer}
@@ -4890,6 +4955,27 @@ public final class ZooKeeperProtos {
               clusterkey_ = input.readBytes();
               break;
             }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              replicationEndpointImpl_ = input.readBytes();
+              break;
+            }
+            case 26: {
+              if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+                data_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair>();
+                mutable_bitField0_ |= 0x00000004;
+              }
+              data_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.PARSER, extensionRegistry));
+              break;
+            }
+            case 34: {
+              if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+                configuration_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair>();
+                mutable_bitField0_ |= 0x00000008;
+              }
+              configuration_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.PARSER, extensionRegistry));
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4898,6 +4984,12 @@ public final class ZooKeeperProtos {
         throw new com.google.protobuf.InvalidProtocolBufferException(
             e.getMessage()).setUnfinishedMessage(this);
       } finally {
+        if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+          data_ = java.util.Collections.unmodifiableList(data_);
+        }
+        if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+          configuration_ = java.util.Collections.unmodifiableList(configuration_);
+        }
         this.unknownFields = unknownFields.build();
         makeExtensionsImmutable();
       }
@@ -4988,8 +5080,126 @@ public final class ZooKeeperProtos {
       }
     }
 
+    // optional string replicationEndpointImpl = 2;
+    public static final int REPLICATIONENDPOINTIMPL_FIELD_NUMBER = 2;
+    private java.lang.Object replicationEndpointImpl_;
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    public boolean hasReplicationEndpointImpl() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    public java.lang.String getReplicationEndpointImpl() {
+      java.lang.Object ref = replicationEndpointImpl_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          replicationEndpointImpl_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string replicationEndpointImpl = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getReplicationEndpointImplBytes() {
+      java.lang.Object ref = replicationEndpointImpl_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        replicationEndpointImpl_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // repeated .BytesBytesPair data = 3;
+    public static final int DATA_FIELD_NUMBER = 3;
+    private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> data_;
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> getDataList() {
+      return data_;
+    }
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> 
+        getDataOrBuilderList() {
+      return data_;
+    }
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public int getDataCount() {
+      return data_.size();
+    }
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair getData(int index) {
+      return data_.get(index);
+    }
+    /**
+     * <code>repeated .BytesBytesPair data = 3;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder getDataOrBuilder(
+        int index) {
+      return data_.get(index);
+    }
+
+    // repeated .NameStringPair configuration = 4;
+    public static final int CONFIGURATION_FIELD_NUMBER = 4;
+    private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> configuration_;
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> getConfigurationList() {
+      return configuration_;
+    }
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> 
+        getConfigurationOrBuilderList() {
+      return configuration_;
+    }
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public int getConfigurationCount() {
+      return configuration_.size();
+    }
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair getConfiguration(int index) {
+      return configuration_.get(index);
+    }
+    /**
+     * <code>repeated .NameStringPair configuration = 4;</code>
+     */
+    public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
+        int index) {
+      return configuration_.get(index);
+    }
+
     private void initFields() {
       clusterkey_ = "";
+      replicationEndpointImpl_ = "";
+      data_ = java.util.Collections.emptyList();
+      configuration_ = java.util.Collections.emptyList();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5000,6 +5210,18 @@ public final class ZooKeeperProtos {
         memoizedIsInitialized = 0;
         return false;
       }
+      for (int i = 0; i < getDataCount(); i++) {
+        if (!getData(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      for (int i = 0; i < getConfigurationCount(); i++) {
+        if (!getConfiguration(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -5010,6 +5232,15 @@ public final class ZooKeeperProtos {
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         output.writeBytes(1, getClusterkeyBytes());
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getReplicationEndpointImplBytes());
+      }
+      for (int i = 0; i < data_.size(); i++) {
+        output.writeMessage(3, data_.get(i));
+      }
+      for (int i = 0; i < configuration_.size(); i++) {
+        output.writeMessage(4, configuration_.get(i));
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5023,6 +5254,18 @@ public final class ZooKeeperProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(1, getClusterkeyBytes());
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getReplicationEndpointImplBytes());
+      }
+      for (int i = 0; i < data_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(3, data_.get(i));
+      }
+      for (int i = 0; i < configuration_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(4, configuration_.get(i));
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5051,6 +5294,15 @@ public final class ZooKeeperProtos {
         result = result && getClusterkey()
             .equals(other.getClusterkey());
       }
+      result = result && (hasReplicationEndpointImpl() == other.hasReplicationEndpointImpl());
+      if (hasReplicationEndpointImpl()) {
+        result = result && getReplicationEndpointImpl()
+            .equals(other.getReplicationEndpointImpl());
+      }
+      result = result && getDataList()
+          .equals(other.getDataList());
+      result = result && getConfigurationList()
+          .equals(other.getConfigurationList());
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5068,6 +5320,18 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + CLUSTERKEY_FIELD_NUMBER;
         hash = (53 * hash) + getClusterkey().hashCode();
       }
+      if (hasReplicationEndpointImpl()) {
+        hash = (37 * hash) + REPLICATIONENDPOINTIMPL_FIELD_NUMBER;
+        hash = (53 * hash) + getReplicationEndpointImpl().hashCode();
+      }
+      if (getDataCount() > 0) {
+        hash = (37 * hash) + DATA_FIELD_NUMBER;
+        hash = (53 * hash) + getDataList().hashCode();
+      }
+      if (getConfigurationCount() > 0) {
+        hash = (37 * hash) + CONFIGURATION_FIELD_NUMBER;
+        hash = (53 * hash) + getConfigurationList().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5174,6 +5438,8 @@ public final class ZooKeeperProtos {
       }
       private void maybeForceBuilderInitialization() {
         if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+          getDataFieldBuilder();
+          getConfigurationFieldBuilder();
         }
       }
       private static Builder create() {
@@ -5184,6 +5450,20 @@ public final class ZooKeeperProtos {
         super.clear();
         clusterkey_ = "";
         bitField0_ = (bitField0_ & ~0x00000001);
+        replicationEndpointImpl_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        if (dataBuilder_ == null) {
+          data_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000004);
+        } else {
+          dataBuilder_.clear();
+        }
+        if (configurationBuilder_ == null) {
+          configuration_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000008);
+        } else {
+          configurationBuilder_.clear();
+        }
         return this;
       }
 
@@ -5216,6 +5496,28 @@ public final class ZooKeeperProtos {
           to_bitField0_ |= 0x00000001;
         }
         result.clusterkey_ = clusterkey_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.replicationEndpointImpl_ = replicationEndpointImpl_;
+        if (dataBuilder_ == null) {
+          if (((bitField0_ & 0x00000004) == 0x00000004)) {
+            data_ = java.util.Collections.unmodifiableList(data_);
+            bitField0_ = (bitField0_ & ~0x00000004);
+          }
+          result.data_ = data_;
+        } else {
+          result.data_ = dataBuilder_.build();
+        }
+        if (configurationBuilder_ == null) {
+          if (((bitField0_ & 0x00000008) == 0x00000008)) {
+            configuration_ = java.util.Collections.unmodifiableList(configuration_);
+            bitField0_ = (bitField0_ & ~0x00000008);
+          }
+          result.configuration_ = configuration_;
+        } else {
+          result.configuration_ = configurationBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5237,6 +5539,63 @@ public final class ZooKeeperProtos {
           clusterkey_ = other.clusterkey_;
           onChanged();
         }
+        if (other.hasReplicationEndpointImpl()) {
+          bitField0_ |= 0x00000002;
+          replicationEndpointImpl_ = other.replicationEndpointImpl_;
+          onChanged();
+        }
+        if (dataBuilder_ == null) {
+          if (!other.data_.isEmpty()) {
+            if (data_.isEmpty()) {
+              data_ = other.data_;
+              bitField0_ = (bitField0_ & ~0x00000004);
+            } else {
+              ensureDataIsMutable();
+              data_.addAll(other.data_);
+            }
+            onChanged();
+          }
+        } else {
+          if (!other.data_.isEmpty()) {
+            if (dataBuilder_.isEmpty()) {
+              dataBuilder_.dispose();
+              dataBuilder_ = null;
+              data_ = other.data_;
+              bitField0_ = (bitField0_ & ~0x00000004);
+              dataBuilder_ = 
+                com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+                   getDataFieldBuilder() : null;
+            } else {
+              dataBuilder_.addAllMessages(other.data_);
+            }
+          }
+        }
+        if (configurationBuilder_ == null) {
+          if (!other.configuration_.isEmpty()) {
+            if (configuration_.isEmpty()) {
+              configuration_ = other.configuration_;
+              bitField0_ = (bitField0_ & ~0x00000008);
+            } else {
+              ensureConfigurationIsMutable();
+              configuration_.addAll(other.configuration_);
+            }
+            onChanged();
+          }
+        } else {
+          if (!other.configuration_.isEmpty()) {
+            if (configurationBuilder_.isEmpty()) {
+              configurationBuilder_.dispose();
+              configurationBuilder_ = null;
+              configuration_ = other.configuration_;
+              bitField0_ = (bitField0_ & ~0x00000008);
+              configurationBuilder_ = 
+                com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
+                   getConfigurationFieldBuilder() : null;
+            } else {
+              configurationBuilder_.addAllMessages(other.configuration_);
+            }
+          }
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5246,6 +5605,18 @@ public final class ZooKeeperProtos {
           
           return false;
         }
+        for (int i = 0; i < getDataCount(); i++) {
+          if (!getData(i).isInitialized()) {
+            
+            return false;
+          }
+        }
+        for (int i = 0; i < getConfigurationCount(); i++) {
+          if (!getConfiguration(i).isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
 
@@ -5372,6 +5743,560 @@ public final class ZooKeeperProtos {
         return this;
       }
 
+      // optional string replicationEndpointImpl = 2;
+      private java.lang.Object replicationEndpointImpl_ = "";
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public boolean hasReplicationEndpointImpl() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public java.lang.String getReplicationEndpointImpl() {
+        java.lang.Object ref = replicationEndpointImpl_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          replicationEndpointImpl_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getReplicationEndpointImplBytes() {
+        java.lang.Object ref = replicationEndpointImpl_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          replicationEndpointImpl_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public Builder setReplicationEndpointImpl(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        replicationEndpointImpl_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public Builder clearReplicationEndpointImpl() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        replicationEndpointImpl_ = getDefaultInstance().getReplicationEndpointImpl();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string replicationEndpointImpl = 2;</code>
+       */
+      public Builder setReplicationEndpointImplBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        replicationEndpointImpl_ = value;
+        onChanged();
+        return this;
+      }
+
+      // repeated .BytesBytesPair data = 3;
+      private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> data_ =
+        java.util.Collections.emptyList();
+      private void ensureDataIsMutable() {
+        if (!((bitField0_ & 0x00000004) == 0x00000004)) {
+          data_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair>(data_);
+          bitField0_ |= 0x00000004;
+         }
+      }
+
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> dataBuilder_;
+
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> getDataList() {
+        if (dataBuilder_ == null) {
+          return java.util.Collections.unmodifiableList(data_);
+        } else {
+          return dataBuilder_.getMessageList();
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public int getDataCount() {
+        if (dataBuilder_ == null) {
+          return data_.size();
+        } else {
+          return dataBuilder_.getCount();
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair getData(int index) {
+        if (dataBuilder_ == null) {
+          return data_.get(index);
+        } else {
+          return dataBuilder_.getMessage(index);
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder setData(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair value) {
+        if (dataBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureDataIsMutable();
+          data_.set(index, value);
+          onChanged();
+        } else {
+          dataBuilder_.setMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder setData(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder builderForValue) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          data_.set(index, builderForValue.build());
+          onChanged();
+        } else {
+          dataBuilder_.setMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addData(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair value) {
+        if (dataBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureDataIsMutable();
+          data_.add(value);
+          onChanged();
+        } else {
+          dataBuilder_.addMessage(value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addData(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair value) {
+        if (dataBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureDataIsMutable();
+          data_.add(index, value);
+          onChanged();
+        } else {
+          dataBuilder_.addMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addData(
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder builderForValue) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          data_.add(builderForValue.build());
+          onChanged();
+        } else {
+          dataBuilder_.addMessage(builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addData(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder builderForValue) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          data_.add(index, builderForValue.build());
+          onChanged();
+        } else {
+          dataBuilder_.addMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder addAllData(
+          java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair> values) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          super.addAll(values, data_);
+          onChanged();
+        } else {
+          dataBuilder_.addAllMessages(values);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder clearData() {
+        if (dataBuilder_ == null) {
+          data_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000004);
+          onChanged();
+        } else {
+          dataBuilder_.clear();
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public Builder removeData(int index) {
+        if (dataBuilder_ == null) {
+          ensureDataIsMutable();
+          data_.remove(index);
+          onChanged();
+        } else {
+          dataBuilder_.remove(index);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder getDataBuilder(
+          int index) {
+        return getDataFieldBuilder().getBuilder(index);
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder getDataOrBuilder(
+          int index) {
+        if (dataBuilder_ == null) {
+          return data_.get(index);
+        } else {
+          return dataBuilder_.getMessageOrBuilder(index);
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> 
+           getDataOrBuilderList() {
+        if (dataBuilder_ != null) {
+          return dataBuilder_.getMessageOrBuilderList();
+        } else {
+          return java.util.Collections.unmodifiableList(data_);
+        }
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder addDataBuilder() {
+        return getDataFieldBuilder().addBuilder(
+            org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder addDataBuilder(
+          int index) {
+        return getDataFieldBuilder().addBuilder(
+            index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .BytesBytesPair data = 3;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder> 
+           getDataBuilderList() {
+        return getDataFieldBuilder().getBuilderList();
+      }
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder> 
+          getDataFieldBuilder() {
+        if (dataBuilder_ == null) {
+          dataBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPairOrBuilder>(
+                  data_,
+                  ((bitField0_ & 0x00000004) == 0x00000004),
+                  getParentForChildren(),
+                  isClean());
+          data_ = null;
+        }
+        return dataBuilder_;
+      }
+
+      // repeated .NameStringPair configuration = 4;
+      private java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> configuration_ =
+        java.util.Collections.emptyList();
+      private void ensureConfigurationIsMutable() {
+        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+          configuration_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair>(configuration_);
+          bitField0_ |= 0x00000008;
+         }
+      }
+
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> configurationBuilder_;
+
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> getConfigurationList() {
+        if (configurationBuilder_ == null) {
+          return java.util.Collections.unmodifiableList(configuration_);
+        } else {
+          return configurationBuilder_.getMessageList();
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public int getConfigurationCount() {
+        if (configurationBuilder_ == null) {
+          return configuration_.size();
+        } else {
+          return configurationBuilder_.getCount();
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair getConfiguration(int index) {
+        if (configurationBuilder_ == null) {
+          return configuration_.get(index);
+        } else {
+          return configurationBuilder_.getMessage(index);
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder setConfiguration(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair value) {
+        if (configurationBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureConfigurationIsMutable();
+          configuration_.set(index, value);
+          onChanged();
+        } else {
+          configurationBuilder_.setMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder setConfiguration(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder builderForValue) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          configuration_.set(index, builderForValue.build());
+          onChanged();
+        } else {
+          configurationBuilder_.setMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addConfiguration(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair value) {
+        if (configurationBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureConfigurationIsMutable();
+          configuration_.add(value);
+          onChanged();
+        } else {
+          configurationBuilder_.addMessage(value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addConfiguration(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair value) {
+        if (configurationBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          ensureConfigurationIsMutable();
+          configuration_.add(index, value);
+          onChanged();
+        } else {
+          configurationBuilder_.addMessage(index, value);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addConfiguration(
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder builderForValue) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          configuration_.add(builderForValue.build());
+          onChanged();
+        } else {
+          configurationBuilder_.addMessage(builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addConfiguration(
+          int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder builderForValue) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          configuration_.add(index, builderForValue.build());
+          onChanged();
+        } else {
+          configurationBuilder_.addMessage(index, builderForValue.build());
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder addAllConfiguration(
+          java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair> values) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          super.addAll(values, configuration_);
+          onChanged();
+        } else {
+          configurationBuilder_.addAllMessages(values);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder clearConfiguration() {
+        if (configurationBuilder_ == null) {
+          configuration_ = java.util.Collections.emptyList();
+          bitField0_ = (bitField0_ & ~0x00000008);
+          onChanged();
+        } else {
+          configurationBuilder_.clear();
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public Builder removeConfiguration(int index) {
+        if (configurationBuilder_ == null) {
+          ensureConfigurationIsMutable();
+          configuration_.remove(index);
+          onChanged();
+        } else {
+          configurationBuilder_.remove(index);
+        }
+        return this;
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder getConfigurationBuilder(
+          int index) {
+        return getConfigurationFieldBuilder().getBuilder(index);
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder getConfigurationOrBuilder(
+          int index) {
+        if (configurationBuilder_ == null) {
+          return configuration_.get(index);
+        } else {
+          return configurationBuilder_.getMessageOrBuilder(index);
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> 
+           getConfigurationOrBuilderList() {
+        if (configurationBuilder_ != null) {
+          return configurationBuilder_.getMessageOrBuilderList();
+        } else {
+          return java.util.Collections.unmodifiableList(configuration_);
+        }
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder addConfigurationBuilder() {
+        return getConfigurationFieldBuilder().addBuilder(
+            org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder addConfigurationBuilder(
+          int index) {
+        return getConfigurationFieldBuilder().addBuilder(
+            index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.getDefaultInstance());
+      }
+      /**
+       * <code>repeated .NameStringPair configuration = 4;</code>
+       */
+      public java.util.List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder> 
+           getConfigurationBuilderList() {
+        return getConfigurationFieldBuilder().getBuilderList();
+      }
+      private com.google.protobuf.RepeatedFieldBuilder<
+          org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder> 
+          getConfigurationFieldBuilder() {
+        if (configurationBuilder_ == null) {
+          configurationBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPairOrBuilder>(
+                  configuration_,
+                  ((bitField0_ & 0x00000008) == 0x00000008),
+                  getParentForChildren(),
+                  isClean());
+          configuration_ = null;
+        }
+        return configurationBuilder_;
+      }
+
       // @@protoc_insertion_point(builder_scope:ReplicationPeer)
     }
 
@@ -9598,23 +10523,25 @@ public final class ZooKeeperProtos {
       "NKNOWN\020\000\022\021\n\rLOG_SPLITTING\020\001\022\016\n\nLOG_REPLA" +
       "Y\020\002\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table.Stat" +
       "e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
-      "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"%\n\017R" +
-      "eplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"^\n\020Re" +
-      "plicationState\022&\n\005state\030\001 \002(\0162\027.Replicat",
-      "ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" +
-      "DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" +
-      "\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo" +
-      "ck_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntable_na" +
-      "me\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030\002 \001(\013" +
-      "2\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_s" +
-      "hared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_t" +
-      "ime\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013family_n" +
-      "ame\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026Region" +
-      "StoreSequenceIds\022 \n\030last_flushed_sequenc",
-      "e_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003(\0132\020." +
-      "StoreSequenceIdBE\n*org.apache.hadoop.hba" +
-      "se.protobuf.generatedB\017ZooKeeperProtosH\001" +
-      "\210\001\001\240\001\001"
+      "BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"\215\001\n\017" +
+      "ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027r" +
+      "eplicationEndpointImpl\030\002 \001(\t\022\035\n\004data\030\003 \003",
+      "(\0132\017.BytesBytesPair\022&\n\rconfiguration\030\004 \003" +
+      "(\0132\017.NameStringPair\"^\n\020ReplicationState\022" +
+      "&\n\005state\030\001 \002(\0162\027.ReplicationState.State\"" +
+      "\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027R" +
+      "eplicationHLogPosition\022\020\n\010position\030\001 \002(\003" +
+      "\"%\n\017ReplicationLock\022\022\n\nlock_owner\030\001 \002(\t\"" +
+      "\230\001\n\tTableLock\022\036\n\ntable_name\030\001 \001(\0132\n.Tabl" +
+      "eName\022\037\n\nlock_owner\030\002 \001(\0132\013.ServerName\022\021" +
+      "\n\tthread_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007" +
+      "purpose\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\";\n\017St",
+      "oreSequenceId\022\023\n\013family_name\030\001 \002(\014\022\023\n\013se" +
+      "quence_id\030\002 \002(\004\"g\n\026RegionStoreSequenceId" +
+      "s\022 \n\030last_flushed_sequence_id\030\001 \002(\004\022+\n\021s" +
+      "tore_sequence_id\030\002 \003(\0132\020.StoreSequenceId" +
+      "BE\n*org.apache.hadoop.hbase.protobuf.gen" +
+      "eratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9662,7 +10589,7 @@ public final class ZooKeeperProtos {
           internal_static_ReplicationPeer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ReplicationPeer_descriptor,
-              new java.lang.String[] { "Clusterkey", });
+              new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", });
           internal_static_ReplicationState_descriptor =
             getDescriptor().getMessageTypes().get(7);
           internal_static_ReplicationState_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 37816da..598385c 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -119,6 +119,9 @@ message ReplicationPeer {
   // clusterkey is the concatenation of the slave cluster's
   // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
   required string clusterkey = 1;
+  optional string replicationEndpointImpl = 2;
+  repeated BytesBytesPair data = 3;
+  repeated NameStringPair configuration = 4;
 }
 
 /**

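For illustration, a minimal sketch of building the extended ReplicationPeer message with the three new fields. The builder methods follow from the generated code above; the cluster key, endpoint class name, and data/configuration keys are hypothetical values, not anything defined by this patch:

  import com.google.protobuf.ByteString;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationPeer;

  // Build a peer record carrying a pluggable endpoint class name plus opaque
  // byte data and string configuration, per the new proto fields.
  ReplicationPeer peer = ReplicationPeer.newBuilder()
      .setClusterkey("zk1,zk2,zk3:2181:/hbase")              // hypothetical cluster key
      .setReplicationEndpointImpl("org.example.MyEndpoint")  // hypothetical class
      .addData(BytesBytesPair.newBuilder()
          .setFirst(ByteString.copyFromUtf8("topic"))        // hypothetical key
          .setSecond(ByteString.copyFromUtf8("wal-edits")))
      .addConfiguration(NameStringPair.newBuilder()
          .setName("endpoint.batch.size")                    // hypothetical key
          .setValue("64"))
      .build();
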
http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 87fa157..d4ac8f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -39,9 +39,11 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.Job;
@@ -131,6 +133,7 @@ public class VerifyReplication extends Configured implements Tool {
       }
     }
 
+    @Override
     protected void cleanup(Context context) {
       if (replicatedScanner != null) {
         replicatedScanner.close();
@@ -141,7 +144,7 @@ public class VerifyReplication extends Configured implements Tool {
 
   private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
     ZooKeeperWatcher localZKW = null;
-    ReplicationPeer peer = null;
+    ReplicationPeerZKImpl peer = null;
     try {
       localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
           new Abortable() {
@@ -152,11 +155,11 @@ public class VerifyReplication extends Configured implements Tool {
       ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
       rp.init();
 
-      Configuration peerConf = rp.getPeerConf(peerId);
-      if (peerConf == null) {
+      Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
+      if (pair == null) {
         throw new IOException("Couldn't get peer conf!");
       }
-
+      Configuration peerConf = pair.getSecond();
       return ZKUtil.getZooKeeperClusterKey(peerConf);
     } catch (ReplicationException e) {
       throw new IOException(

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 99a9a5c..8c72a17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -133,6 +133,7 @@ public interface HLog {
    */
   // TODO: Remove this Writable.
   // TODO: Why is this in here?  Implementation detail?
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Entry implements Writable {
     private WALEdit edit;
     private HLogKey key;
@@ -224,7 +225,7 @@ public interface HLog {
    * @return the number of HLog files
    */
   int getNumLogFiles();
-  
+
   /**
    * @return the size of HLog files
    */
@@ -292,9 +293,10 @@ public interface HLog {
    * @param sequenceId
    * @throws IOException
    * @deprecated For tests only and even then, should use
-   * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean, 
+   * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean,
    * List)} and {@link #sync()} instead.
    */
+  @Deprecated
   @VisibleForTesting
   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
       final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException;
@@ -343,6 +345,7 @@ public interface HLog {
    * instead because you can get back the region edit/sequenceid; it is set into the passed in
    * <code>key</code>.
    */
+  @Deprecated
   long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
       List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
       boolean isInMemstore, long nonceGroup, long nonce) throws IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 770a588..cc10cb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
@@ -67,7 +68,7 @@ import com.google.protobuf.ByteString;
  */
 // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
 //       purposes. They need to be merged into HLogEntry.
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
   public static final Log LOG = LogFactory.getLog(HLogKey.class);
 
@@ -190,7 +191,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
    */
   public HLogKey(final byte [] encodedRegionName, final TableName tablename,
       final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, 
+    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds,
       nonceGroup, nonce);
   }
 
@@ -208,7 +209,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
    */
   public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
       long nonceGroup, long nonce) {
-    init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(), 
+    init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(),
       EMPTY_UUIDS, nonceGroup, nonce);
   }
 
@@ -254,7 +255,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
     this.logSeqNum = sequence;
     this.seqNumAssignedLatch.countDown();
   }
-  
+
   /**
    * Used to set original seq Id for HLogKey during wal replay
    * @param seqId
@@ -276,6 +277,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
    * @return long the new assigned sequence number
    * @throws InterruptedException
    */
+  @Override
   public long getSequenceNumber() throws IOException {
     try {
       this.seqNumAssignedLatch.await();
@@ -396,6 +398,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
     return result;
   }
 
+  @Override
   public int compareTo(HLogKey o) {
     int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
     if (result == 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 85e2d7c..24d9d6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -74,7 +75,7 @@ import org.apache.hadoop.io.Writable;
  * is an old style KeyValue or the new style WALEdit.
  *
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class WALEdit implements Writable, HeapSize {
   public static final Log LOG = LogFactory.getLog(WALEdit.class);
 
@@ -288,4 +289,4 @@ public class WALEdit implements Writable, HeapSize {
     }
     return null;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
new file mode 100644
index 0000000..1c9d0f6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractService;
+
+/**
+ * A base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
+ * class rather than implementing {@link ReplicationEndpoint} directly for better backwards
+ * compatibility.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public abstract class BaseReplicationEndpoint extends AbstractService
+  implements ReplicationEndpoint {
+
+  protected Context ctx;
+
+  @Override
+  public void init(Context context) throws IOException {
+    this.ctx = context;
+  }
+
+  /** Returns a default set of filters */
+  @Override
+  public WALEntryFilter getWALEntryfilter() {
+    ArrayList<WALEntryFilter> filters = Lists.newArrayList();
+    WALEntryFilter scopeFilter = getScopeWALEntryFilter();
+    if (scopeFilter != null) {
+      filters.add(scopeFilter);
+    }
+    WALEntryFilter tableCfFilter = getTableCfWALEntryFilter();
+    if (tableCfFilter != null) {
+      filters.add(tableCfFilter);
+    }
+    return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
+  }
+
+  /** Returns a WALEntryFilter for checking the scope. Subclasses can
+   * return null if they don't want this filter */
+  protected WALEntryFilter getScopeWALEntryFilter() {
+    return new ScopeWALEntryFilter();
+  }
+
+  /** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can
+   * return null if they don't want this filter */
+  protected WALEntryFilter getTableCfWALEntryFilter() {
+    return new TableCfWALEntryFilter(ctx.getReplicationPeer());
+  }
+
+  @Override
+  public boolean canReplicateToSameCluster() {
+    return false;
+  }
+
+}
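
A minimal sketch (hypothetical subclass, not part of this patch) of how the filter hooks above are meant to be used: a subclass opts out of per-table/CF filtering by returning null, and the default getWALEntryfilter() then chains only the scope filter.

  public abstract class ScopeOnlyReplicationEndpoint extends BaseReplicationEndpoint {
    // Keep the default scope filter but drop the table-CF filter entirely;
    // getWALEntryfilter() will then build a chain containing only the former.
    @Override
    protected WALEntryFilter getTableCfWALEntryFilter() {
      return null;
    }
  }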

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
new file mode 100644
index 0000000..f701e94
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * A {@link WALEntryFilter} which contains multiple filters and applies them
+ * in chain order
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public class ChainWALEntryFilter implements WALEntryFilter {
+
+  private final WALEntryFilter[] filters;
+
+  public ChainWALEntryFilter(WALEntryFilter... filters) {
+    this.filters = filters;
+  }
+
+  public ChainWALEntryFilter(List<WALEntryFilter> filters) {
+    ArrayList<WALEntryFilter> rawFilters = new ArrayList<WALEntryFilter>(filters.size());
+    // flatten the chains
+    for (WALEntryFilter filter : filters) {
+      if (filter instanceof ChainWALEntryFilter) {
+        for (WALEntryFilter f : ((ChainWALEntryFilter) filter).filters) {
+          rawFilters.add(f);
+        }
+      } else {
+        rawFilters.add(filter);
+      }
+    }
+
+    this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]);
+  }
+
+  @Override
+  public Entry filter(Entry entry) {
+    for (WALEntryFilter filter : filters) {
+      if (entry == null) {
+        return null;
+      }
+      entry = filter.filter(entry);
+    }
+    return entry;
+  }
+
+}
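
A short usage sketch (the surrounding helper method is assumed, not part of this patch): composing two filters from this patch into a chain, which applies them in order and short-circuits to null as soon as any filter drops the entry.

  HLog.Entry applyFilters(HLog.Entry entry) {
    WALEntryFilter chain = new ChainWALEntryFilter(
        new SystemTableWALEntryFilter(),  // drop edits for system tables first
        new ScopeWALEntryFilter());       // then drop KVs with local-only scope
    return chain.filter(entry);          // null means the entry is skipped
  }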

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
new file mode 100644
index 0000000..4b9a28f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.AuthFailedException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+
+/**
+ * A {@link BaseReplicationEndpoint} for replication endpoints whose
+ * target cluster is an HBase cluster.
+ */
+@InterfaceAudience.Private
+public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
+  implements Abortable {
+
+  private static final Log LOG = LogFactory.getLog(HBaseReplicationEndpoint.class);
+
+  private ZooKeeperWatcher zkw = null;
+
+  private List<ServerName> regionServers = new ArrayList<ServerName>(0);
+  private volatile long lastRegionServerUpdate;
+
+  protected void disconnect() {
+    if (zkw != null){
+      zkw.close();
+    }
+  }
+
+  /**
+   * Re-establish the ZooKeeper session with the peer cluster, if the given
+   * exception indicates that the connection was lost or has expired.
+   * @param ke the exception that triggered this reconnect attempt
+   */
+  protected void reconnect(KeeperException ke) {
+    if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
+        || ke instanceof AuthFailedException) {
+      String clusterKey = ctx.getPeerConfig().getClusterKey();
+      LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke);
+      try {
+        reloadZkWatcher();
+      } catch (IOException io) {
+        LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io);
+      }
+    }
+  }
+
+  @Override
+  protected void doStart() {
+    try {
+      reloadZkWatcher();
+      notifyStarted();
+    } catch (IOException e) {
+      notifyFailed(e);
+    }
+  }
+
+  @Override
+  protected void doStop() {
+    disconnect();
+    notifyStopped();
+  }
+
+  @Override
+  public UUID getPeerUUID() {
+    UUID peerUUID = null;
+    try {
+      peerUUID = ZKClusterId.getUUIDForCluster(zkw);
+    } catch (KeeperException ke) {
+      reconnect(ke);
+    }
+    return peerUUID;
+  }
+
+  /**
+   * Get the ZK connection to this peer
+   * @return zk connection
+   */
+  protected ZooKeeperWatcher getZkw() {
+    return zkw;
+  }
+
+  /**
+   * Closes the current ZKW (if not null) and creates a new one
+   * @throws IOException If anything goes wrong connecting
+   */
+  void reloadZkWatcher() throws IOException {
+    if (zkw != null) zkw.close();
+    zkw = new ZooKeeperWatcher(ctx.getConfiguration(),
+        "connection to cluster: " + ctx.getPeerId(), this);
+    getZkw().registerListener(new PeerRegionServerListener(this));
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.fatal("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
+        + " was aborted for the following reason(s):" + why, e);
+  }
+
+  @Override
+  public boolean isAborted() {
+    // Currently this is never "Aborted", we just log when the abort method is called.
+    return false;
+  }
+
+  /**
+   * Get the list of all the region servers from the specified peer
+   * @param zkw zk connection to use
+   * @return list of region server addresses or an empty list if the slave is unavailable
+   */
+  protected static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
+      throws KeeperException {
+    List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
+    if (children == null) {
+      return Collections.emptyList();
+    }
+    List<ServerName> addresses = new ArrayList<ServerName>(children.size());
+    for (String child : children) {
+      addresses.add(ServerName.parseServerName(child));
+    }
+    return addresses;
+  }
+
+  /**
+   * Get the list of addresses of all the region servers in this peer's
+   * cluster, refreshing it from ZooKeeper first when possible
+   * @return list of addresses
+   */
+  public List<ServerName> getRegionServers() {
+    try {
+      setRegionServers(fetchSlavesAddresses(this.getZkw()));
+    } catch (KeeperException ke) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Fetch salves addresses failed.", ke);
+      }
+      reconnect(ke);
+    }
+    return regionServers;
+  }
+
+  /**
+   * Set the list of region servers for that peer
+   * @param regionServers list of addresses for the region servers
+   */
+  public void setRegionServers(List<ServerName> regionServers) {
+    this.regionServers = regionServers;
+    lastRegionServerUpdate = System.currentTimeMillis();
+  }
+
+  /**
+   * Get the timestamp at which the last change occurred to the list of region servers to replicate
+   * to.
+   * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+   */
+  public long getLastRegionServerUpdate() {
+    return lastRegionServerUpdate;
+  }
+
+  /**
+   * Tracks changes to the list of region servers in a peer's cluster.
+   */
+  public static class PeerRegionServerListener extends ZooKeeperListener {
+
+    private final HBaseReplicationEndpoint replicationEndpoint;
+    private final String regionServerListNode;
+
+    public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
+      super(endpoint.getZkw());
+      this.replicationEndpoint = endpoint;
+      this.regionServerListNode = replicationEndpoint.getZkw().rsZNode;
+    }
+
+    @Override
+    public synchronized void nodeChildrenChanged(String path) {
+      if (path.equals(regionServerListNode)) {
+        try {
+          LOG.info("Detected change to peer region servers, fetching updated list");
+          replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
+        } catch (KeeperException e) {
+          LOG.fatal("Error reading slave addresses", e);
+        }
+      }
+    }
+  }
+}
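
A hedged sketch of how a concrete subclass can consume the region server list this class maintains (assuming the obvious imports, inside some helper class; the 0.1 ratio is illustrative, echoing how the inter-cluster endpoint samples sinks, and is not defined here):

  void pickSinks(HBaseReplicationEndpoint endpoint) {
    List<ServerName> slaves = endpoint.getRegionServers(); // refreshed from ZK
    int nbSinks = (int) Math.ceil(slaves.size() * 0.1f);   // illustrative ratio
    long staleness = System.currentTimeMillis() - endpoint.getLastRegionServerUpdate();
    // A real implementation would pick nbSinks random servers from the list
    // and re-pick when it changes (see PeerRegionServerListener above).
  }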

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
new file mode 100644
index 0000000..90e3460
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+
+import com.google.common.util.concurrent.Service;
+
+/**
+ * ReplicationEndpoint is a plugin which implements replication
+ * to other HBase clusters, or other systems. The implementation to use can
+ * be specified at peer creation time in the {@link ReplicationPeerConfig}.
+ * A ReplicationEndpoint runs in its own thread inside each region server
+ * process.
+ * <p>
+ * ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer
+ * relation. ReplicationSource is an HBase-private class which tails the logs,
+ * manages the queue of logs, and handles persistence of all replication state.
+ * ReplicationEndpoint on the other hand is responsible for doing the actual shipping
+ * and persisting of the WAL entries in the other cluster.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationEndpoint extends Service {
+
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+  class Context {
+    private final Configuration conf;
+    private final FileSystem fs;
+    private final ReplicationPeerConfig peerConfig;
+    private final ReplicationPeer replicationPeer;
+    private final String peerId;
+    private final UUID clusterId;
+    private final MetricsSource metrics;
+
+    @InterfaceAudience.Private
+    public Context(
+        final Configuration conf,
+        final FileSystem fs,
+        final ReplicationPeerConfig peerConfig,
+        final String peerId,
+        final UUID clusterId,
+        final ReplicationPeer replicationPeer,
+        final MetricsSource metrics) {
+      this.peerConfig = peerConfig;
+      this.conf = conf;
+      this.fs = fs;
+      this.clusterId = clusterId;
+      this.peerId = peerId;
+      this.replicationPeer = replicationPeer;
+      this.metrics = metrics;
+    }
+    public Configuration getConfiguration() {
+      return conf;
+    }
+    public FileSystem getFilesystem() {
+      return fs;
+    }
+    public UUID getClusterId() {
+      return clusterId;
+    }
+    public String getPeerId() {
+      return peerId;
+    }
+    public ReplicationPeerConfig getPeerConfig() {
+      return peerConfig;
+    }
+    public ReplicationPeer getReplicationPeer() {
+      return replicationPeer;
+    }
+    public MetricsSource getMetrics() {
+      return metrics;
+    }
+  }
+
+  /**
+   * Initialize the replication endpoint with the given context.
+   * @param context replication context
+   * @throws IOException
+   */
+  void init(Context context) throws IOException;
+
+  /** Whether or not the replication endpoint can replicate to its source
+   * cluster with the same UUID */
+  boolean canReplicateToSameCluster();
+
+  /**
+   * Returns a UUID of the provided peer id. Every HBase cluster instance has a persisted
+   * associated UUID. If the replication is not performed to an actual HBase cluster (but
+   * some other system), the UUID returned has to uniquely identify the connected target system.
+   * @return a UUID or null if the peer cluster does not exist or is not connected.
+   */
+  UUID getPeerUUID();
+
+  /**
+   * Returns a WALEntryFilter to use for filtering out WALEntries from the log. The
+   * replication infrastructure will call this filter before sending the edits to shipEdits().
+   * @return a {@link WALEntryFilter} or null.
+   */
+  WALEntryFilter getWALEntryfilter();
+
+  /**
+   * A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
+   */
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+  class ReplicateContext {
+    List<HLog.Entry> entries;
+    int size;
+    @InterfaceAudience.Private
+    public ReplicateContext() {
+    }
+
+    public ReplicateContext setEntries(List<HLog.Entry> entries) {
+      this.entries = entries;
+      return this;
+    }
+    public ReplicateContext setSize(int size) {
+      this.size = size;
+      return this;
+    }
+    public List<HLog.Entry> getEntries() {
+      return entries;
+    }
+    public int getSize() {
+      return size;
+    }
+  }
+
+  /**
+   * Replicate the given set of entries (in the context) to the other cluster.
+   * Can block until all the given entries are replicated. When this method returns,
+   * all entries that were passed in the context are assumed to be persisted in the
+   * target cluster.
+   * @param replicateContext a context where WAL entries and other
+   * parameters can be obtained.
+   */
+  boolean replicate(ReplicateContext replicateContext);
+
+}
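
To make the contract above concrete, a minimal end-to-end sketch of a custom endpoint. The class is hypothetical and not part of this patch; it extends BaseReplicationEndpoint from this patch, so init() and the default WAL entry filters are inherited, and only the lifecycle, peer UUID, and replicate() remain to be supplied:

  import java.util.UUID;
  import org.apache.hadoop.hbase.regionserver.wal.HLog;
  import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
  import org.apache.hadoop.hbase.util.Bytes;

  public class LoggingReplicationEndpoint extends BaseReplicationEndpoint {

    // AbstractService lifecycle: nothing to set up or tear down in this sketch.
    @Override protected void doStart() { notifyStarted(); }
    @Override protected void doStop() { notifyStopped(); }

    @Override
    public UUID getPeerUUID() {
      // Must uniquely identify the target system; a fixed name-based UUID
      // is enough for a non-HBase target like this one.
      return UUID.nameUUIDFromBytes(Bytes.toBytes("logging-endpoint"));
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      for (HLog.Entry entry : replicateContext.getEntries()) {
        System.out.println(entry); // a real endpoint would persist these
      }
      return true; // entries passed in the context are now considered persisted
    }
  }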

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
new file mode 100644
index 0000000..82d976d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * Keeps KVs whose replication scope is other than local
+ */
+@InterfaceAudience.Private
+public class ScopeWALEntryFilter implements WALEntryFilter {
+
+  @Override
+  public Entry filter(Entry entry) {
+    NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
+    if (scopes == null || scopes.isEmpty()) {
+      return null;
+    }
+    ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+    int size = kvs.size();
+    for (int i = size - 1; i >= 0; i--) {
+      KeyValue kv = kvs.get(i);
+      // The scope will be null or empty if
+      // there's nothing to replicate in that WALEdit
+      if (!scopes.containsKey(kv.getFamily())
+          || scopes.get(kv.getFamily()) == HConstants.REPLICATION_SCOPE_LOCAL) {
+        kvs.remove(i);
+      }
+    }
+    if (kvs.size() < size / 2) {
+      kvs.trimToSize();
+    }
+    return entry;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
new file mode 100644
index 0000000..e84ac18
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+
+/**
+ * Skips WAL edits for all System tables including META
+ */
+@InterfaceAudience.Private
+public class SystemTableWALEntryFilter implements WALEntryFilter {
+  @Override
+  public Entry filter(Entry entry) {
+    if (entry.getKey().getTablename().isSystemTable()) {
+      return null;
+    }
+    return entry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
new file mode 100644
index 0000000..b7dfc4e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A {@link WALEntryFilter} which keeps only the tables and column families
+ * present in the peer's table-CFs configuration.
+ */
+public class TableCfWALEntryFilter implements WALEntryFilter {
+
+  private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
+  private final ReplicationPeer peer;
+
+  public TableCfWALEntryFilter(ReplicationPeer peer) {
+    this.peer = peer;
+  }
+
+  @Override
+  public Entry filter(Entry entry) {
+    String tabName = entry.getKey().getTablename().getNameAsString();
+    ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
+    Map<String, List<String>> tableCFs = null;
+
+    try {
+      tableCFs = this.peer.getTableCFs();
+    } catch (IllegalArgumentException e) {
+      LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
+          ", degenerate as if it's not configured by keeping tableCFs==null");
+    }
+    int size = kvs.size();
+
+    // return null (prevent replicating) if logKey's table isn't in this peer's
+    // replicable table list (empty tableCFs means all tables are replicable)
+    if (tableCFs != null && !tableCFs.containsKey(tabName)) {
+      return null;
+    } else {
+      List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
+      for (int i = size - 1; i >= 0; i--) {
+        KeyValue kv = kvs.get(i);
+        // ignore (remove) kv if its cf isn't in the replicable cf list
+        // (empty cfs means all cfs of this table are replicable)
+        if ((cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
+          kvs.remove(i);
+        }
+      }
+    }
+    if (kvs.size() < size/2) {
+      kvs.trimToSize();
+    }
+    return entry;
+  }
+
+}
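
To make the semantics concrete, a hedged sketch: assume a hypothetical peer
whose getTableCFs() returns {t1 -> [cf1]}.

  WALEntryFilter filter = new TableCfWALEntryFilter(peer);
  // An entry against table t2 is dropped entirely: filter.filter(entry)
  // returns null, so nothing from t2 is replicated to this peer.
  // An entry against table t1 is returned, but KeyValues whose family is
  // not cf1 are removed in place from entry.getEdit().getKeyValues().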

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
new file mode 100644
index 0000000..c054978
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+/**
+ * A filter applied to WAL entries before they are shipped to replication. Multiple
+ * filters can be chained together using {@link ChainWALEntryFilter}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface WALEntryFilter {
+
+  /**
+   * Applies the filter, possibly returning a different HLog.Entry instance.
+   * If null is returned, the entry will be skipped.
+   * @param entry WAL Entry to filter
+   * @return a (possibly modified) HLog.Entry to use. Returning null or an entry with
+   * no cells will cause the entry to be skipped for replication.
+   */
+  public HLog.Entry filter(HLog.Entry entry);
+
+}
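
As an example of plugging in custom behavior, a hedged sketch of a filter
implementation (the class name and the table name below are illustrative,
not part of this commit):

  package org.apache.hadoop.hbase.replication;

  import org.apache.hadoop.hbase.regionserver.wal.HLog;

  /** Illustrative only: drops WAL entries for one hard-coded table. */
  public class ExampleBlockingWALEntryFilter implements WALEntryFilter {
    @Override
    public HLog.Entry filter(HLog.Entry entry) {
      String table = entry.getKey().getTablename().getNameAsString();
      if ("do_not_replicate".equals(table)) {
        return null; // null (or an entry with no cells) skips replication
      }
      return entry;
    }
  }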

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index cea5750..be69d98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -25,14 +25,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
@@ -57,12 +54,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
 
   @Override
   public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
-   // all members of this class are null if replication is disabled, 
+   // all members of this class are null if replication is disabled,
    // so we cannot filter the files
     if (this.getConf() == null) {
       return files;
     }
-    
+
     final Set<String> hlogs = loadHLogsFromQueues();
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
@@ -137,8 +134,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       LOG.info("Stopping " + this.zkw);
       this.zkw.close();
     }
-    // Not sure why we're deleting a connection that we never acquired or used
-    HConnectionManager.deleteConnection(this.getConf());
   }
 
   @Override


[3/3] git commit: HBASE-11367 Pluggable replication endpoint

Posted by en...@apache.org.
HBASE-11367 Pluggable replication endpoint


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/463d52d8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/463d52d8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/463d52d8

Branch: refs/heads/master
Commit: 463d52d8cf2a87e1f11eb6fabcd0164584e29fbb
Parents: 4824b0d
Author: Enis Soztutar <en...@apache.org>
Authored: Mon Jul 14 16:21:55 2014 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Mon Jul 14 16:22:26 2014 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    | 109 ++-
 .../hbase/replication/ReplicationPeer.java      | 353 +------
 .../replication/ReplicationPeerConfig.java      |  87 ++
 .../replication/ReplicationPeerZKImpl.java      | 320 ++++++
 .../hbase/replication/ReplicationPeers.java     |  89 +-
 .../replication/ReplicationPeersZKImpl.java     | 348 +++----
 .../hadoop/hbase/zookeeper/ZKClusterId.java     |   3 +-
 .../hadoop/hbase/HBaseInterfaceAudience.java    |   1 +
 .../protobuf/generated/ZooKeeperProtos.java     | 963 ++++++++++++++++++-
 .../src/main/protobuf/ZooKeeper.proto           |   3 +
 .../replication/VerifyReplication.java          |  13 +-
 .../hadoop/hbase/regionserver/wal/HLog.java     |   9 +-
 .../hadoop/hbase/regionserver/wal/HLogKey.java  |  11 +-
 .../hadoop/hbase/regionserver/wal/WALEdit.java  |   5 +-
 .../replication/BaseReplicationEndpoint.java    |  77 ++
 .../hbase/replication/ChainWALEntryFilter.java  |  68 ++
 .../replication/HBaseReplicationEndpoint.java   | 217 +++++
 .../hbase/replication/ReplicationEndpoint.java  | 163 ++++
 .../hbase/replication/ScopeWALEntryFilter.java  |  58 ++
 .../replication/SystemTableWALEntryFilter.java  |  36 +
 .../replication/TableCfWALEntryFilter.java      |  74 ++
 .../hbase/replication/WALEntryFilter.java       |  41 +
 .../master/ReplicationLogCleaner.java           |   9 +-
 .../HBaseInterClusterReplicationEndpoint.java   | 225 +++++
 .../replication/regionserver/MetricsSource.java |   5 +-
 .../regionserver/ReplicationSinkManager.java    |  17 +-
 .../regionserver/ReplicationSource.java         | 279 +++---
 .../ReplicationSourceInterface.java             |   4 +-
 .../regionserver/ReplicationSourceManager.java  |  68 +-
 .../replication/TestReplicationAdmin.java       |  37 +
 .../replication/ReplicationSourceDummy.java     |   4 +-
 .../replication/TestPerTableCFReplication.java  |  20 +-
 .../replication/TestReplicationEndpoint.java    | 272 ++++++
 .../replication/TestReplicationStateBasic.java  |  58 +-
 .../TestReplicationTrackerZKImpl.java           |   4 +-
 .../TestReplicationWALEntryFilters.java         | 277 ++++++
 .../TestReplicationSinkManager.java             |  17 +-
 37 files changed, 3464 insertions(+), 880 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 72e3fbb..4028d87 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -21,10 +21,12 @@ package org.apache.hadoop.hbase.client.replication;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import java.util.Map.Entry;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -34,14 +36,18 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * <p>
  * This class provides the administrative interface to HBase cluster
@@ -80,6 +86,8 @@ public class ReplicationAdmin implements Closeable {
       .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
 
   private final HConnection connection;
+  // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
+  // be moved to hbase-server. Resolve it in HBASE-11392.
   private final ReplicationQueuesClient replicationQueuesClient;
   private final ReplicationPeers replicationPeers;
 
@@ -126,27 +134,65 @@ public class ReplicationAdmin implements Closeable {
     });
   }
 
-
   /**
    * Add a new peer cluster to replicate to.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    * @param clusterKey the concatenation of the slave cluster's
    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
    * @throws IllegalStateException if there's already one slave since
    * multi-slave isn't supported yet.
+   * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
    */
+  @Deprecated
   public void addPeer(String id, String clusterKey) throws ReplicationException {
-    this.replicationPeers.addPeer(id, clusterKey);
+    this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
   }
 
+  @Deprecated
   public void addPeer(String id, String clusterKey, String tableCFs)
     throws ReplicationException {
-    this.replicationPeers.addPeer(id, clusterKey, tableCFs);
+    this.replicationPeers.addPeer(id,
+      new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
+  }
+
+  /**
+   * Add a new remote slave cluster for replication.
+   * @param id a short name that identifies the cluster
+   * @param peerConfig configuration for the replication slave cluster
+   * @param tableCfs the tables and column families to replicate for this peer:
+   * a map from table name to column family names. Pass an empty collection as a
+   * value to replicate all column families of that table, or pass null for the
+   * whole map to replicate all tables and column families.
+   */
+  public void addPeer(String id, ReplicationPeerConfig peerConfig,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
+    this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
+  }
+
+  @VisibleForTesting
+  static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
+    String tableCfsStr = null;
+    if (tableCfs != null) {
+      // Format: table1:cf1,cf2;table2:cfA,cfB;table3
+      StringBuilder builder = new StringBuilder();
+      for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+        if (builder.length() > 0) {
+          builder.append(";");
+        }
+        builder.append(entry.getKey());
+        if (entry.getValue() != null && !entry.getValue().isEmpty()) {
+          builder.append(":");
+          builder.append(StringUtils.join(entry.getValue(), ","));
+        }
+      }
+      tableCfsStr = builder.toString();
+    }
+    return tableCfsStr;
   }
 
   /**
    * Removes a peer cluster and stops the replication to it.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    */
   public void removePeer(String id) throws ReplicationException {
     this.replicationPeers.removePeer(id);
@@ -154,7 +200,7 @@ public class ReplicationAdmin implements Closeable {
 
   /**
    * Restart the replication stream to the specified peer.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    */
   public void enablePeer(String id) throws ReplicationException {
     this.replicationPeers.enablePeer(id);
@@ -162,7 +208,7 @@ public class ReplicationAdmin implements Closeable {
 
   /**
    * Stop the replication stream to the specified peer.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    */
   public void disablePeer(String id) throws ReplicationException {
     this.replicationPeers.disablePeer(id);
@@ -179,14 +225,30 @@ public class ReplicationAdmin implements Closeable {
   /**
    * Map of this cluster's peers for display.
    * @return A map of peer ids to peer cluster keys
+   * @deprecated use {@link #listPeerConfigs()}
    */
+  @Deprecated
   public Map<String, String> listPeers() {
-    return this.replicationPeers.getAllPeerClusterKeys();
+    Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
+    Map<String, String> ret = new HashMap<String, String>(peers.size());
+
+    for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
+      ret.put(entry.getKey(), entry.getValue().getClusterKey());
+    }
+    return ret;
+  }
+
+  public Map<String, ReplicationPeerConfig> listPeerConfigs() {
+    return this.replicationPeers.getAllPeerConfigs();
+  }
+
+  public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
+    return this.replicationPeers.getReplicationPeerConfig(id);
   }
 
   /**
    * Get the replicable table-cf config of the specified peer.
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
    */
   public String getPeerTableCFs(String id) throws ReplicationException {
     return this.replicationPeers.getPeerTableCFsConfig(id);
@@ -194,16 +256,31 @@ public class ReplicationAdmin implements Closeable {
 
   /**
    * Set the replicable table-cf config of the specified peer
-   * @param id a short that identifies the cluster
+   * @param id a short name that identifies the cluster
+   * @deprecated use {@link #setPeerTableCFs(String, Map)}
    */
+  @Deprecated
   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
   }
 
   /**
+   * Set the replicable table-cf config of the specified peer
+   * @param id a short name that identifies the cluster
+   * @param tableCfs the tables and column families to replicate for this peer:
+   * a map from table name to column family names. Pass an empty collection as a
+   * value to replicate all column families of that table, or pass null for the
+   * whole map to replicate all tables and column families.
+   */
+  public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
+      throws ReplicationException {
+    this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
+  }
+
+  /**
    * Get the state of the specified peer cluster
-   * @param id String format of the Short that identifies the peer, an IllegalArgumentException
-   *           is thrown if it doesn't exist
+   * @param id a short name that identifies the peer;
+   * an IllegalArgumentException is thrown if it doesn't exist
    * @return true if replication is enabled to that peer, false if it isn't
    */
   public boolean getPeerState(String id) throws ReplicationException {
@@ -217,7 +294,7 @@ public class ReplicationAdmin implements Closeable {
     }
   }
 
-  
+
   /**
    * Find all column families that are replicated from this cluster
    * @return the full list of the replicated column families of this cluster as:
@@ -227,7 +304,7 @@ public class ReplicationAdmin implements Closeable {
    * types may be extended here. For example
    *  1) the replication may only apply to selected peers instead of all peers
    *  2) the replicationType may indicate the host Cluster servers as Slave
-   *     for the table:columnFam.         
+   *     for the table:columnFam.
    */
   public List<HashMap<String, String>> listReplicated() throws IOException {
     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
@@ -249,5 +326,5 @@ public class ReplicationAdmin implements Closeable {
     }
 
     return replicationColFams;
-  } 
+  }
 }
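
Taken together, the new admin API reads roughly like the following sketch
(the peer id, cluster key and table names are placeholders; error handling
is omitted):

  Configuration conf = HBaseConfiguration.create();
  ReplicationAdmin admin = new ReplicationAdmin(conf);
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
      .setClusterKey("zk1,zk2,zk3:2181:/hbase");
  Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
  tableCfs.put(TableName.valueOf("t1"), Arrays.asList("cf1", "cf2")); // only cf1,cf2 of t1
  tableCfs.put(TableName.valueOf("t2"), new ArrayList<String>());     // every family of t2
  admin.addPeer("1", peerConfig, tableCfs); // serialized via getTableCfsStr, e.g. "t1:cf1,cf2;t2"
  admin.close();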

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 1b14dab..c116674 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,362 +17,56 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-
-import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 
 /**
- * This class acts as a wrapper for all the objects used to identify and
- * communicate with remote peers and is responsible for answering to expired
- * sessions and re-establishing the ZK connections.
+ * ReplicationPeer manages the enabled/disabled state for a replication peer.
  */
-@InterfaceAudience.Private
-public class ReplicationPeer implements Abortable, Closeable {
-  private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
-
-  private final String clusterKey;
-  private final String id;
-  private List<ServerName> regionServers = new ArrayList<ServerName>(0);
-  private final AtomicBoolean peerEnabled = new AtomicBoolean();
-  private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
-  // Cannot be final since a new object needs to be recreated when session fails
-  private ZooKeeperWatcher zkw;
-  private final Configuration conf;
-  private long lastRegionserverUpdate;
-
-  private PeerStateTracker peerStateTracker;
-  private TableCFsTracker tableCFsTracker;
-
-  /**
-   * Constructor that takes all the objects required to communicate with the
-   * specified peer, except for the region server addresses.
-   * @param conf configuration object to this peer
-   * @param key cluster key used to locate the peer
-   * @param id string representation of this peer's identifier
-   */
-  public ReplicationPeer(Configuration conf, String key, String id) throws ReplicationException {
-    this.conf = conf;
-    this.clusterKey = key;
-    this.id = id;
-    try {
-      this.reloadZkWatcher();
-    } catch (IOException e) {
-      throw new ReplicationException("Error connecting to peer cluster with peerId=" + id, e);
-    }
-  }
-
-  /**
-   * start a state tracker to check whether this peer is enabled or not
-   *
-   * @param zookeeper zk watcher for the local cluster
-   * @param peerStateNode path to zk node which stores peer state
-   * @throws KeeperException
-   */
-  public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
-      throws KeeperException {
-    ensurePeerEnabled(zookeeper, peerStateNode);
-    this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
-    this.peerStateTracker.start();
-    try {
-      this.readPeerStateZnode();
-    } catch (DeserializationException e) {
-      throw ZKUtil.convert(e);
-    }
-  }
-
-  private void readPeerStateZnode() throws DeserializationException {
-    this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false)));
-  }
-
-  /**
-   * start a table-cfs tracker to listen the (table, cf-list) map change
-   *
-   * @param zookeeper zk watcher for the local cluster
-   * @param tableCFsNode path to zk node which stores table-cfs
-   * @throws KeeperException
-   */
-  public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
-    throws KeeperException {
-    this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
-        this);
-    this.tableCFsTracker.start();
-    this.readTableCFsZnode();
-  }
-
-  static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
-    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
-      return null;
-    }
-
-    Map<String, List<String>> tableCFsMap = null;
-
-    // parse out (table, cf-list) pairs from tableCFsConfig
-    // format: "table1:cf1,cf2;table2:cfA,cfB"
-    String[] tables = tableCFsConfig.split(";");
-    for (String tab : tables) {
-      // 1 ignore empty table config
-      tab = tab.trim();
-      if (tab.length() == 0) {
-        continue;
-      }
-      // 2 split to "table" and "cf1,cf2"
-      //   for each table: "table:cf1,cf2" or "table"
-      String[] pair = tab.split(":");
-      String tabName = pair[0].trim();
-      if (pair.length > 2 || tabName.length() == 0) {
-        LOG.error("ignore invalid tableCFs setting: " + tab);
-        continue;
-      }
-
-      // 3 parse "cf1,cf2" part to List<cf>
-      List<String> cfs = null;
-      if (pair.length == 2) {
-        String[] cfsList = pair[1].split(",");
-        for (String cf : cfsList) {
-          String cfName = cf.trim();
-          if (cfName.length() > 0) {
-            if (cfs == null) {
-              cfs = new ArrayList<String>();
-            }
-            cfs.add(cfName);
-          }
-        }
-      }
-
-      // 4 put <table, List<cf>> to map
-      if (tableCFsMap == null) {
-        tableCFsMap = new HashMap<String, List<String>>();
-      }
-      tableCFsMap.put(tabName, cfs);
-    }
-
-    return tableCFsMap;
-  }
-
-  private void readTableCFsZnode() {
-    String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
-    this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
-  }
-
-  /**
-   * Get the cluster key of that peer
-   * @return string consisting of zk ensemble addresses, client port
-   * and root znode
-   */
-  public String getClusterKey() {
-    return clusterKey;
-  }
-
-  /**
-   * Get the state of this peer
-   * @return atomic boolean that holds the status
-   */
-  public AtomicBoolean getPeerEnabled() {
-    return peerEnabled;
-  }
-
-  /**
-   * Get replicable (table, cf-list) map of this peer
-   * @return the replicable (table, cf-list) map
-   */
-  public Map<String, List<String>> getTableCFs() {
-    return this.tableCFs;
-  }
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeer {
 
   /**
-   * Get a list of all the addresses of all the region servers
-   * for this peer cluster
-   * @return list of addresses
+   * State of the peer, whether it is enabled or not
    */
-  public List<ServerName> getRegionServers() {
-    return regionServers;
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+  enum PeerState {
+    ENABLED,
+    DISABLED
   }
 
   /**
-   * Set the list of region servers for that peer
-   * @param regionServers list of addresses for the region servers
+   * Get the identifier of this peer
+   * @return string representation of the id
    */
-  public void setRegionServers(List<ServerName> regionServers) {
-    this.regionServers = regionServers;
-    lastRegionserverUpdate = System.currentTimeMillis();
-  }
+  String getId();
 
   /**
-   * Get the ZK connection to this peer
-   * @return zk connection
+   * Get the peer config object
+   * @return the ReplicationPeerConfig for this peer
    */
-  public ZooKeeperWatcher getZkw() {
-    return zkw;
-  }
+  ReplicationPeerConfig getPeerConfig();
 
   /**
-   * Get the timestamp at which the last change occurred to the list of region servers to replicate
-   * to.
-   * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+   * Returns the state of the peer
+   * @return the enabled state
    */
-  public long getLastRegionserverUpdate() {
-    return lastRegionserverUpdate;
-  }
-
-  /**
-   * Get the identifier of this peer
-   * @return string representation of the id (short)
-   */
-  public String getId() {
-    return id;
-  }
+  PeerState getPeerState();
 
   /**
    * Get the configuration object required to communicate with this peer
    * @return configuration object
    */
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  @Override
-  public void abort(String why, Throwable e) {
-    LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey
-        + " was aborted for the following reason(s):" + why, e);
-  }
-
-  /**
-   * Closes the current ZKW (if not null) and creates a new one
-   * @throws IOException If anything goes wrong connecting
-   */
-  public void reloadZkWatcher() throws IOException {
-    if (zkw != null) zkw.close();
-    zkw = new ZooKeeperWatcher(conf,
-        "connection to cluster: " + id, this);
-  }
-
-  @Override
-  public boolean isAborted() {
-    // Currently the replication peer is never "Aborted", we just log when the
-    // abort method is called.
-    return false;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (zkw != null){
-      zkw.close();
-    }
-  }
-
-  /**
-   * Parse the raw data from ZK to get a peer's state
-   * @param bytes raw ZK data
-   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
-   * @throws DeserializationException
-   */
-  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
-    ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
-    return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
-  }
-
-  /**
-   * @param bytes Content of a state znode.
-   * @return State parsed from the passed bytes.
-   * @throws DeserializationException
-   */
-  private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
-      throws DeserializationException {
-    ProtobufUtil.expectPBMagicPrefix(bytes);
-    int pblen = ProtobufUtil.lengthOfPBMagic();
-    ZooKeeperProtos.ReplicationState.Builder builder =
-        ZooKeeperProtos.ReplicationState.newBuilder();
-    ZooKeeperProtos.ReplicationState state;
-    try {
-      state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
-      return state.getState();
-    } catch (InvalidProtocolBufferException e) {
-      throw new DeserializationException(e);
-    }
-  }
-
-  /**
-   * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
-   * @param zookeeper
-   * @param path Path to znode to check
-   * @return True if we created the znode.
-   * @throws NodeExistsException
-   * @throws KeeperException
-   */
-  private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
-      throws NodeExistsException, KeeperException {
-    if (ZKUtil.checkExists(zookeeper, path) == -1) {
-      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
-      // peer-state znode. This happens while adding a peer.
-      // The peer state data is set as "ENABLED" by default.
-      ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
-        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
-      return true;
-    }
-    return false;
-  }
+  Configuration getConfiguration();
 
   /**
-   * Tracker for state of this peer
-   */
-  public class PeerStateTracker extends ZooKeeperNodeTracker {
-
-    public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, peerStateZNode, abortable);
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-        try {
-          readPeerStateZnode();
-        } catch (DeserializationException e) {
-          LOG.warn("Failed deserializing the content of " + path, e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Tracker for (table, cf-list) map of this peer
+   * Get replicable (table, cf-list) map of this peer
+   * @return the replicable (table, cf-list) map
    */
-  public class TableCFsTracker extends ZooKeeperNodeTracker {
+  Map<String, List<String>> getTableCFs();
 
-    public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, tableCFsZNode, abortable);
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-        readTableCFsZnode();
-      }
-    }
-  }
 }
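
A hedged sketch of how a caller consults the slimmed-down interface
(replicationPeers and peerId are assumed to exist in the caller):

  ReplicationPeer peer = replicationPeers.getPeer(peerId);
  if (peer != null && peer.getPeerState() == ReplicationPeer.PeerState.ENABLED) {
    Map<String, List<String>> tableCFs = peer.getTableCFs(); // null => replicate everything
    // ... ship edits for this peer, honoring tableCFs ...
  }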

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
new file mode 100644
index 0000000..8b8bab7
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A configuration for the replication peer cluster.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ReplicationPeerConfig {
+
+  private String clusterKey;
+  private String replicationEndpointImpl;
+  private final Map<byte[], byte[]> peerData;
+  private final Map<String, String> configuration;
+
+
+  public ReplicationPeerConfig() {
+    this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+    this.configuration = new HashMap<String, String>(0);
+  }
+
+  /**
+   * Set the clusterKey which is the concatenation of the slave cluster's:
+   *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+   */
+  public ReplicationPeerConfig setClusterKey(String clusterKey) {
+    this.clusterKey = clusterKey;
+    return this;
+  }
+
+  /**
+   * Sets the ReplicationEndpoint plugin class for this peer.
+   * @param replicationEndpointImpl a class implementing ReplicationEndpoint
+   */
+  public ReplicationPeerConfig setReplicationEndpointImpl(String replicationEndpointImpl) {
+    this.replicationEndpointImpl = replicationEndpointImpl;
+    return this;
+  }
+
+  public String getClusterKey() {
+    return clusterKey;
+  }
+
+  public String getReplicationEndpointImpl() {
+    return replicationEndpointImpl;
+  }
+
+  public Map<byte[], byte[]> getPeerData() {
+    return peerData;
+  }
+
+  public Map<String, String> getConfiguration() {
+    return configuration;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
+    builder.append("replicationEndpointImpl=").append(replicationEndpointImpl);
+    return builder.toString();
+  }
+}
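
For instance, a peer that should use a custom endpoint can be configured as
in this sketch (the endpoint class name and the keys are hypothetical):

  ReplicationPeerConfig config = new ReplicationPeerConfig()
      .setClusterKey("zk1:2181:/hbase")
      .setReplicationEndpointImpl("org.example.CustomReplicationEndpoint");
  config.getConfiguration().put("custom.endpoint.batch.size", "64"); // per-peer overrides
  config.getPeerData().put(Bytes.toBytes("token"), Bytes.toBytes("opaque-bytes"));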

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
new file mode 100644
index 0000000..a39392c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -0,0 +1,320 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+@InterfaceAudience.Private
+public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
+  private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
+
+  private final ReplicationPeerConfig peerConfig;
+  private final String id;
+  private volatile PeerState peerState;
+  private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
+  private final Configuration conf;
+
+  private PeerStateTracker peerStateTracker;
+  private TableCFsTracker tableCFsTracker;
+
+  /**
+   * Constructor that takes all the objects required to communicate with the
+   * specified peer, except for the region server addresses.
+   * @param conf configuration object to this peer
+   * @param id string representation of this peer's identifier
+   * @param peerConfig configuration for the replication peer
+   */
+  public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
+      throws ReplicationException {
+    this.conf = conf;
+    this.peerConfig = peerConfig;
+    this.id = id;
+  }
+
+  /**
+   * Start a state tracker to check whether this peer is enabled or not
+   *
+   * @param zookeeper zk watcher for the local cluster
+   * @param peerStateNode path to zk node which stores peer state
+   * @throws KeeperException
+   */
+  public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+      throws KeeperException {
+    ensurePeerEnabled(zookeeper, peerStateNode);
+    this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
+    this.peerStateTracker.start();
+    try {
+      this.readPeerStateZnode();
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  private void readPeerStateZnode() throws DeserializationException {
+    this.peerState =
+        isStateEnabled(this.peerStateTracker.getData(false))
+          ? PeerState.ENABLED
+          : PeerState.DISABLED;
+  }
+
+  /**
+   * Start a table-cfs tracker to listen for (table, cf-list) map changes
+   *
+   * @param zookeeper zk watcher for the local cluster
+   * @param tableCFsNode path to zk node which stores table-cfs
+   * @throws KeeperException
+   */
+  public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
+    throws KeeperException {
+    this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
+        this);
+    this.tableCFsTracker.start();
+    this.readTableCFsZnode();
+  }
+
+  static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
+      return null;
+    }
+
+    Map<String, List<String>> tableCFsMap = null;
+    // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
+    // parse out (table, cf-list) pairs from tableCFsConfig
+    // format: "table1:cf1,cf2;table2:cfA,cfB"
+    String[] tables = tableCFsConfig.split(";");
+    for (String tab : tables) {
+      // 1 ignore empty table config
+      tab = tab.trim();
+      if (tab.length() == 0) {
+        continue;
+      }
+      // 2 split to "table" and "cf1,cf2"
+      //   for each table: "table:cf1,cf2" or "table"
+      String[] pair = tab.split(":");
+      String tabName = pair[0].trim();
+      if (pair.length > 2 || tabName.length() == 0) {
+        LOG.error("ignore invalid tableCFs setting: " + tab);
+        continue;
+      }
+
+      // 3 parse "cf1,cf2" part to List<cf>
+      List<String> cfs = null;
+      if (pair.length == 2) {
+        String[] cfsList = pair[1].split(",");
+        for (String cf : cfsList) {
+          String cfName = cf.trim();
+          if (cfName.length() > 0) {
+            if (cfs == null) {
+              cfs = new ArrayList<String>();
+            }
+            cfs.add(cfName);
+          }
+        }
+      }
+
+      // 4 put <table, List<cf>> to map
+      if (tableCFsMap == null) {
+        tableCFsMap = new HashMap<String, List<String>>();
+      }
+      tableCFsMap.put(tabName, cfs);
+    }
+
+    return tableCFsMap;
+  }
+
+  private void readTableCFsZnode() {
+    String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
+    this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
+  }
+
+  @Override
+  public PeerState getPeerState() {
+    return peerState;
+  }
+
+  /**
+   * Get the identifier of this peer
+   * @return string representation of the id (short)
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Get the peer config object
+   * @return the ReplicationPeerConfig for this peer
+   */
+  @Override
+  public ReplicationPeerConfig getPeerConfig() {
+    return peerConfig;
+  }
+
+  /**
+   * Get the configuration object required to communicate with this peer
+   * @return configuration object
+   */
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Get replicable (table, cf-list) map of this peer
+   * @return the replicable (table, cf-list) map
+   */
+  @Override
+  public Map<String, List<String>> getTableCFs() {
+    return this.tableCFs;
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
+        + " was aborted for the following reason(s):" + why, e);
+  }
+
+  @Override
+  public boolean isAborted() {
+    // Currently the replication peer is never "Aborted", we just log when the
+    // abort method is called.
+    return false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // TODO: stop zkw?
+  }
+
+  /**
+   * Parse the raw data from ZK to get a peer's state
+   * @param bytes raw ZK data
+   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
+   * @throws DeserializationException
+   */
+  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+    ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+    return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+  }
+
+  /**
+   * @param bytes Content of a state znode.
+   * @return State parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(bytes);
+    int pblen = ProtobufUtil.lengthOfPBMagic();
+    ZooKeeperProtos.ReplicationState.Builder builder =
+        ZooKeeperProtos.ReplicationState.newBuilder();
+    ZooKeeperProtos.ReplicationState state;
+    try {
+      state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+      return state.getState();
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
+   * @param zookeeper
+   * @param path Path to znode to check
+   * @return True if we created the znode.
+   * @throws NodeExistsException
+   * @throws KeeperException
+   */
+  private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+      throws NodeExistsException, KeeperException {
+    if (ZKUtil.checkExists(zookeeper, path) == -1) {
+      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+      // peer-state znode. This happens while adding a peer.
+      // The peer state data is set as "ENABLED" by default.
+      ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
+        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Tracker for state of this peer
+   */
+  public class PeerStateTracker extends ZooKeeperNodeTracker {
+
+    public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
+        Abortable abortable) {
+      super(watcher, peerStateZNode, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      if (path.equals(node)) {
+        super.nodeDataChanged(path);
+        try {
+          readPeerStateZnode();
+        } catch (DeserializationException e) {
+          LOG.warn("Failed deserializing the content of " + path, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Tracker for (table, cf-list) map of this peer
+   */
+  public class TableCFsTracker extends ZooKeeperNodeTracker {
+
+    public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
+        Abortable abortable) {
+      super(watcher, tableCFsZNode, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      if (path.equals(node)) {
+        super.nodeDataChanged(path);
+        readTableCFsZnode();
+      }
+    }
+  }
+}
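
The parse rules above can be summarized with a sketch (the method is
package-private, so this assumes a caller in the same package):

  Map<String, List<String>> m =
      ReplicationPeerZKImpl.parseTableCFsFromConfig("table1:cf1,cf2;table2:cfA,cfB;table3");
  // m.get("table1") -> [cf1, cf2]
  // m.get("table2") -> [cfA, cfB]
  // m.get("table3") -> null, i.e. every family of table3 is replicable
  // parseTableCFsFromConfig(null) and parseTableCFsFromConfig("") -> null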

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 4922f70..b1c3b49 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -21,11 +21,10 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
@@ -44,22 +43,16 @@ public interface ReplicationPeers {
    * Initialize the ReplicationPeers interface.
    */
   void init() throws ReplicationException;
-  /**
-   * Add a new remote slave cluster for replication.
-   * @param peerId a short that identifies the cluster
-   * @param clusterKey the concatenation of the slave cluster's:
-   *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-   */
-  void addPeer(String peerId, String clusterKey) throws ReplicationException;
 
   /**
    * Add a new remote slave cluster for replication.
    * @param peerId a short that identifies the cluster
-   * @param clusterKey the concatenation of the slave cluster's:
-   *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-   * @param tableCFs the table and column-family list which will be replicated for this peer
+   * @param peerConfig configuration for the replication slave cluster
+   * @param tableCFs the table and column-family list which will be replicated for this peer,
+   * or null for all tables and column families
    */
-  void addPeer(String peerId, String clusterKey, String tableCFs) throws ReplicationException;
+  void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
+      throws ReplicationException;
 
   /**
    * Removes a remote slave cluster and stops the replication to it.
@@ -67,6 +60,10 @@ public interface ReplicationPeers {
    */
   void removePeer(String peerId) throws ReplicationException;
 
+  boolean peerAdded(String peerId) throws ReplicationException;
+
+  void peerRemoved(String peerId);
+
   /**
    * Restart the replication to the specified remote slave cluster.
    * @param peerId a short that identifies the cluster
@@ -100,6 +97,19 @@ public interface ReplicationPeers {
   public Map<String, List<String>> getTableCFs(String peerId);
 
   /**
+   * Returns the ReplicationPeer
+   * @param peerId id for the peer
+   * @return ReplicationPeer object
+   */
+  ReplicationPeer getPeer(String peerId);
+
+  /**
+   * Returns the set of peerIds defined
+   * @return a Set of Strings for peerIds
+   */
+  public Set<String> getPeerIds();
+
+  /**
    * Get the replication status for the specified connected remote slave cluster.
    * The value might be read from cache, so it is recommended to
    * use {@link #getStatusOfPeerFromBackingStore(String)}
@@ -107,7 +117,7 @@ public interface ReplicationPeers {
    * @param peerId a short that identifies the cluster
    * @return true if replication is enabled, false otherwise.
    */
-  boolean getStatusOfConnectedPeer(String peerId);
+  boolean getStatusOfPeer(String peerId);
 
   /**
    * Get the replication status for the specified remote slave cluster, which doesn't
@@ -119,17 +129,11 @@ public interface ReplicationPeers {
   boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
 
   /**
-   * Get a set of all connected remote slave clusters.
-   * @return set of peer ids
-   */
-  Set<String> getConnectedPeers();
-
-  /**
-   * List the cluster keys of all remote slave clusters (whether they are enabled/disabled or
-   * connected/disconnected).
+   * List the cluster replication configs of all remote slave clusters (whether they are
+   * enabled/disabled or connected/disconnected).
    * @return A map of peer ids to peer cluster keys
    */
-  Map<String, String> getAllPeerClusterKeys();
+  Map<String, ReplicationPeerConfig> getAllPeerConfigs();
 
   /**
    * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
@@ -139,45 +143,16 @@ public interface ReplicationPeers {
   List<String> getAllPeerIds();
 
   /**
-   * Attempt to connect to a new remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   * @return true if a new connection was made, false if no new connection was made.
-   */
-  boolean connectToPeer(String peerId) throws ReplicationException;
-
-  /**
-   * Disconnect from a remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   */
-  void disconnectFromPeer(String peerId);
-
-  /**
-   * Returns all region servers from given connected remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   * @return addresses of all region servers in the peer cluster. Returns an empty list if the peer
-   *         cluster is unavailable or there are no region servers in the cluster.
-   */
-  List<ServerName> getRegionServersOfConnectedPeer(String peerId);
-
-  /**
-   * Get the timestamp of the last change in composition of a given peer cluster.
-   * @param peerId identifier of the peer cluster for which the timestamp is requested
-   * @return the timestamp (in milliseconds) of the last change to the composition of
-   *         the peer cluster
-   */
-  long getTimestampOfLastChangeToPeer(String peerId);
-
-  /**
-   * Returns the UUID of the provided peer id.
-   * @param peerId the peer's ID that will be converted into a UUID
-   * @return a UUID or null if the peer cluster does not exist or is not connected.
+   * Returns the configured ReplicationPeerConfig for this peerId
+   * @param peerId a short name that identifies the cluster
+   * @return ReplicationPeerConfig for the peer
    */
-  UUID getPeerUUID(String peerId);
+  ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
 
   /**
    * Returns the configuration needed to talk to the remote slave cluster.
    * @param peerId a short that identifies the cluster
    * @return the configuration for the peer cluster, null if it was unable to get the configuration
    */
-  Configuration getPeerConf(String peerId) throws ReplicationException;
+  Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
 }
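
Callers of the reworked getPeerConf now receive both pieces at once, roughly
as in this sketch:

  Pair<ReplicationPeerConfig, Configuration> pair = replicationPeers.getPeerConf(peerId);
  ReplicationPeerConfig peerConfig = pair.getFirst(); // cluster key, endpoint class, peer data
  Configuration peerConf = pair.getSecond();          // configuration to reach the peer cluster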

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index fb09102..488d37a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -19,33 +19,29 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.AuthFailedException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-
+import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
@@ -77,7 +73,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
 
   // Map of peer clusters keyed by their id
-  private Map<String, ReplicationPeer> peerClusters;
+  private Map<String, ReplicationPeerZKImpl> peerClusters;
   private final String tableCFsNodeName;
 
   private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
@@ -86,7 +82,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       Abortable abortable) {
     super(zk, conf, abortable);
     this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
-    this.peerClusters = new HashMap<String, ReplicationPeer>();
+    this.peerClusters = new HashMap<String, ReplicationPeerZKImpl>();
   }
 
   @Override
@@ -98,16 +94,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     } catch (KeeperException e) {
       throw new ReplicationException("Could not initialize replication peers", e);
     }
-    connectExistingPeers();
-  }
-
-  @Override
-  public void addPeer(String id, String clusterKey) throws ReplicationException {
-    addPeer(id, clusterKey, null);
+    addExistingPeers();
   }
 
   @Override
-  public void addPeer(String id, String clusterKey, String tableCFs) throws ReplicationException {
+  public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
+      throws ReplicationException {
     try {
       if (peerExists(id)) {
         throw new IllegalArgumentException("Cannot add a peer with id=" + id
@@ -115,7 +107,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       }
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
-        toByteArray(clusterKey));
+        toByteArray(peerConfig));
       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
       // peer-state znode. This happens while adding a peer.
       // The peer state data is set as "ENABLED" by default.
@@ -128,7 +120,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
                     Bytes.toBytes(tableCFsStr));
     } catch (KeeperException e) {
       throw new ReplicationException("Could not add peer with id=" + id
-          + ", clusterKey=" + clusterKey, e);
+          + ", peerConfif=>" + peerConfig, e);
     }
   }
 
@@ -202,11 +194,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public boolean getStatusOfConnectedPeer(String id) {
+  public boolean getStatusOfPeer(String id) {
     if (!this.peerClusters.containsKey(id)) {
       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
     }
-    return this.peerClusters.get(id).getPeerEnabled().get();
+    return this.peerClusters.get(id).getPeerState() == PeerState.ENABLED;
   }
 
   @Override
@@ -217,7 +209,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       }
       String peerStateZNode = getPeerStateNode(id);
       try {
-        return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
+        return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
       } catch (KeeperException e) {
         throw new ReplicationException(e);
       } catch (DeserializationException e) {
@@ -232,140 +224,98 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
   }
 
   @Override
-  public boolean connectToPeer(String peerId) throws ReplicationException {
-    if (peerClusters == null) {
-      return false;
-    }
-    if (this.peerClusters.containsKey(peerId)) {
-      return false;
-    }
-    ReplicationPeer peer = null;
-    try {
-      peer = getPeer(peerId);
-    } catch (Exception e) {
-      throw new ReplicationException("Error connecting to peer with id=" + peerId, e);
-    }
-    if (peer == null) {
-      return false;
-    }
-    this.peerClusters.put(peerId, peer);
-    LOG.info("Added new peer cluster " + peer.getClusterKey());
-    return true;
-  }
-
-  @Override
-  public void disconnectFromPeer(String peerId) {
-    ReplicationPeer rp = this.peerClusters.get(peerId);
-    if (rp != null) {
-      rp.getZkw().close();
-      this.peerClusters.remove(peerId);
-    }
-  }
-
-  @Override
-  public Map<String, String> getAllPeerClusterKeys() {
-    Map<String, String> peers = new TreeMap<String, String>();
+  public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
+    Map<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
     List<String> ids = null;
     try {
       ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
       for (String id : ids) {
-        byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
-        String clusterKey = null;
-        try {
-          clusterKey = parsePeerFrom(bytes);
-        } catch (DeserializationException de) {
-          LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
+        ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
+        if (peerConfig == null) {
+          LOG.warn("Failed to get replication peer configuration of clusterid=" + id
+            + " znode content, continuing.");
           continue;
         }
-        peers.put(id, clusterKey);
+        peers.put(id, peerConfig);
       }
     } catch (KeeperException e) {
       this.abortable.abort("Cannot get the list of peers ", e);
-    } catch (InterruptedException e) {
+    } catch (ReplicationException e) {
       this.abortable.abort("Cannot get the list of peers ", e);
     }
     return peers;
   }
 
   @Override
-  public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
-    if (this.peerClusters.size() == 0) {
-      return Collections.emptyList();
-    }
-    ReplicationPeer peer = this.peerClusters.get(peerId);
-    if (peer == null) {
-      return Collections.emptyList();
-    }
-    List<ServerName> addresses;
-    try {
-      addresses = fetchSlavesAddresses(peer.getZkw());
-    } catch (KeeperException ke) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Fetch salves addresses failed.", ke);
-      }
-      reconnectPeer(ke, peer);
-      addresses = Collections.emptyList();
-    }
-    peer.setRegionServers(addresses);
-    return peer.getRegionServers();
+  public ReplicationPeer getPeer(String peerId) {
+    return peerClusters.get(peerId);
   }
 
   @Override
-  public UUID getPeerUUID(String peerId) {
-    ReplicationPeer peer = this.peerClusters.get(peerId);
-    if (peer == null) {
-      return null;
-    }
-    UUID peerUUID = null;
-    try {
-      peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
-    } catch (KeeperException ke) {
-      reconnectPeer(ke, peer);
-    }
-    return peerUUID;
-  }
-
-  @Override
-  public Set<String> getConnectedPeers() {
-    return this.peerClusters.keySet();
+  public Set<String> getPeerIds() {
+    return peerClusters.keySet(); // this is not thread-safe
   }
 
+  /**
+   * Returns the ReplicationPeerConfig for the given peerId as read from its znode, or null if
+   * the znode is missing or cannot be parsed.
+   */
   @Override
-  public Configuration getPeerConf(String peerId) throws ReplicationException {
+  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
+      throws ReplicationException {
     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
     byte[] data = null;
     try {
       data = ZKUtil.getData(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error getting configuration for peer with id="
-          + peerId, e);
     } catch (InterruptedException e) {
       LOG.warn("Could not get configuration for peer because the thread " +
           "was interrupted. peerId=" + peerId);
       Thread.currentThread().interrupt();
       return null;
+    } catch (KeeperException e) {
+      throw new ReplicationException("Error getting configuration for peer with id="
+          + peerId, e);
     }
     if (data == null) {
       LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
       return null;
     }
-    String otherClusterKey = "";
+
     try {
-      otherClusterKey = parsePeerFrom(data);
+      return parsePeerFrom(data);
     } catch (DeserializationException e) {
       LOG.warn("Failed to parse cluster key from peerId=" + peerId
           + ", specifically the content from the following znode: " + znode);
       return null;
     }
+  }
+
+  @Override
+  public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
+      throws ReplicationException {
+    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
+
+    if (peerConfig == null) {
+      return null;
+    }
 
     Configuration otherConf = new Configuration(this.conf);
     try {
-      ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
+      if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
+        ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
+      }
     } catch (IOException e) {
       LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
       return null;
     }
-    return otherConf;
+
+    if (!peerConfig.getConfiguration().isEmpty()) {
+      CompoundConfiguration compound = new CompoundConfiguration();
+      compound.add(otherConf);
+      compound.addStringMap(peerConfig.getConfiguration());
+      return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
+    }
+
+    return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
   }
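
The layering above is easy to see in isolation: the per-peer map from
ReplicationPeerConfig.getConfiguration() is added to a CompoundConfiguration
after the base Configuration, and the most recently added source wins on key
conflicts. A minimal sketch, assuming HBaseConfiguration defaults and a
made-up key name:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.CompoundConfiguration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class PeerConfLayeringSketch {
      public static void main(String[] args) {
        // Base cluster configuration (stands in for this.conf in getPeerConf).
        Configuration base = HBaseConfiguration.create();
        base.set("replication.example.key", "base-value");   // hypothetical key

        // Per-peer overrides, as carried in ReplicationPeerConfig.getConfiguration().
        Map<String, String> peerOverrides = new HashMap<String, String>();
        peerOverrides.put("replication.example.key", "peer-value");

        // Same layering as getPeerConf: the source added last takes precedence.
        CompoundConfiguration compound = new CompoundConfiguration();
        compound.add(base);
        compound.addStringMap(peerOverrides);
        System.out.println(compound.get("replication.example.key")); // peer-value
      }
    }
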
 
   /**
@@ -382,19 +332,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     return ids;
   }
 
-  @Override
-  public long getTimestampOfLastChangeToPeer(String peerId) {
-    if (!peerClusters.containsKey(peerId)) {
-      throw new IllegalArgumentException("Unknown peer id: " + peerId);
-    }
-    return peerClusters.get(peerId).getLastRegionserverUpdate();
-  }
-
   /**
-   * A private method used during initialization. This method attempts to connect to all registered
+   * A private method used during initialization. This method attempts to add all registered
    * peer clusters. This method does not set a watch on the peer cluster znodes.
    */
-  private void connectExistingPeers() throws ReplicationException {
+  private void addExistingPeers() throws ReplicationException {
     List<String> znodes = null;
     try {
       znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
@@ -403,45 +345,49 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
     }
     if (znodes != null) {
       for (String z : znodes) {
-        connectToPeer(z);
+        createAndAddPeer(z);
       }
     }
   }
 
-  /**
-   * A private method used to re-establish a zookeeper session with a peer cluster.
-   * @param ke
-   * @param peer
-   */
-  private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
-    if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
-        || ke instanceof AuthFailedException) {
-      LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
-      try {
-        peer.reloadZkWatcher();
-        peer.getZkw().registerListener(new PeerRegionServerListener(peer));
-      } catch (IOException io) {
-        LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
-      }
+  @Override
+  public boolean peerAdded(String peerId) throws ReplicationException {
+    return createAndAddPeer(peerId);
+  }
+
+  @Override
+  public void peerRemoved(String peerId) {
+    ReplicationPeer rp = this.peerClusters.get(peerId);
+    if (rp != null) {
+      this.peerClusters.remove(peerId);
     }
   }
 
   /**
-   * Get the list of all the region servers from the specified peer
-   * @param zkw zk connection to use
-   * @return list of region server addresses or an empty list if the slave is unavailable
+   * Attempt to create and register the peer representing a remote slave cluster.
+   * @param peerId a short name that identifies the cluster
+   * @return true if the peer was created and added, false if it was already present
+   *         or could not be created.
    */
-  private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
-      throws KeeperException {
-    List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
-    if (children == null) {
-      return Collections.emptyList();
+  public boolean createAndAddPeer(String peerId) throws ReplicationException {
+    if (peerClusters == null) {
+      return false;
     }
-    List<ServerName> addresses = new ArrayList<ServerName>(children.size());
-    for (String child : children) {
-      addresses.add(ServerName.parseServerName(child));
+    if (this.peerClusters.containsKey(peerId)) {
+      return false;
+    }
+
+    ReplicationPeerZKImpl peer = null;
+    try {
+      peer = createPeer(peerId);
+    } catch (Exception e) {
+      throw new ReplicationException("Error adding peer with id=" + peerId, e);
+    }
+    if (peer == null) {
+      return false;
     }
-    return addresses;
+    this.peerClusters.put(peerId, peer);
+    LOG.info("Added new peer cluster " + peer.getPeerConfig().getClusterKey());
+    return true;
   }
 
   private String getTableCFsNode(String id) {
@@ -485,18 +431,14 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
    * @return object representing the peer
    * @throws ReplicationException
    */
-  private ReplicationPeer getPeer(String peerId) throws ReplicationException {
-    Configuration peerConf = getPeerConf(peerId);
-    if (peerConf == null) {
-      return null;
-    }
-    if (this.ourClusterKey.equals(ZKUtil.getZooKeeperClusterKey(peerConf))) {
-      LOG.debug("Not connecting to " + peerId + " because it's us");
+  private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
+    Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
+    if (pair == null) {
       return null;
     }
+    Configuration peerConf = pair.getSecond();
 
-    ReplicationPeer peer =
-        new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
+    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
     try {
       peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
     } catch (KeeperException e) {
@@ -511,7 +453,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
           peerId, e);
     }
 
-    peer.getZkw().registerListener(new PeerRegionServerListener(peer));
     return peer;
   }
 
@@ -520,7 +461,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
    * @return ClusterKey parsed from the passed bytes.
    * @throws DeserializationException
    */
-  private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
+  private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
+      throws DeserializationException {
     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
       int pblen = ProtobufUtil.lengthOfPBMagic();
       ZooKeeperProtos.ReplicationPeer.Builder builder =
@@ -531,58 +473,70 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       } catch (InvalidProtocolBufferException e) {
         throw new DeserializationException(e);
       }
-      return peer.getClusterkey();
+      return convert(peer);
     } else {
       if (bytes.length > 0) {
-        return Bytes.toString(bytes);
+        return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
       }
-      return "";
+      return new ReplicationPeerConfig().setClusterKey("");
     }
   }
 
-  /**
-   * @param clusterKey
-   * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix prepended suitable
-   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
-   *         /hbase/replication/peers/PEER_ID
-   */
-  private static byte[] toByteArray(final String clusterKey) {
-    byte[] bytes =
-        ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
-            .toByteArray();
-    return ProtobufUtil.prependPBMagic(bytes);
-  }
+  private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
+    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+    if (peer.hasClusterkey()) {
+      peerConfig.setClusterKey(peer.getClusterkey());
+    }
+    if (peer.hasReplicationEndpointImpl()) {
+      peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
+    }
 
-  /**
-   * Tracks changes to the list of region servers in a peer's cluster.
-   */
-  public static class PeerRegionServerListener extends ZooKeeperListener {
+    for (BytesBytesPair pair : peer.getDataList()) {
+      peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
+    }
 
-    private ReplicationPeer peer;
-    private String regionServerListNode;
+    for (NameStringPair pair : peer.getConfigurationList()) {
+      peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
+    }
+    return peerConfig;
+  }
 
-    public PeerRegionServerListener(ReplicationPeer replicationPeer) {
-      super(replicationPeer.getZkw());
-      this.peer = replicationPeer;
-      this.regionServerListNode = peer.getZkw().rsZNode;
+  private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
+    ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
+    if (peerConfig.getClusterKey() != null) {
+      builder.setClusterkey(peerConfig.getClusterKey());
+    }
+    if (peerConfig.getReplicationEndpointImpl() != null) {
+      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
     }
 
-    public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
-      super(zkw);
-      this.regionServerListNode = regionServerListNode;
+    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
+      builder.addData(BytesBytesPair.newBuilder()
+        .setFirst(ByteString.copyFrom(entry.getKey()))
+        .setSecond(ByteString.copyFrom(entry.getValue()))
+        .build());
     }
 
-    @Override
-    public synchronized void nodeChildrenChanged(String path) {
-      if (path.equals(regionServerListNode)) {
-        try {
-          LOG.info("Detected change to peer regionservers, fetching updated list");
-          peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
-        } catch (KeeperException e) {
-          LOG.fatal("Error reading slave addresses", e);
-        }
-      }
+    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
+      builder.addConfiguration(NameStringPair.newBuilder()
+        .setName(entry.getKey())
+        .setValue(entry.getValue())
+        .build());
     }
 
+    return builder.build();
   }
+
+  /**
+   * @param peerConfig
+   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended, suitable
+   *         for use as the content of this.peersZNode; i.e. the content of the PEER_ID znode under
+   *         /hbase/replication/peers/PEER_ID
+   */
+  private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
+    byte[] bytes = convert(peerConfig).toByteArray();
+    return ProtobufUtil.prependPBMagic(bytes);
+  }
+
 }
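
Taken together, the new serialization path stores the whole
ReplicationPeerConfig (cluster key, optional endpoint class, peer data, and
configuration overrides) as a pb-magic-prefixed protobuf under the peer
znode, while parsePeerFrom keeps reading pre-existing znodes that hold only
a bare cluster-key string. A hedged sketch of how a caller might assemble a
config for the reworked addPeer(id, peerConfig, tableCFs); the cluster key,
peer id, and property names below are illustrative only:

    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class BuildPeerConfigSketch {
      public static void main(String[] args) {
        // Cluster key format: <zk quorum>:<zk client port>:<zk parent znode>
        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
            .setClusterKey("slave-zk1,slave-zk2,slave-zk3:2181:/hbase");
        // Optional custom endpoint class, the point of HBASE-11367; when unset,
        // replication falls back to the stock inter-cluster endpoint.
        peerConfig.setReplicationEndpointImpl(
            "org.apache.hadoop.hbase.replication.regionserver."
                + "HBaseInterClusterReplicationEndpoint");
        // Per-peer configuration overrides, layered in by getPeerConf at read time.
        peerConfig.getConfiguration().put("replication.example.key", "peer-value");
        // Registration would then go through a ReplicationPeers instance, e.g.:
        //   replicationPeers.addPeer("1", peerConfig, null /* all tables/CFs */);
        System.out.println(peerConfig.getClusterKey());
      }
    }
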

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
index e0fb7cd..118d2bf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
@@ -93,6 +93,7 @@ public class ZKClusterId {
    * @throws KeeperException
    */
   public static UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
-    return UUID.fromString(readClusterIdZNode(zkw));
+    String uuid = readClusterIdZNode(zkw);
+    return uuid == null ? null : UUID.fromString(uuid);
   }
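
This one-line guard matters because readClusterIdZNode returns null while the
peer cluster's hbaseid znode is absent, and UUID.fromString(null) throws a
NullPointerException. A caller-side sketch of the behavior the guard enables
(zkw, LOG, and ourClusterUUID are assumed to be in scope):

    UUID peerUUID = ZKClusterId.getUUIDForCluster(zkw);
    if (peerUUID == null) {
      // The peer has not published its cluster id yet: report and retry later
      // instead of dying on an NPE as before this change.
      LOG.warn("Peer cluster id znode not available yet, will retry");
    } else if (peerUUID.equals(ourClusterUUID)) {
      LOG.warn("Peer cluster is ourselves, skipping to avoid a replication loop");
    }
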
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/463d52d8/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
index e7ce8d5..4010dc0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseInterfaceAudience.java
@@ -27,5 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public class HBaseInterfaceAudience {
   public static final String COPROC = "Coprocesssor";
+  public static final String REPLICATION = "Replication";
   public static final String PHOENIX = "Phoenix";
 }
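
The new REPLICATION audience string follows the same pattern as the existing
COPROC and PHOENIX constants: classes that replication implementors are
expected to extend get tagged LimitedPrivate with it. An illustrative usage
(the class name is hypothetical; only the annotation form is the point):

    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.hbase.HBaseInterfaceAudience;

    @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
    public class CustomReplicationEndpointBase {
      // Private to HBase internals except for replication implementors.
    }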