You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/23 21:42:43 UTC

[2/4] hbase git commit: HBASE-17442 Move most of the replication related classes from hbase-client to new hbase-replication package. (Guanghao Zhang).

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
new file mode 100644
index 0000000..858e9fc
--- /dev/null
+++ b/hbase-replication/pom.xml
@@ -0,0 +1,264 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+  /**
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+   *
+   *     http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing, software
+   * distributed under the License is distributed on an "AS IS" BASIS,
+   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   * See the License for the specific language governing permissions and
+   * limitations under the License.
+   */
+  -->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-build-configuration</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>hbase-replication</artifactId>
+  <name>Apache HBase - Replication</name>
+  <description>HBase Replication Support</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <!--Make it so assembly:single does nothing in here-->
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <!-- Always skip the second part executions, since we only run
+          simple unit tests in this module -->
+        <executions>
+          <execution>
+            <id>secondPartTestsExecution</id>
+            <phase>test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Make a jar and put the sources in the jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <!--This plugin's configuration is used to store Eclipse m2e settings
+             only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <versionRange>[3.2,)</versionRange>
+                    <goals>
+                      <goal>compile</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <dependencies>
+    <!-- Intra-project dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>jdk.tools</groupId>
+          <artifactId>jdk.tools</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <!-- General dependencies -->
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <!-- profile against Hadoop 2.x: This is the default. -->
+    <profile>
+      <id>hadoop-2.0</id>
+      <activation>
+        <property>
+            <!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
+            <!--h2--><name>!hadoop.profile</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+           <groupId>com.github.stephenc.findbugs</groupId>
+           <artifactId>findbugs-annotations</artifactId>
+           <optional>true</optional>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <exclusions>
+            <exclusion>
+              <groupId>net.java.dev.jets3t</groupId>
+              <artifactId>jets3t</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>javax.servlet.jsp</groupId>
+              <artifactId>jsp-api</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>jetty</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-server</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-core</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-json</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>javax.servlet</groupId>
+              <artifactId>servlet-api</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>tomcat</groupId>
+              <artifactId>jasper-compiler</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>tomcat</groupId>
+              <artifactId>jasper-runtime</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.google.code.findbugs</groupId>
+              <artifactId>jsr305</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!--
+      profile for building against Hadoop 3.0.x. Activate using:
+       mvn -Dhadoop.profile=3.0
+    -->
+    <profile>
+      <id>hadoop-3.0</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>3.0</value>
+        </property>
+      </activation>
+      <properties>
+        <hadoop.version>3.0-SNAPSHOT</hadoop.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
new file mode 100644
index 0000000..8506cbb
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.lang.reflect.ConstructorUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * A factory class for instantiating replication objects that deal with replication state.
+ */
+@InterfaceAudience.Private
+public class ReplicationFactory {
+  /** Default ReplicationQueues implementation, used when the configuration names no class. */
+  public static final Class<?> defaultReplicationQueueClass = ReplicationQueuesZKImpl.class;
+
+  public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
+      throws Exception {
+    Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
+        "replication.replicationQueues.class", defaultReplicationQueueClass);
+    return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
+  }
+
+  public static ReplicationQueuesClient getReplicationQueuesClient(
+      ReplicationQueuesClientArguments args) throws Exception {
+    Class<?> classToBuild = args.getConf().getClass(
+      "hbase.region.replica.replication.replicationQueuesClient.class",
+      ReplicationQueuesClientZKImpl.class);
+    return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args);
+  }
+
+  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+      Abortable abortable) {
+    return getReplicationPeers(zk, conf, null, abortable);
+  }
+
+  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
+      final ReplicationQueuesClient queuesClient, Abortable abortable) {
+    return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
+  }
+
+  public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
+      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
+      Stoppable stopper) {
+    return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
new file mode 100644
index 0000000..dfb5fdc
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The replication listener interface can be implemented if a class needs to subscribe to events
+ * generated by the ReplicationTracker. These events include things like addition/deletion of peer
+ * clusters or failure of a local region server. To receive events, the class also needs to register
+ * itself with a Replication Tracker.
+ */
+@InterfaceAudience.Private
+public interface ReplicationListener {
+
+  /**
+   * A region server has been removed from the local cluster.
+   * @param regionServer the removed region server
+   */
+  void regionServerRemoved(String regionServer);
+
+  /**
+   * A peer cluster has been removed (i.e. unregistered) from replication.
+   * @param peerId the peer id of the cluster that has been removed
+   */
+  void peerRemoved(String peerId);
+
+  /**
+   * The list of registered peer clusters has changed.
+   * @param peerIds a list of all currently registered peer clusters
+   */
+  void peerListChanged(List<String> peerIds);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
new file mode 100644
index 0000000..4f18048
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+
+/**
+ * ReplicationPeer manages enabled / disabled state for the peer.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeer {
+
+  /**
+   * State of the peer, whether it is enabled or not.
+   */
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+  enum PeerState {
+    ENABLED,
+    DISABLED
+  }
+
+  /**
+   * Get the identifier of this peer.
+   * @return string representation of the id
+   */
+  String getId();
+
+  /**
+   * Get the peer config object.
+   * @return the ReplicationPeerConfig for this peer
+   */
+  ReplicationPeerConfig getPeerConfig();
+
+  /**
+   * Returns the state of the peer.
+   * @return the enabled state
+   */
+  PeerState getPeerState();
+
+  /**
+   * Get the configuration object required to communicate with this peer.
+   * @return configuration object
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Get replicable (table, cf-list) map of this peer.
+   * @return the replicable (table, cf-list) map
+   */
+  Map<TableName, List<String>> getTableCFs();
+
+  /**
+   * Get replicable namespace set of this peer.
+   * @return the replicable namespaces set
+   */
+  Set<String> getNamespaces();
+
+  /**
+   * Get the per node bandwidth upper limit for this peer.
+   * @return the bandwidth upper limit
+   */
+  long getPeerBandwidth();
+
+  /** Register a listener to be called back when this peer's ReplicationPeerConfig changes. */
+  void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
new file mode 100644
index 0000000..4e04186
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface ReplicationPeerConfigListener {
+  /**
+   * Callback method for when users update the ReplicationPeerConfig for this peer.
+   * @param rpc The updated ReplicationPeerConfig
+   */
+  void peerConfigUpdated(ReplicationPeerConfig rpc);
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
new file mode 100644
index 0000000..3973be9
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -0,0 +1,318 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+@InterfaceAudience.Private
+public class ReplicationPeerZKImpl extends ReplicationStateZKBase
+    implements ReplicationPeer, Abortable, Closeable {
+  private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
+
+  private ReplicationPeerConfig peerConfig;
+  private final String id;
+  private volatile PeerState peerState;
+  private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
+  private final Configuration conf;
+  private PeerStateTracker peerStateTracker;
+  private PeerConfigTracker peerConfigTracker;
+
+
+  /**
+   * Constructor that takes all the objects required to communicate with the specified peer, except
+   * for the region server addresses.
+   * @param conf configuration object to this peer
+   * @param id string representation of this peer's identifier
+   * @param peerConfig configuration for the replication peer
+   */
+  public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf,
+                               String id, ReplicationPeerConfig peerConfig,
+                               Abortable abortable)
+      throws ReplicationException {
+    super(zkWatcher, conf, abortable);
+    this.conf = conf;
+    this.peerConfig = peerConfig;
+    this.id = id;
+  }
+
+  /**
+   * start a state tracker to check whether this peer is enabled or not
+   *
+   * @param peerStateNode path to zk node which stores peer state
+   * @throws KeeperException
+   */
+  public void startStateTracker(String peerStateNode)
+      throws KeeperException {
+    ensurePeerEnabled(peerStateNode);
+    this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
+    this.peerStateTracker.start();
+    try {
+      this.readPeerStateZnode();
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  private void readPeerStateZnode() throws DeserializationException {
+    this.peerState =
+        isStateEnabled(this.peerStateTracker.getData(false))
+          ? PeerState.ENABLED
+          : PeerState.DISABLED;
+  }
+
+  /**
+   * Start a tracker on the peer config znode, then load the peer config it currently holds.
+   * @param peerConfigNode path to the zk node that the PeerConfigTracker watches
+   * @throws KeeperException
+   */
+  public void startPeerConfigTracker(String peerConfigNode)
+    throws KeeperException {
+    this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper,
+        this);
+    this.peerConfigTracker.start();
+    this.readPeerConfig();
+  }
+
+  private ReplicationPeerConfig readPeerConfig() {
+    try {
+      byte[] data = peerConfigTracker.getData(false);
+      if (data != null) {
+        this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data);
+      }
+    } catch (DeserializationException e) { // previous peerConfig is kept; log which peer failed
+      LOG.error("Failed to deserialize the peer config for peer " + id, e);
+    }
+    return this.peerConfig;
+  }
+
+  @Override
+  public PeerState getPeerState() {
+    return peerState;
+  }
+
+  /**
+   * Get the identifier of this peer
+   * @return string representation of the id
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Get the peer config object
+   * @return the ReplicationPeerConfig for this peer
+   */
+  @Override
+  public ReplicationPeerConfig getPeerConfig() {
+    return peerConfig;
+  }
+
+  /**
+   * Get the configuration object required to communicate with this peer
+   * @return configuration object
+   */
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Get replicable (table, cf-list) map of this peer
+   * @return the replicable (table, cf-list) map
+   */
+  @Override
+  public Map<TableName, List<String>> getTableCFs() {
+    this.tableCFs = peerConfig.getTableCFsMap();
+    return this.tableCFs;
+  }
+
+  /**
+   * Get replicable namespace set of this peer
+   * @return the replicable namespaces set
+   */
+  @Override
+  public Set<String> getNamespaces() {
+    return this.peerConfig.getNamespaces();
+  }
+
+  @Override
+  public long getPeerBandwidth() {
+    return this.peerConfig.getBandwidth();
+  }
+
+  @Override
+  public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
+    if (this.peerConfigTracker != null){
+      this.peerConfigTracker.setListener(listener);
+    }
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig
+        + " was aborted for the following reason(s):" + why, e);
+  }
+
+  @Override
+  public boolean isAborted() {
+    // Currently the replication peer is never "Aborted", we just log when the
+    // abort method is called.
+    return false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // TODO: stop zkw?
+  }
+
+  /**
+   * Parse the raw data from ZK to get a peer's state
+   * @param bytes raw ZK data
+   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
+   * @throws DeserializationException
+   */
+  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+    ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
+    return ReplicationProtos.ReplicationState.State.ENABLED == state;
+  }
+
+  /**
+   * @param bytes Content of a state znode.
+   * @return State parsed from the passed bytes.
+   * @throws DeserializationException
+   */
+  private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+      throws DeserializationException {
+    ProtobufUtil.expectPBMagicPrefix(bytes);
+    int pblen = ProtobufUtil.lengthOfPBMagic();
+    ReplicationProtos.ReplicationState.Builder builder =
+        ReplicationProtos.ReplicationState.newBuilder();
+    ReplicationProtos.ReplicationState state;
+    try {
+      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
+      state = builder.build();
+      return state.getState();
+    } catch (IOException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
+   * @param path Path to znode to check
+   * @return True if we created the znode.
+   * @throws NodeExistsException
+   * @throws KeeperException
+   */
+  private boolean ensurePeerEnabled(final String path)
+      throws NodeExistsException, KeeperException {
+    if (ZKUtil.checkExists(zookeeper, path) == -1) {
+      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+      // peer-state znode. This happens while adding a peer.
+      // The peer state data is set as "ENABLED" by default.
+      ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
+        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Tracker for state of this peer
+   */
+  public class PeerStateTracker extends ZooKeeperNodeTracker {
+
+    public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
+        Abortable abortable) {
+      super(watcher, peerStateZNode, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      if (path.equals(node)) {
+        super.nodeDataChanged(path);
+        try {
+          readPeerStateZnode();
+        } catch (DeserializationException e) {
+          LOG.warn("Failed deserializing the content of " + path, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Tracker for PeerConfigNode of this peer
+   */
+  public class PeerConfigTracker extends ZooKeeperNodeTracker {
+
+    ReplicationPeerConfigListener listener;
+
+    public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
+        Abortable abortable) {
+      super(watcher, peerConfigNode, abortable);
+    }
+
+    public synchronized void setListener(ReplicationPeerConfigListener listener){
+      this.listener = listener;
+    }
+
+    @Override
+    public synchronized void nodeCreated(String path) {
+      if (path.equals(node)) {
+        super.nodeCreated(path);
+        ReplicationPeerConfig config = readPeerConfig();
+        if (listener != null){
+          listener.peerConfigUpdated(config);
+        }
+      }
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      //superclass calls nodeCreated
+      if (path.equals(node)) {
+        super.nodeDataChanged(path);
+      }
+
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
new file mode 100644
index 0000000..2a7963a
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -0,0 +1,177 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
+ * clusters that data is replicated to. A peer cluster can be in three different states:
+ *
+ * 1. Not-Registered - There is no notion of the peer cluster.
+ * 2. Registered - The peer has an id and is being tracked but there is no connection.
+ * 3. Connected - There is an active connection to the remote peer.
+ *
+ * In the registered or connected state, a peer cluster can either be enabled or disabled.
+ */
+@InterfaceAudience.Private
+public interface ReplicationPeers {
+
+  /**
+   * Initialize the ReplicationPeers interface.
+   * @throws ReplicationException if the initialization fails
+   */
+  void init() throws ReplicationException;
+
+  /**
+   * Add a new remote slave cluster for replication.
+   * @param peerId a short name that identifies the cluster
+   * @param peerConfig configuration for the replication slave cluster
+   * @throws ReplicationException if the peer could not be registered
+   */
+  void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
+      throws ReplicationException;
+
+  /**
+   * Removes a remote slave cluster and stops the replication to it.
+   * @param peerId a short name that identifies the cluster
+   * @throws ReplicationException if the peer could not be removed
+   */
+  void unregisterPeer(String peerId) throws ReplicationException;
+
+  /**
+   * Method called after a peer has been connected. It will create a ReplicationPeer to track the
+   * newly connected cluster.
+   * @param peerId a short name that identifies the cluster
+   * @return whether a ReplicationPeer was successfully created
+   * @throws ReplicationException if the ReplicationPeer could not be created
+   */
+  boolean peerConnected(String peerId) throws ReplicationException;
+
+  /**
+   * Method called after a peer has been disconnected. It will remove the ReplicationPeer that
+   * tracked the disconnected cluster.
+   * @param peerId a short name that identifies the cluster
+   */
+  void peerDisconnected(String peerId);
+
+  /**
+   * Restart the replication to the specified remote slave cluster.
+   * @param peerId a short name that identifies the cluster
+   * @throws ReplicationException if the peer state could not be changed
+   */
+  void enablePeer(String peerId) throws ReplicationException;
+
+  /**
+   * Stop the replication to the specified remote slave cluster.
+   * @param peerId a short name that identifies the cluster
+   * @throws ReplicationException if the peer state could not be changed
+   */
+  void disablePeer(String peerId) throws ReplicationException;
+
+  /**
+   * Get the table and column-family map of the peer from the underlying storage.
+   * @param peerId a short name that identifies the cluster
+   * @return the tables replicated to this peer, each mapped to its list of column families
+   * @throws ReplicationException if the config could not be read
+   */
+  Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
+      throws ReplicationException;
+
+  /**
+   * Set the table and column-family map of the peer to the underlying storage.
+   * @param peerId a short name that identifies the cluster
+   * @param tableCFs the table and column-family list which will be replicated for this peer
+   * @throws ReplicationException if the config could not be written
+   */
+  void setPeerTableCFsConfig(String peerId,
+                             Map<TableName, ? extends Collection<String>>  tableCFs)
+      throws ReplicationException;
+
+  /**
+   * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
+   * continue to track changes to the Peer's state and config. This method returns null if no
+   * peer has been connected with the given peerId.
+   * @param peerId id for the peer
+   * @return ReplicationPeer object, or null if the peer is not connected
+   */
+  ReplicationPeer getConnectedPeer(String peerId);
+
+  /**
+   * Returns the set of peerIds of the clusters that have been connected and have an underlying
+   * ReplicationPeer.
+   * @return a Set of Strings for peerIds
+   */
+  Set<String> getConnectedPeerIds();
+
+  /**
+   * Get the replication status for the specified connected remote slave cluster.
+   * The value might be read from cache, so it is recommended to
+   * use {@link #getStatusOfPeerFromBackingStore(String)}
+   * if reading the state after enabling or disabling it.
+   * @param peerId a short name that identifies the cluster
+   * @return true if replication is enabled, false otherwise.
+   */
+  boolean getStatusOfPeer(String peerId);
+
+  /**
+   * Get the replication status for the specified remote slave cluster, which doesn't
+   * have to be connected. The state is read directly from the backing store.
+   * @param peerId a short name that identifies the cluster
+   * @return true if replication is enabled, false otherwise.
+   * @throws ReplicationException thrown if there's an error contacting the store
+   */
+  boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
+
+  /**
+   * List the cluster replication configs of all remote slave clusters (whether they are
+   * enabled/disabled or connected/disconnected).
+   * @return A map of peer ids to their ReplicationPeerConfig
+   */
+  Map<String, ReplicationPeerConfig> getAllPeerConfigs();
+
+  /**
+   * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
+   * connected/disconnected).
+   * @return A list of peer ids
+   */
+  List<String> getAllPeerIds();
+
+  /**
+   * Returns the configured ReplicationPeerConfig for this peerId
+   * @param peerId a short name that identifies the cluster
+   * @return ReplicationPeerConfig for the peer
+   * @throws ReplicationException if the config could not be read
+   */
+  ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
+
+  /**
+   * Returns the configuration needed to talk to the remote slave cluster.
+   * @param peerId a short name that identifies the cluster
+   * @return the configuration for the peer cluster, null if it was unable to get the configuration
+   * @throws ReplicationException if the config could not be read
+   */
+  Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
+
+  /**
+   * Update the peerConfig for a given peer cluster
+   * @param id a short name that identifies the cluster
+   * @param peerConfig new config for the peer cluster
+   * @throws ReplicationException if the new config is rejected or could not be saved
+   */
+  void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
new file mode 100644
index 0000000..751e454
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -0,0 +1,546 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
+ * peers znode contains a list of all peer replication clusters and the current replication state of
+ * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
+ * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
+ * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
+ * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
+ * For example:
+ *
+ *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
+ *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
+ *
+ * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
+ * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
+ * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
+ * ReplicationPeer.PeerStateTracker class. For example:
+ *
+ * /hbase/replication/peers/1/peer-state [Value: ENABLED]
+ *
+ * Each of these peer znodes has a child znode that indicates which data will be replicated
+ * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
+ * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
+ * class. For example:
+ *
+ * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
+ */
+@InterfaceAudience.Private
+public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
+
+  // Map of connected peer clusters keyed by their id. Typed as ConcurrentMap because this class
+  // relies on the atomic putIfAbsent(key, value) and remove(key, value) operations.
+  private final ConcurrentMap<String, ReplicationPeerZKImpl> peerClusters;
+  // May be null; the leftover-queue check done while registering a peer is then skipped.
+  private final ReplicationQueuesClient queuesClient;
+  private final Abortable abortable;
+
+  private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
+
+  /**
+   * @param zk ZooKeeper watcher handed to the superclass
+   * @param conf configuration handed to the superclass
+   * @param queuesClient client used to look for leftover queues when a peer is registered;
+   *          may be null to skip that check
+   * @param abortable invoked when listing peers from ZooKeeper fails
+   */
+  public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
+      final ReplicationQueuesClient queuesClient, Abortable abortable) {
+    super(zk, conf, abortable);
+    this.abortable = abortable;
+    this.peerClusters = new ConcurrentHashMap<>();
+    this.queuesClient = queuesClient;
+  }
+
+  @Override
+  public void init() throws ReplicationException {
+    try {
+      if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
+        ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Could not initialize replication peers", e);
+    }
+    addExistingPeers();
+  }
+
+  /**
+   * Registers a new peer by creating its peer znode, which holds the serialized peer config, and
+   * its peer-state znode, which is initialized to ENABLED.
+   * @param id id of the new peer; must not already exist and must not contain '-'
+   * @param peerConfig configuration of the peer; its cluster key is validated when it is not null
+   * @throws IllegalArgumentException if the id already exists, the id contains '-', or the
+   *           cluster key is invalid
+   * @throws ReplicationException if a leftover queue exists for this id or ZooKeeper fails
+   */
+  @Override
+  public void registerPeer(String id, ReplicationPeerConfig peerConfig)
+      throws ReplicationException {
+    try {
+      if (peerExists(id)) {
+        throw new IllegalArgumentException("Cannot add a peer with id=" + id
+            + " because that id already exists.");
+      }
+
+      // NOTE(review): presumably rejected because recovered queue znodes use the form
+      // id-servername-*, so a '-' inside the id would make them ambiguous -- confirm.
+      if(id.contains("-")){
+        throw new IllegalArgumentException("Found invalid peer name:" + id);
+      }
+
+      if (peerConfig.getClusterKey() != null) {
+        try {
+          ZKConfig.validateClusterKey(peerConfig.getClusterKey());
+        } catch (IOException ioe) {
+          // Keep the original failure as the cause so its stack trace is not lost.
+          throw new IllegalArgumentException(ioe.getMessage(), ioe);
+        }
+      }
+
+      checkQueuesDeleted(id);
+
+      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+
+      // The peer znode and its peer-state znode are submitted together through
+      // ZKUtil.multiOrSequential so that a new peer always comes with a state znode.
+      // The peer state data is set as "ENABLED" by default.
+      List<ZKUtilOp> listOfOps = new ArrayList<>(2);
+      ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
+        ReplicationSerDeHelper.toByteArray(peerConfig));
+      ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
+      listOfOps.add(op1);
+      listOfOps.add(op2);
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Could not add peer with id=" + id
+          + ", peerConfig=>" + peerConfig, e);
+    }
+  }
+
+  @Override
+  public void unregisterPeer(String id) throws ReplicationException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("Cannot remove peer with id=" + id
+            + " because that id does not exist.");
+      }
+      ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+    } catch (KeeperException e) {
+      throw new ReplicationException("Could not remove peer with id=" + id, e);
+    }
+  }
+
+  /**
+   * Writes the ENABLED state for the peer and logs the change.
+   * @param id id of the peer to enable
+   * @throws ReplicationException if the state could not be changed
+   */
+  @Override
+  public void enablePeer(String id) throws ReplicationException {
+    final ReplicationProtos.ReplicationState.State target =
+        ReplicationProtos.ReplicationState.State.ENABLED;
+    changePeerState(id, target);
+    LOG.info("peer " + id + " is enabled");
+  }
+
+  /**
+   * Writes the DISABLED state for the peer and logs the change.
+   * @param id id of the peer to disable
+   * @throws ReplicationException if the state could not be changed
+   */
+  @Override
+  public void disablePeer(String id) throws ReplicationException {
+    final ReplicationProtos.ReplicationState.State target =
+        ReplicationProtos.ReplicationState.State.DISABLED;
+    changePeerState(id, target);
+    LOG.info("peer " + id + " is disabled");
+  }
+
+  /**
+   * Reads the peer config and returns its table/column-family map.
+   * @param id id of the peer
+   * @return the table-CFs map stored in the peer's config
+   * @throws IllegalArgumentException if no peer with this id exists
+   * @throws ReplicationException if the existence check fails, or wrapping any exception raised
+   *           while reading the config (including a missing config)
+   */
+  @Override
+  public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
+    boolean known;
+    try {
+      known = peerExists(id);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
+    }
+    if (!known) {
+      throw new IllegalArgumentException("peer " + id + " doesn't exist");
+    }
+    try {
+      ReplicationPeerConfig stored = getReplicationPeerConfig(id);
+      if (stored == null) {
+        throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
+      }
+      return stored.getTableCFsMap();
+    } catch (Exception e) {
+      // Every failure of the read, including the exception thrown just above, is wrapped.
+      throw new ReplicationException(e);
+    }
+  }
+
+  /**
+   * Replaces the table/column-family map in the stored peer config and writes the config back
+   * to the peer znode.
+   * @param id id of the peer
+   * @param tableCFs the new table-CFs map
+   * @throws IllegalArgumentException if no peer with this id exists
+   * @throws ReplicationException if the current config cannot be read or ZooKeeper fails
+   */
+  @Override
+  public void setPeerTableCFsConfig(String id,
+                                    Map<TableName, ? extends Collection<String>>  tableCFs)
+      throws ReplicationException {
+    try {
+      boolean known = peerExists(id);
+      if (!known) {
+        throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
+            + " does not exist.");
+      }
+      ReplicationPeerConfig stored = getReplicationPeerConfig(id);
+      if (stored == null) {
+        throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
+      }
+      stored.setTableCFsMap(tableCFs);
+      String peerNode = getPeerNode(id);
+      byte[] serialized = ReplicationSerDeHelper.toByteArray(stored);
+      ZKUtil.setData(this.zookeeper, peerNode, serialized);
+      LOG.info("Peer tableCFs with id= " + id + " is now " +
+        ReplicationSerDeHelper.convertToString(tableCFs));
+    } catch (KeeperException e) {
+      throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
+    }
+  }
+
+  /**
+   * Returns the state of a connected peer as held by its in-memory ReplicationPeer.
+   * @param id id of the peer
+   * @return true if the peer's state is ENABLED
+   * @throws IllegalArgumentException if the peer is not in the map of connected peers
+   */
+  @Override
+  public boolean getStatusOfPeer(String id) {
+    ReplicationPeer cached = this.peerClusters.get(id);
+    if (cached != null) {
+      return PeerState.ENABLED == cached.getPeerState();
+    }
+    throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
+  }
+
+  /**
+   * Reads the peer-state znode directly from ZooKeeper and reports whether it holds the
+   * enabled state.
+   * @param id id of the peer
+   * @return true if the stored state is enabled
+   * @throws IllegalArgumentException if no peer with this id exists
+   * @throws ReplicationException if ZooKeeper fails, the znode content cannot be deserialized,
+   *           or the thread is interrupted (the interrupt flag is restored in that case)
+   */
+  @Override
+  public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("peer " + id + " doesn't exist");
+      }
+      String peerStateZNode = getPeerStateNode(id);
+      try {
+        return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
+      } catch (KeeperException | DeserializationException e) {
+        throw new ReplicationException(e);
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Unable to get status of the peer with id=" + id +
+          " from backing store", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new ReplicationException(e);
+    }
+  }
+
+  /**
+   * Lists the children of the peers znode (without setting a watch) and reads the config of
+   * each of them. Peers whose config cannot be read are logged and skipped. On a ZooKeeper or
+   * replication error the abortable is invoked and the entries collected so far are returned.
+   * @return a sorted map of peer id to peer config; never null
+   */
+  @Override
+  public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
+    Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
+    try {
+      List<String> ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+      if (ids == null) {
+        // No child list is available (addExistingPeers guards against the same null result),
+        // so there is nothing to report.
+        return peers;
+      }
+      for (String id : ids) {
+        ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
+        if (peerConfig == null) {
+          LOG.warn("Failed to get replication peer configuration of clusterid=" + id
+            + " znode content, continuing.");
+          continue;
+        }
+        peers.put(id, peerConfig);
+      }
+    } catch (KeeperException | ReplicationException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+    }
+    return peers;
+  }
+
+  /**
+   * Looks up the peer in the map of connected peers.
+   * @param peerId id of the peer
+   * @return the tracked peer, or null if the map has no entry for this id
+   */
+  @Override
+  public ReplicationPeer getConnectedPeer(String peerId) {
+    return peerClusters.get(peerId);
+  }
+
+  /**
+   * Returns the key set of the map of connected peers. This is the map's own key-set view, not
+   * a copy.
+   * @return the ids of all connected peers
+   */
+  @Override
+  public Set<String> getConnectedPeerIds() {
+    // Original author's warning: callers must not treat the returned view as a stable snapshot.
+    return peerClusters.keySet(); // this is not thread-safe
+  }
+
+  /**
+   * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
+   * Null is returned when the thread is interrupted while reading (the interrupt flag is
+   * restored), when the znode has no data, or when the data cannot be deserialized.
+   * @param peerId id of the peer
+   * @return the parsed peer config, or null as described above
+   * @throws ReplicationException if ZooKeeper fails while reading the znode
+   */
+  @Override
+  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
+      throws ReplicationException {
+    String znode = getPeerNode(peerId);
+    byte[] data = null;
+    try {
+      data = ZKUtil.getData(this.zookeeper, znode);
+    } catch (InterruptedException e) {
+      LOG.warn("Could not get configuration for peer because the thread " +
+          "was interrupted. peerId=" + peerId, e);
+      Thread.currentThread().interrupt();
+      return null;
+    } catch (KeeperException e) {
+      throw new ReplicationException("Error getting configuration for peer with id="
+          + peerId, e);
+    }
+    if (data == null) {
+      LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
+      return null;
+    }
+
+    try {
+      return ReplicationSerDeHelper.parsePeerFrom(data);
+    } catch (DeserializationException e) {
+      // Pass the exception to the logger so the reason for the parse failure is not lost.
+      LOG.warn("Failed to parse cluster key from peerId=" + peerId
+          + ", specifically the content from the following znode: " + znode, e);
+      return null;
+    }
+  }
+
+  /**
+   * Builds the Configuration used to talk to the peer cluster: a cluster configuration derived
+   * from this instance's conf and the peer's cluster key, overlaid with the peer's own
+   * configuration entries when it has any.
+   * @param peerId id of the peer
+   * @return the peer config paired with the derived Configuration, or null if the peer config
+   *         is unavailable or the cluster configuration cannot be created
+   * @throws ReplicationException if reading the peer config fails
+   */
+  @Override
+  public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
+      throws ReplicationException {
+    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
+    if (peerConfig == null) {
+      return null;
+    }
+
+    Configuration clusterConf;
+    try {
+      clusterConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
+    } catch (IOException e) {
+      LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
+      return null;
+    }
+
+    if (peerConfig.getConfiguration().isEmpty()) {
+      // No peer-specific overrides: the plain cluster configuration is enough.
+      return new Pair<>(peerConfig, clusterConf);
+    }
+
+    CompoundConfiguration layered = new CompoundConfiguration();
+    layered.add(clusterConf);
+    layered.addStringMap(peerConfig.getConfiguration());
+    return new Pair<>(peerConfig, layered);
+  }
+
+  @Override
+  public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
+      throws ReplicationException {
+    ReplicationPeer peer = getConnectedPeer(id);
+    if (peer == null){
+      throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
+    }
+    ReplicationPeerConfig existingConfig = peer.getPeerConfig();
+    if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() &&
+        !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){
+      throw new ReplicationException("Changing the cluster key on an existing peer is not allowed."
+          + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '"
+          + newConfig.getClusterKey() +
+      "'");
+    }
+    String existingEndpointImpl = existingConfig.getReplicationEndpointImpl();
+    if (newConfig.getReplicationEndpointImpl() != null &&
+        !newConfig.getReplicationEndpointImpl().isEmpty() &&
+        !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){
+      throw new ReplicationException("Changing the replication endpoint implementation class " +
+          "on an existing peer is not allowed. Existing class '"
+          + existingConfig.getReplicationEndpointImpl()
+          + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
+    }
+    //Update existingConfig's peer config and peer data with the new values, but don't touch config
+    // or data that weren't explicitly changed
+    existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
+    existingConfig.getPeerData().putAll(newConfig.getPeerData());
+    existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
+    existingConfig.setNamespaces(newConfig.getNamespaces());
+    existingConfig.setBandwidth(newConfig.getBandwidth());
+
+    try {
+      ZKUtil.setData(this.zookeeper, getPeerNode(id),
+          ReplicationSerDeHelper.toByteArray(existingConfig));
+    }
+    catch(KeeperException ke){
+      throw new ReplicationException("There was a problem trying to save changes to the " +
+          "replication peer " + id, ke);
+    }
+  }
+
+  /**
+   * List all registered peer clusters and set a watch on their znodes.
+   * @return the children of the peers znode; null if ZooKeeper failed (the abortable is invoked
+   *         in that case) or if the listing itself yields null
+   */
+  @Override
+  public List<String> getAllPeerIds() {
+    try {
+      return ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+      return null;
+    }
+  }
+
+  /**
+   * A private method used during initialization. This method attempts to add all registered
+   * peer clusters. This method does not set a watch on the peer cluster znodes.
+   * @throws ReplicationException if the peers cannot be listed or a peer cannot be added
+   */
+  private void addExistingPeers() throws ReplicationException {
+    List<String> peerIds;
+    try {
+      peerIds = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Error getting the list of peer clusters.", e);
+    }
+    if (peerIds == null) {
+      return;
+    }
+    for (String peerId : peerIds) {
+      createAndAddPeer(peerId);
+    }
+  }
+
+  /**
+   * Delegates to {@link #createAndAddPeer(String)}.
+   * @param peerId id of the peer that has been connected
+   * @return the result of createAndAddPeer
+   * @throws ReplicationException if creating the peer fails
+   */
+  @Override
+  public boolean peerConnected(String peerId) throws ReplicationException {
+    return createAndAddPeer(peerId);
+  }
+
+  /**
+   * Drops the peer from the map of connected peers. The entry is removed only if it still maps
+   * to the instance that was looked up, so a peer added concurrently under the same id is kept.
+   * @param peerId id of the peer that has been disconnected
+   */
+  @Override
+  public void peerDisconnected(String peerId) {
+    ReplicationPeer tracked = this.peerClusters.get(peerId);
+    if (tracked == null) {
+      return;
+    }
+    ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, tracked);
+  }
+
+  /**
+   * Attempt to connect to a new remote slave cluster.
+   * @param peerId a short that identifies the cluster
+   * @return false if the peer is already in the map of connected peers or no peer object could
+   *         be created; true otherwise
+   * @throws ReplicationException if creating the peer fails with any exception
+   */
+  public boolean createAndAddPeer(String peerId) throws ReplicationException {
+    if (peerClusters == null || this.peerClusters.containsKey(peerId)) {
+      return false;
+    }
+
+    final ReplicationPeerZKImpl peer;
+    try {
+      peer = createPeer(peerId);
+    } catch (Exception e) {
+      throw new ReplicationException("Error adding peer with id=" + peerId, e);
+    }
+    if (peer == null) {
+      return false;
+    }
+
+    ReplicationPeerZKImpl previous =
+      ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
+    if (previous != null) {
+      // NOTE(review): another caller registered the peer first, yet true is still returned
+      // below -- confirm that callers expect this.
+      LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
+        ", new cluster=" + peer.getPeerConfig().getClusterKey());
+    } else {
+      LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
+    }
+    return true;
+  }
+
+  /**
+   * Update the state znode of a peer cluster. The znode is created, with a watch, when it does
+   * not exist yet; otherwise its data is overwritten.
+   * @param id id of the peer
+   * @param state the state to store; anything other than ENABLED is written as disabled
+   * @throws IllegalArgumentException if no peer with this id exists
+   * @throws ReplicationException if ZooKeeper fails
+   */
+  private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
+      throws ReplicationException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
+            + " does not exist.");
+      }
+      String peerStateZNode = getPeerStateNode(id);
+      boolean enable = state == ReplicationProtos.ReplicationState.State.ENABLED;
+      byte[] stateBytes = enable ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
+      boolean stateZNodeMissing = ZKUtil.checkExists(this.zookeeper, peerStateZNode) == -1;
+      if (stateZNodeMissing) {
+        ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
+      } else {
+        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
+      }
+      LOG.info("Peer with id= " + id + " is now " + state.name());
+    } catch (KeeperException e) {
+      throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
+    }
+  }
+
+  /**
+   * Helper method to connect to a peer. Builds the peer object from the stored peer config and
+   * starts its state tracker and its peer config tracker.
+   * @param peerId peer's identifier
+   * @return object representing the peer, or null if the peer's configuration is unavailable
+   * @throws ReplicationException if the config cannot be read or a tracker cannot be started
+   */
+  private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
+    Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
+    if (pair == null) {
+      return null;
+    }
+    Configuration peerConf = pair.getSecond();
+
+    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper,
+        peerConf, peerId, pair.getFirst(), abortable);
+    try {
+      peer.startStateTracker(this.getPeerStateNode(peerId));
+    } catch (KeeperException e) {
+      throw new ReplicationException("Error starting the peer state tracker for peerId=" +
+          peerId, e);
+    }
+
+    try {
+      peer.startPeerConfigTracker(this.getPeerNode(peerId));
+    } catch (KeeperException e) {
+      // The message names the tracker that actually failed to start.
+      throw new ReplicationException("Error starting the peer config tracker for peerId=" +
+          peerId, e);
+    }
+
+    return peer;
+  }
+
+  /**
+   * Verifies that no replication queue and no hfile-refs entry is left over for the given peer
+   * id. The check is skipped when no queues client was supplied.
+   * @param peerId id of the peer that is about to be registered
+   * @throws ReplicationException if a leftover queue is found or ZooKeeper cannot be read
+   */
+  private void checkQueuesDeleted(String peerId) throws ReplicationException {
+    if (queuesClient == null) {
+      return;
+    }
+    try {
+      List<String> replicators = queuesClient.getListOfReplicators();
+      if (replicators == null || replicators.isEmpty()) {
+        return;
+      }
+      for (String replicator : replicators) {
+        List<String> queueIds = queuesClient.getAllQueues(replicator);
+        if (queueIds == null) {
+          // Guard against a missing queue list, as is done for the replicator list above.
+          continue;
+        }
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (queueInfo.getPeerId().equals(peerId)) {
+            throw new ReplicationException("undeleted queue for peerId: " + peerId
+                + ", replicator: " + replicator + ", queueId: " + queueId);
+          }
+        }
+      }
+      // Check for hfile-refs queue
+      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
+          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
+        throw new ReplicationException("Undeleted queue for peerId: " + peerId
+            + ", found in hfile-refs node path " + hfileRefsZNode);
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
new file mode 100644
index 0000000..1403f6d
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * This class is responsible for the parsing logic for a znode representing a queue.
+ * It will extract the peerId if it's recovered as well as the dead region servers
+ * that were part of the queue's history.
+ */
+@InterfaceAudience.Private
+public class ReplicationQueueInfo {
+  private static final Log LOG = LogFactory.getLog(ReplicationQueueInfo.class);
+
+  private final String peerId;
+  private final String peerClusterZnode;
+  private boolean queueRecovered;
+  // List of all the dead region servers that had this queue (if recovered)
+  private List<String> deadRegionServers = new ArrayList<>();
+
+  /**
+   * The passed znode will be either the id of the peer cluster or
+   * the handling story of that queue in the form of id-servername-*
+   */
+  public ReplicationQueueInfo(String znode) {
+    this.peerClusterZnode = znode;
+    String[] parts = znode.split("-", 2);
+    this.queueRecovered = parts.length != 1;
+    this.peerId = this.queueRecovered ?
+        parts[0] : peerClusterZnode;
+    if (parts.length >= 2) {
+      // extract dead servers
+      extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
+    }
+  }
+
+  /**
+   * Parse dead server names from znode string servername can contain "-" such as
+   * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
+   * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-&lt;server name>-...
+   */
+  private static void
+      extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
+
+    if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
+
+    // valid server name delimiter "-" has to be after "," in a server name
+    int seenCommaCnt = 0;
+    int startIndex = 0;
+    int len = deadServerListStr.length();
+
+    for (int i = 0; i < len; i++) {
+      switch (deadServerListStr.charAt(i)) {
+      case ',':
+        seenCommaCnt += 1;
+        break;
+      case '-':
+        if(seenCommaCnt>=2) {
+          if (i > startIndex) {
+            String serverName = deadServerListStr.substring(startIndex, i);
+            if(ServerName.isFullServerName(serverName)){
+              result.add(serverName);
+            } else {
+              LOG.error("Found invalid server name:" + serverName);
+            }
+            startIndex = i + 1;
+          }
+          seenCommaCnt = 0;
+        }
+        break;
+      default:
+        break;
+      }
+    }
+
+    // add tail
+    if(startIndex < len - 1){
+      String serverName = deadServerListStr.substring(startIndex, len);
+      if(ServerName.isFullServerName(serverName)){
+        result.add(serverName);
+      } else {
+        LOG.error("Found invalid server name at the end:" + serverName);
+      }
+    }
+
+    LOG.debug("Found dead servers:" + result);
+  }
+
+  public List<String> getDeadRegionServers() {
+    return Collections.unmodifiableList(this.deadRegionServers);
+  }
+
+  public String getPeerId() {
+    return this.peerId;
+  }
+
+  public String getPeerClusterZnode() {
+    return this.peerClusterZnode;
+  }
+
+  public boolean isQueueRecovered() {
+    return queueRecovered;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
new file mode 100644
index 0000000..be5a590
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * This provides an interface for maintaining a region server's replication queues. These queues
+ * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled)
+ * that still need to be replicated to remote clusters.
+ */
+@InterfaceAudience.Private
+public interface ReplicationQueues {
+
+  /**
+   * Initialize the region server replication queue interface.
+   * @param serverName The server name of the region server that owns the replication queues this
+   *          interface manages.
+   * @throws ReplicationException if the replication queue interface cannot be initialized
+   */
+  void init(String serverName) throws ReplicationException;
+
+  /**
+   * Remove a replication queue.
+   * @param queueId a String that identifies the queue.
+   */
+  void removeQueue(String queueId);
+
+  /**
+   * Add a new WAL file to the given queue. If the queue does not exist it is created.
+   * @param queueId a String that identifies the queue.
+   * @param filename name of the WAL
+   * @throws ReplicationException if the WAL cannot be added to the queue
+   */
+  void addLog(String queueId, String filename) throws ReplicationException;
+
+  /**
+   * Remove a WAL file from the given queue.
+   * @param queueId a String that identifies the queue.
+   * @param filename name of the WAL
+   */
+  void removeLog(String queueId, String filename);
+
+  /**
+   * Set the current position for a specific WAL in a given queue.
+   * @param queueId a String that identifies the queue
+   * @param filename name of the WAL
+   * @param position the current position in the file
+   */
+  void setLogPosition(String queueId, String filename, long position);
+
+  /**
+   * Get the current position for a specific WAL in a given queue.
+   * @param queueId a String that identifies the queue
+   * @param filename name of the WAL
+   * @return the current position in the file
+   * @throws ReplicationException if the position cannot be read
+   */
+  long getLogPosition(String queueId, String filename) throws ReplicationException;
+
+  /**
+   * Remove all replication queues for this region server.
+   */
+  void removeAllQueues();
+
+  /**
+   * Get a list of all WALs in the given queue.
+   * @param queueId a String that identifies the queue
+   * @return a list of WALs, null if no such queue exists for this server
+   */
+  List<String> getLogsInQueue(String queueId);
+
+  /**
+   * Get a list of all queues for this region server.
+   * @return a list of queueIds, an empty list if this region server is dead and has no
+   *         outstanding queues
+   */
+  List<String> getAllQueues();
+
+  /**
+   * Get queueIds from a dead region server, whose queues have not been claimed by other region
+   * servers.
+   * @param regionserver the id of the dead region server
+   * @return empty if the queue exists but no children, null if the queue does not exist.
+   */
+  List<String> getUnClaimedQueueIds(String regionserver);
+
+  /**
+   * Take ownership of the queue identified by queueId that belongs to a dead region server.
+   * @param regionserver the id of the dead region server
+   * @param queueId the id of the queue
+   * @return a pair of the new PeerId and a SortedSet of the WALs in its queue, or null if there
+   *         is no unclaimed queue.
+   */
+  Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
+
+  /**
+   * Remove the znode of region server if the queue is empty.
+   * @param regionserver the id of the region server
+   */
+  void removeReplicatorIfQueueIsEmpty(String regionserver);
+
+  /**
+   * Get a list of all region servers that have outstanding replication queues. These servers could
+   * be alive, dead or from a previous run of the cluster.
+   * @return a list of server names
+   */
+  List<String> getListOfReplicators();
+
+  /**
+   * Checks if the provided znode is the same as this region server's
+   * @param regionserver the id of the region server
+   * @return true if this is this rs's znode
+   */
+  boolean isThisOurRegionServer(String regionserver);
+
+  /**
+   * Add a peer to hfile reference queue if peer does not exist.
+   * @param peerId peer cluster id to be added
+   * @throws ReplicationException if fails to add a peer id to hfile reference queue
+   */
+  void addPeerToHFileRefs(String peerId) throws ReplicationException;
+
+  /**
+   * Remove a peer from hfile reference queue.
+   * @param peerId peer cluster id to be removed
+   */
+  void removePeerFromHFileRefs(String peerId);
+
+  /**
+   * Add new hfile references to the queue.
+   * @param peerId peer cluster id to which the hfiles need to be replicated
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+   *          will be added in the queue }
+   * @throws ReplicationException if fails to add a hfile reference
+   */
+  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
+
+  /**
+   * Remove hfile references from the queue.
+   * @param peerId peer cluster id from which these hfile references need to be removed
+   * @param files list of hfile references to be removed
+   */
+  void removeHFileRefs(String peerId, List<String> files);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
new file mode 100644
index 0000000..12fc6a1
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various
+ * ReplicationQueues Implementations with different constructor arguments by reflection.
+ * <p>
+ * The ZooKeeper watcher is optional: it stays <code>null</code> unless it is passed to the
+ * three-argument constructor or set through {@link #setZk(ZooKeeperWatcher)}.
+ */
+@InterfaceAudience.Private
+public class ReplicationQueuesArguments {
+
+  private ZooKeeperWatcher zk;
+  private Configuration conf;
+  private Abortable abort;
+
+  /**
+   * Creates the arguments without a ZooKeeper watcher.
+   * @param conf the configuration to hand to the ReplicationQueues implementation
+   * @param abort the abortable to hand to the ReplicationQueues implementation
+   */
+  public ReplicationQueuesArguments(Configuration conf, Abortable abort) {
+    this.conf = conf;
+    this.abort = abort;
+  }
+
+  /**
+   * Creates the arguments including a ZooKeeper watcher.
+   * @param conf the configuration to hand to the ReplicationQueues implementation
+   * @param abort the abortable to hand to the ReplicationQueues implementation
+   * @param zk the ZooKeeper watcher to hand to the ReplicationQueues implementation
+   */
+  public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) {
+    this(conf, abort);
+    // Assign the field directly instead of calling setZk(): an overridable method invoked from
+    // a constructor would run in a subclass before that subclass has been initialized.
+    this.zk = zk;
+  }
+
+  /**
+   * @return the ZooKeeper watcher, or null if none has been provided
+   */
+  public ZooKeeperWatcher getZk() {
+    return zk;
+  }
+
+  /**
+   * @param zk the ZooKeeper watcher to use
+   */
+  public void setZk(ZooKeeperWatcher zk) {
+    this.zk = zk;
+  }
+
+  /**
+   * @return the configuration
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * @param conf the configuration to use
+   */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * @return the abortable
+   */
+  public Abortable getAbortable() {
+    return abort;
+  }
+
+  /**
+   * @param abort the abortable to use
+   */
+  public void setAbortable(Abortable abort) {
+    this.abort = abort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
new file mode 100644
index 0000000..6d8900e
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This provides an interface for clients of replication to view replication queues. These queues
+ * keep track of the sources (WALs/HFile references) that still need to be replicated to remote
+ * clusters.
+ */
+@InterfaceAudience.Private
+public interface ReplicationQueuesClient {
+
+  /**
+   * Initialize the replication queue client interface.
+   * @throws ReplicationException if the client interface cannot be initialized
+   */
+  public void init() throws ReplicationException;
+
+  /**
+   * Get a list of all region servers that have outstanding replication queues. These servers could
+   * be alive, dead or from a previous run of the cluster.
+   * @return a list of server names
+   * @throws KeeperException zookeeper exception
+   */
+  List<String> getListOfReplicators() throws KeeperException;
+
+  /**
+   * Get a list of all WALs in the given queue on the given region server.
+   * @param serverName the server name of the region server that owns the queue
+   * @param queueId a String that identifies the queue
+   * @return a list of WALs, null if this region server is dead and has no outstanding queues
+   * @throws KeeperException zookeeper exception
+   */
+  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
+
+  /**
+   * Get a list of all queues for the specified region server.
+   * @param serverName the server name of the region server that owns the set of queues
+   * @return a list of queueIds, null if this region server is not a replicator.
+   * @throws KeeperException zookeeper exception
+   */
+  List<String> getAllQueues(String serverName) throws KeeperException;
+
+  /**
+   * Load all wals in all replication queues from ZK. This method guarantees to return a
+   * snapshot which contains all WALs in the zookeeper at the start of this call even if there
+   * is a concurrent queue failover. However, some newly created WALs during the call may
+   * not be included.
+   * @return the set of WALs found in the replication queues
+   * @throws KeeperException zookeeper exception
+   */
+   Set<String> getAllWALs() throws KeeperException;
+
+  /**
+   * Get the change version number of replication hfile references node. This can be used as
+   * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
+   * @return change version number of hfile references node
+   * @throws KeeperException zookeeper exception
+   */
+  int getHFileRefsNodeChangeVersion() throws KeeperException;
+
+  /**
+   * Get list of all peers from hfile reference queue.
+   * @return a list of peer ids
+   * @throws KeeperException zookeeper exception
+   */
+  List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;
+
+  /**
+   * Get a list of all hfile references in the given peer.
+   * @param peerId a String that identifies the peer
+   * @return a list of hfile references, null if not found any
+   * @throws KeeperException zookeeper exception
+   */
+  List<String> getReplicableHFiles(String peerId) throws KeeperException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
new file mode 100644
index 0000000..834f831
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Bundles the common arguments needed to build a ReplicationQueuesClient, so that the various
+ * ReplicationQueuesClient implementations, whose constructors take different arguments, can all
+ * be constructed by reflection from the same wrapper.
+ */
+@InterfaceAudience.Private
+public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments {
+
+  /**
+   * Creates the arguments without a ZooKeeper watcher.
+   * @param conf the configuration
+   * @param abort the abortable
+   */
+  public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) {
+    super(conf, abort);
+  }
+
+  /**
+   * Creates the arguments including a ZooKeeper watcher.
+   * @param conf the configuration
+   * @param abort the abortable
+   * @param zk the ZooKeeper watcher
+   */
+  public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,
+      ZooKeeperWatcher zk) {
+    super(conf, abort, zk);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
new file mode 100644
index 0000000..1981131
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper based implementation of {@link ReplicationQueuesClient}. The queues are read from
+ * the children of the replication queues znode, the hfile references from the children of the
+ * hfile references znode.
+ * <p>
+ * The read methods implemented in this class ask the {@link Abortable} to abort when ZooKeeper
+ * reports a {@link KeeperException}, and then rethrow that exception to the caller.
+ */
+@InterfaceAudience.Private
+public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
+    ReplicationQueuesClient {
+
+  // One shared, immutable logger for the class instead of a reassignable per-instance field
+  private static final Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class);
+
+  /**
+   * Creates the client from the wrapped arguments.
+   * @param args wrapper providing the ZooKeeper watcher, the configuration and the abortable
+   */
+  public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
+    this(args.getZk(), args.getConf(), args.getAbortable());
+  }
+
+  /**
+   * Creates the client.
+   * @param zk the ZooKeeper watcher used for all reads
+   * @param conf the configuration
+   * @param abortable asked to abort when a ZooKeeper read fails
+   */
+  public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
+      Abortable abortable) {
+    super(zk, conf, abortable);
+  }
+
+  /**
+   * Creates the replication queues znode, including its parents, if it does not exist yet.
+   * @throws ReplicationException if ZooKeeper reports an error
+   */
+  @Override
+  public void init() throws ReplicationException {
+    try {
+      if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
+        ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Internal error while initializing a queues client", e);
+    }
+  }
+
+  /**
+   * Lists the children of the znode of the given queue of the given region server.
+   * @param serverName the server name of the region server that owns the queue
+   * @param queueId a String that identifies the queue
+   * @return the children of the queue znode as returned by ZKUtil.listChildrenNoWatch
+   * @throws KeeperException zookeeper exception, rethrown after requesting an abort
+   */
+  @Override
+  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
+    String znode = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, serverName), queueId);
+    try {
+      return ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of wals for queueId=" + queueId
+          + " and serverName=" + serverName, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Lists the children of the znode of the given region server below the queues znode.
+   * @param serverName the server name of the region server that owns the set of queues
+   * @return the children of the server znode as returned by ZKUtil.listChildrenNoWatch
+   * @throws KeeperException zookeeper exception, rethrown after requesting an abort
+   */
+  @Override
+  public List<String> getAllQueues(String serverName) throws KeeperException {
+    String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
+    try {
+      return ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Load all wals in all replication queues from ZK. This method guarantees to return a
+   * snapshot which contains all WALs in the zookeeper at the start of this call even if there
+   * is a concurrent queue failover. However, some newly created WALs during the call may
+   * not be included.
+   * <p>
+   * The cversion of the queues znode is read before and after scanning all queues; the scan is
+   * repeated until both reads return the same value.
+   * @return the WALs of all queues, an empty set if there are no replicators
+   * @throws KeeperException zookeeper exception
+   */
+  @Override
+  public Set<String> getAllWALs() throws KeeperException {
+    for (int retry = 0; ; retry++) {
+      int v0 = getQueuesZNodeCversion();
+      List<String> rss = getListOfReplicators();
+      if (rss == null || rss.isEmpty()) {
+        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+        return ImmutableSet.of();
+      }
+      Set<String> wals = Sets.newHashSet();
+      for (String rs : rss) {
+        List<String> listOfPeers = getAllQueues(rs);
+        // if rs just died, this will be null
+        if (listOfPeers == null) {
+          continue;
+        }
+        for (String id : listOfPeers) {
+          List<String> peersWals = getLogsInQueue(rs, id);
+          if (peersWals != null) {
+            wals.addAll(peersWals);
+          }
+        }
+      }
+      int v1 = getQueuesZNodeCversion();
+      if (v0 == v1) {
+        // nothing was added to or removed from the queues znode during the scan
+        return wals;
+      }
+      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
+        v0, v1, retry));
+    }
+  }
+
+  /**
+   * Reads the cversion of the replication queues znode.
+   * @return the cversion reported in the Stat of the queues znode
+   * @throws KeeperException zookeeper exception, rethrown after requesting an abort
+   */
+  public int getQueuesZNodeCversion() throws KeeperException {
+    try {
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
+      return stat.getCversion();
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication rs node", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the cversion of the hfile references znode.
+   * @return the cversion reported in the Stat of the hfile references znode
+   * @throws KeeperException zookeeper exception, rethrown after requesting an abort
+   */
+  @Override
+  public int getHFileRefsNodeChangeVersion() throws KeeperException {
+    Stat stat = new Stat();
+    try {
+      ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication hfile references node.", e);
+      throw e;
+    }
+    return stat.getCversion();
+  }
+
+  /**
+   * Lists the children of the hfile references znode.
+   * @return the children of the hfile references znode as returned by
+   *         ZKUtil.listChildrenNoWatch
+   * @throws KeeperException zookeeper exception, rethrown after requesting an abort
+   */
+  @Override
+  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
+    try {
+      return ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Lists the children of the znode of the given peer below the hfile references znode.
+   * @param peerId a String that identifies the peer
+   * @return the children of the peer znode as returned by ZKUtil.listChildrenNoWatch
+   * @throws KeeperException zookeeper exception, rethrown after requesting an abort
+   */
+  @Override
+  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
+    String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    try {
+      return ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
+      throw e;
+    }
+  }
+}