You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/23 21:42:45 UTC

[4/4] hbase git commit: HBASE-17442 Move most of the replication related classes from hbase-client to new hbase-replication package. (Guanghao Zhang).

HBASE-17442 Move most of the replication related classes from hbase-client to new hbase-replication package. (Guanghao Zhang).

Change-Id: Ie0e24cc617ab4bf56de8b1747062d1b78a5d4669


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26e6c2ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26e6c2ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26e6c2ce

Branch: refs/heads/master
Commit: 26e6c2ceb4db80e561ec321dba52673e7f285d1b
Parents: ae052e4
Author: Apekshit Sharma <ap...@apache.org>
Authored: Thu Aug 17 20:59:35 2017 -0700
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Wed Aug 23 14:41:58 2017 -0700

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    | 102 +---
 .../hbase/replication/ReplicationFactory.java   |  66 ---
 .../hbase/replication/ReplicationListener.java  |  51 --
 .../hbase/replication/ReplicationPeer.java      |  89 ---
 .../ReplicationPeerConfigListener.java          |  33 --
 .../replication/ReplicationPeerZKImpl.java      | 318 -----------
 .../hbase/replication/ReplicationPeers.java     | 177 ------
 .../replication/ReplicationPeersZKImpl.java     | 546 -------------------
 .../hbase/replication/ReplicationQueueInfo.java | 130 -----
 .../hbase/replication/ReplicationQueues.java    | 160 ------
 .../replication/ReplicationQueuesArguments.java |  70 ---
 .../replication/ReplicationQueuesClient.java    |  93 ----
 .../ReplicationQueuesClientArguments.java       |  40 --
 .../ReplicationQueuesClientZKImpl.java          | 175 ------
 .../replication/ReplicationQueuesZKImpl.java    | 407 --------------
 .../replication/ReplicationStateZKBase.java     | 155 ------
 .../hbase/replication/ReplicationTableBase.java | 441 ---------------
 .../hbase/replication/ReplicationTracker.java   |  49 --
 .../replication/ReplicationTrackerZKImpl.java   | 250 ---------
 .../TableBasedReplicationQueuesClientImpl.java  | 112 ----
 .../TableBasedReplicationQueuesImpl.java        | 450 ---------------
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   |  14 +-
 .../hadoop/hbase/zookeeper/ZNodePaths.java      |  22 +-
 hbase-replication/pom.xml                       | 264 +++++++++
 .../hbase/replication/ReplicationFactory.java   |  66 +++
 .../hbase/replication/ReplicationListener.java  |  51 ++
 .../hbase/replication/ReplicationPeer.java      |  89 +++
 .../ReplicationPeerConfigListener.java          |  33 ++
 .../replication/ReplicationPeerZKImpl.java      | 318 +++++++++++
 .../hbase/replication/ReplicationPeers.java     | 177 ++++++
 .../replication/ReplicationPeersZKImpl.java     | 546 +++++++++++++++++++
 .../hbase/replication/ReplicationQueueInfo.java | 130 +++++
 .../hbase/replication/ReplicationQueues.java    | 160 ++++++
 .../replication/ReplicationQueuesArguments.java |  70 +++
 .../replication/ReplicationQueuesClient.java    |  93 ++++
 .../ReplicationQueuesClientArguments.java       |  40 ++
 .../ReplicationQueuesClientZKImpl.java          | 175 ++++++
 .../replication/ReplicationQueuesZKImpl.java    | 407 ++++++++++++++
 .../replication/ReplicationStateZKBase.java     | 155 ++++++
 .../hbase/replication/ReplicationTableBase.java | 441 +++++++++++++++
 .../hbase/replication/ReplicationTracker.java   |  49 ++
 .../replication/ReplicationTrackerZKImpl.java   | 250 +++++++++
 .../TableBasedReplicationQueuesClientImpl.java  | 112 ++++
 .../TableBasedReplicationQueuesImpl.java        | 450 +++++++++++++++
 hbase-server/pom.xml                            |   4 +
 .../replication/BaseReplicationEndpoint.java    |   2 -
 pom.xml                                         |   6 +
 47 files changed, 4113 insertions(+), 3925 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 752d18c..615a79d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -26,37 +26,22 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@@ -101,16 +86,6 @@ public class ReplicationAdmin implements Closeable {
       Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
 
   private final Connection connection;
-  // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
-  // be moved to hbase-server. Resolve it in HBASE-11392.
-  private final ReplicationQueuesClient replicationQueuesClient;
-  private final ReplicationPeers replicationPeers;
-  /**
-   * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
-   * on {@link #close()}.
-   */
-  private final ZooKeeperWatcher zkw;
-
   private Admin admin;
 
   /**
@@ -122,49 +97,6 @@ public class ReplicationAdmin implements Closeable {
   public ReplicationAdmin(Configuration conf) throws IOException {
     this.connection = ConnectionFactory.createConnection(conf);
     admin = connection.getAdmin();
-    try {
-      zkw = createZooKeeperWatcher();
-      try {
-        this.replicationQueuesClient =
-            ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf,
-            this.connection, zkw));
-        this.replicationQueuesClient.init();
-        this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
-          this.replicationQueuesClient, this.connection);
-        this.replicationPeers.init();
-      } catch (Exception exception) {
-        if (zkw != null) {
-          zkw.close();
-        }
-        throw exception;
-      }
-    } catch (Exception exception) {
-      connection.close();
-      if (exception instanceof IOException) {
-        throw (IOException) exception;
-      } else if (exception instanceof RuntimeException) {
-        throw (RuntimeException) exception;
-      } else {
-        throw new IOException("Error initializing the replication admin client.", exception);
-      }
-    }
-  }
-
-  private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
-    // This Abortable doesn't 'abort'... it just logs.
-    return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
-      @Override
-      public void abort(String why, Throwable e) {
-        LOG.error(why, e);
-        // We used to call system.exit here but this script can be embedded by other programs that
-        // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
-      }
-
-      @Override
-      public boolean isAborted() {
-        return false;
-      }
-    });
   }
 
   /**
@@ -452,9 +384,6 @@ public class ReplicationAdmin implements Closeable {
 
   @Override
   public void close() throws IOException {
-    if (this.zkw != null) {
-      this.zkw.close();
-    }
     if (this.connection != null) {
       this.connection.close();
     }
@@ -518,40 +447,13 @@ public class ReplicationAdmin implements Closeable {
     admin.disableTableReplication(tableName);
   }
 
-  @VisibleForTesting
-  @Deprecated
-  public void peerAdded(String id) throws ReplicationException {
-    this.replicationPeers.peerConnected(id);
-  }
-
   /**
    * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
    */
   @VisibleForTesting
   @Deprecated
-  List<ReplicationPeer> listReplicationPeers() throws IOException {
-    Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
-    if (peers == null || peers.size() <= 0) {
-      return null;
-    }
-    List<ReplicationPeer> listOfPeers = new ArrayList<>(peers.size());
-    for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
-      String peerId = peerEntry.getKey();
-      try {
-        Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
-        Configuration peerConf = pair.getSecond();
-        ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
-          peerId, pair.getFirst(), this.connection);
-        listOfPeers.add(peer);
-      } catch (ReplicationException e) {
-        LOG.warn("Failed to get valid replication peers. "
-            + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
-            + e.getMessage());
-        LOG.debug("Failure details to get valid replication peers.", e);
-        continue;
-      }
-    }
-    return listOfPeers;
+  List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
+    return admin.listReplicationPeers();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
deleted file mode 100644
index 8506cbb..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.commons.lang.reflect.ConstructorUtils;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-
-/**
- * A factory class for instantiating replication objects that deal with replication state.
- */
-@InterfaceAudience.Private
-public class ReplicationFactory {
-
-  public static final Class defaultReplicationQueueClass = ReplicationQueuesZKImpl.class;
-
-  public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
-      throws Exception {
-    Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
-        "replication.replicationQueues.class", defaultReplicationQueueClass);
-    return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
-  }
-
-  public static ReplicationQueuesClient getReplicationQueuesClient(
-      ReplicationQueuesClientArguments args) throws Exception {
-    Class<?> classToBuild = args.getConf().getClass(
-      "hbase.region.replica.replication.replicationQueuesClient.class",
-      ReplicationQueuesClientZKImpl.class);
-    return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args);
-  }
-
-  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
-      Abortable abortable) {
-    return getReplicationPeers(zk, conf, null, abortable);
-  }
-
-  public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
-      final ReplicationQueuesClient queuesClient, Abortable abortable) {
-    return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
-  }
-
-  public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
-      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
-      Stoppable stopper) {
-    return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
deleted file mode 100644
index dfb5fdc..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * The replication listener interface can be implemented if a class needs to subscribe to events
- * generated by the ReplicationTracker. These events include things like addition/deletion of peer
- * clusters or failure of a local region server. To receive events, the class also needs to register
- * itself with a Replication Tracker.
- */
-@InterfaceAudience.Private
-public interface ReplicationListener {
-
-  /**
-   * A region server has been removed from the local cluster
-   * @param regionServer the removed region server
-   */
-  public void regionServerRemoved(String regionServer);
-
-  /**
-   * A peer cluster has been removed (i.e. unregistered) from replication.
-   * @param peerId The peer id of the cluster that has been removed
-   */
-  public void peerRemoved(String peerId);
-
-  /**
-   * The list of registered peer clusters has changed.
-   * @param peerIds A list of all currently registered peer clusters
-   */
-  public void peerListChanged(List<String> peerIds);
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
deleted file mode 100644
index 4f18048..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-
-/**
- * ReplicationPeer manages enabled / disabled state for the peer.
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public interface ReplicationPeer {
-
-  /**
-   * State of the peer, whether it is enabled or not
-   */
-  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-  enum PeerState {
-    ENABLED,
-    DISABLED
-  }
-
-  /**
-   * Get the identifier of this peer
-   * @return string representation of the id
-   */
-  String getId();
-
-  /**
-   * Get the peer config object
-   * @return the ReplicationPeerConfig for this peer
-   */
-  public ReplicationPeerConfig getPeerConfig();
-
-  /**
-   * Returns the state of the peer
-   * @return the enabled state
-   */
-  PeerState getPeerState();
-
-  /**
-   * Get the configuration object required to communicate with this peer
-   * @return configuration object
-   */
-  public Configuration getConfiguration();
-
-  /**
-   * Get replicable (table, cf-list) map of this peer
-   * @return the replicable (table, cf-list) map
-   */
-  public Map<TableName, List<String>> getTableCFs();
-
-  /**
-   * Get replicable namespace set of this peer
-   * @return the replicable namespaces set
-   */
-  public Set<String> getNamespaces();
-
-  /**
-   * Get the per node bandwidth upper limit for this peer
-   * @return the bandwidth upper limit
-   */
-  public long getPeerBandwidth();
-
-  void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
deleted file mode 100644
index 4e04186..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public interface ReplicationPeerConfigListener {
-  /** Callback method for when users update the ReplicationPeerConfig for this peer
-   *
-   * @param rpc The updated ReplicationPeerConfig
-   */
-  void peerConfigUpdated(ReplicationPeerConfig rpc);
-
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
deleted file mode 100644
index 3973be9..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-
-@InterfaceAudience.Private
-public class ReplicationPeerZKImpl extends ReplicationStateZKBase
-    implements ReplicationPeer, Abortable, Closeable {
-  private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
-
-  private ReplicationPeerConfig peerConfig;
-  private final String id;
-  private volatile PeerState peerState;
-  private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
-  private final Configuration conf;
-  private PeerStateTracker peerStateTracker;
-  private PeerConfigTracker peerConfigTracker;
-
-
-  /**
-   * Constructor that takes all the objects required to communicate with the specified peer, except
-   * for the region server addresses.
-   * @param conf configuration object to this peer
-   * @param id string representation of this peer's identifier
-   * @param peerConfig configuration for the replication peer
-   */
-  public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf,
-                               String id, ReplicationPeerConfig peerConfig,
-                               Abortable abortable)
-      throws ReplicationException {
-    super(zkWatcher, conf, abortable);
-    this.conf = conf;
-    this.peerConfig = peerConfig;
-    this.id = id;
-  }
-
-  /**
-   * start a state tracker to check whether this peer is enabled or not
-   *
-   * @param peerStateNode path to zk node which stores peer state
-   * @throws KeeperException
-   */
-  public void startStateTracker(String peerStateNode)
-      throws KeeperException {
-    ensurePeerEnabled(peerStateNode);
-    this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
-    this.peerStateTracker.start();
-    try {
-      this.readPeerStateZnode();
-    } catch (DeserializationException e) {
-      throw ZKUtil.convert(e);
-    }
-  }
-
-  private void readPeerStateZnode() throws DeserializationException {
-    this.peerState =
-        isStateEnabled(this.peerStateTracker.getData(false))
-          ? PeerState.ENABLED
-          : PeerState.DISABLED;
-  }
-
-  /**
-   * start a table-cfs tracker to listen the (table, cf-list) map change
-   * @param peerConfigNode path to zk node which stores table-cfs
-   * @throws KeeperException
-   */
-  public void startPeerConfigTracker(String peerConfigNode)
-    throws KeeperException {
-    this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper,
-        this);
-    this.peerConfigTracker.start();
-    this.readPeerConfig();
-  }
-
-  private ReplicationPeerConfig readPeerConfig() {
-    try {
-      byte[] data = peerConfigTracker.getData(false);
-      if (data != null) {
-        this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data);
-      }
-    } catch (DeserializationException e) {
-      LOG.error("", e);
-    }
-    return this.peerConfig;
-  }
-
-  @Override
-  public PeerState getPeerState() {
-    return peerState;
-  }
-
-  /**
-   * Get the identifier of this peer
-   * @return string representation of the id (short)
-   */
-  @Override
-  public String getId() {
-    return id;
-  }
-
-  /**
-   * Get the peer config object
-   * @return the ReplicationPeerConfig for this peer
-   */
-  @Override
-  public ReplicationPeerConfig getPeerConfig() {
-    return peerConfig;
-  }
-
-  /**
-   * Get the configuration object required to communicate with this peer
-   * @return configuration object
-   */
-  @Override
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  /**
-   * Get replicable (table, cf-list) map of this peer
-   * @return the replicable (table, cf-list) map
-   */
-  @Override
-  public Map<TableName, List<String>> getTableCFs() {
-    this.tableCFs = peerConfig.getTableCFsMap();
-    return this.tableCFs;
-  }
-
-  /**
-   * Get replicable namespace set of this peer
-   * @return the replicable namespaces set
-   */
-  @Override
-  public Set<String> getNamespaces() {
-    return this.peerConfig.getNamespaces();
-  }
-
-  @Override
-  public long getPeerBandwidth() {
-    return this.peerConfig.getBandwidth();
-  }
-
-  @Override
-  public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
-    if (this.peerConfigTracker != null){
-      this.peerConfigTracker.setListener(listener);
-    }
-  }
-
-  @Override
-  public void abort(String why, Throwable e) {
-    LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig
-        + " was aborted for the following reason(s):" + why, e);
-  }
-
-  @Override
-  public boolean isAborted() {
-    // Currently the replication peer is never "Aborted", we just log when the
-    // abort method is called.
-    return false;
-  }
-
-  @Override
-  public void close() throws IOException {
-    // TODO: stop zkw?
-  }
-
-  /**
-   * Parse the raw data from ZK to get a peer's state
-   * @param bytes raw ZK data
-   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
-   * @throws DeserializationException
-   */
-  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
-    ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
-    return ReplicationProtos.ReplicationState.State.ENABLED == state;
-  }
-
-  /**
-   * @param bytes Content of a state znode.
-   * @return State parsed from the passed bytes.
-   * @throws DeserializationException
-   */
-  private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
-      throws DeserializationException {
-    ProtobufUtil.expectPBMagicPrefix(bytes);
-    int pblen = ProtobufUtil.lengthOfPBMagic();
-    ReplicationProtos.ReplicationState.Builder builder =
-        ReplicationProtos.ReplicationState.newBuilder();
-    ReplicationProtos.ReplicationState state;
-    try {
-      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
-      state = builder.build();
-      return state.getState();
-    } catch (IOException e) {
-      throw new DeserializationException(e);
-    }
-  }
-
-  /**
-   * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
-   * @param path Path to znode to check
-   * @return True if we created the znode.
-   * @throws NodeExistsException
-   * @throws KeeperException
-   */
-  private boolean ensurePeerEnabled(final String path)
-      throws NodeExistsException, KeeperException {
-    if (ZKUtil.checkExists(zookeeper, path) == -1) {
-      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
-      // peer-state znode. This happens while adding a peer.
-      // The peer state data is set as "ENABLED" by default.
-      ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
-        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Tracker for state of this peer
-   */
-  public class PeerStateTracker extends ZooKeeperNodeTracker {
-
-    public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, peerStateZNode, abortable);
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-        try {
-          readPeerStateZnode();
-        } catch (DeserializationException e) {
-          LOG.warn("Failed deserializing the content of " + path, e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Tracker for PeerConfigNode of this peer
-   */
-  public class PeerConfigTracker extends ZooKeeperNodeTracker {
-
-    ReplicationPeerConfigListener listener;
-
-    public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
-        Abortable abortable) {
-      super(watcher, peerConfigNode, abortable);
-    }
-
-    public synchronized void setListener(ReplicationPeerConfigListener listener){
-      this.listener = listener;
-    }
-
-    @Override
-    public synchronized void nodeCreated(String path) {
-      if (path.equals(node)) {
-        super.nodeCreated(path);
-        ReplicationPeerConfig config = readPeerConfig();
-        if (listener != null){
-          listener.peerConfigUpdated(config);
-        }
-      }
-    }
-
-    @Override
-    public synchronized void nodeDataChanged(String path) {
-      //superclass calls nodeCreated
-      if (path.equals(node)) {
-        super.nodeDataChanged(path);
-      }
-
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
deleted file mode 100644
index 2a7963a..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Pair;
-
-/**
- * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
- * clusters that data is replicated to. A peer cluster can be in three different states:
- *
- * 1. Not-Registered - There is no notion of the peer cluster.
- * 2. Registered - The peer has an id and is being tracked but there is no connection.
- * 3. Connected - There is an active connection to the remote peer.
- *
- * In the registered or connected state, a peer cluster can either be enabled or disabled.
- */
-@InterfaceAudience.Private
-public interface ReplicationPeers {
-
-  /**
-   * Initialize the ReplicationPeers interface.
-   */
-  void init() throws ReplicationException;
-
-  /**
-   * Add a new remote slave cluster for replication.
-   * @param peerId a short that identifies the cluster
-   * @param peerConfig configuration for the replication slave cluster
-   */
-  void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
-      throws ReplicationException;
-
-  /**
-   * Removes a remote slave cluster and stops the replication to it.
-   * @param peerId a short that identifies the cluster
-   */
-  void unregisterPeer(String peerId) throws ReplicationException;
-
-  /**
-   * Method called after a peer has been connected. It will create a ReplicationPeer to track the
-   * newly connected cluster.
-   * @param peerId a short that identifies the cluster
-   * @return whether a ReplicationPeer was successfully created
-   * @throws ReplicationException
-   */
-  boolean peerConnected(String peerId) throws ReplicationException;
-
-  /**
-   * Method called after a peer has been disconnected. It will remove the ReplicationPeer that
-   * tracked the disconnected cluster.
-   * @param peerId a short that identifies the cluster
-   */
-  void peerDisconnected(String peerId);
-
-  /**
-   * Restart the replication to the specified remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   */
-  void enablePeer(String peerId) throws ReplicationException;
-
-  /**
-   * Stop the replication to the specified remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   */
-  void disablePeer(String peerId) throws ReplicationException;
-
-  /**
-   * Get the table and column-family list string of the peer from the underlying storage.
-   * @param peerId a short that identifies the cluster
-   */
-  public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
-      throws ReplicationException;
-
-  /**
-   * Set the table and column-family list string of the peer to the underlying storage.
-   * @param peerId a short that identifies the cluster
-   * @param tableCFs the table and column-family list which will be replicated for this peer
-   */
-  public void setPeerTableCFsConfig(String peerId,
-                                    Map<TableName, ? extends Collection<String>>  tableCFs)
-      throws ReplicationException;
-
-  /**
-   * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
-   * continue to track changes to the Peer's state and config. This method returns null if no
-   * peer has been connected with the given peerId.
-   * @param peerId id for the peer
-   * @return ReplicationPeer object
-   */
-  ReplicationPeer getConnectedPeer(String peerId);
-
-  /**
-   * Returns the set of peerIds of the clusters that have been connected and have an underlying
-   * ReplicationPeer.
-   * @return a Set of Strings for peerIds
-   */
-  public Set<String> getConnectedPeerIds();
-
-  /**
-   * Get the replication status for the specified connected remote slave cluster.
-   * The value might be read from cache, so it is recommended to
-   * use {@link #getStatusOfPeerFromBackingStore(String)}
-   * if reading the state after enabling or disabling it.
-   * @param peerId a short that identifies the cluster
-   * @return true if replication is enabled, false otherwise.
-   */
-  boolean getStatusOfPeer(String peerId);
-
-  /**
-   * Get the replication status for the specified remote slave cluster, which doesn't
-   * have to be connected. The state is read directly from the backing store.
-   * @param peerId a short that identifies the cluster
-   * @return true if replication is enabled, false otherwise.
-   * @throws ReplicationException thrown if there's an error contacting the store
-   */
-  boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
-
-  /**
-   * List the cluster replication configs of all remote slave clusters (whether they are
-   * enabled/disabled or connected/disconnected).
-   * @return A map of peer ids to peer cluster keys
-   */
-  Map<String, ReplicationPeerConfig> getAllPeerConfigs();
-
-  /**
-   * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
-   * connected/disconnected).
-   * @return A list of peer ids
-   */
-  List<String> getAllPeerIds();
-
-  /**
-   * Returns the configured ReplicationPeerConfig for this peerId
-   * @param peerId a short name that identifies the cluster
-   * @return ReplicationPeerConfig for the peer
-   */
-  ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
-
-  /**
-   * Returns the configuration needed to talk to the remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   * @return the configuration for the peer cluster, null if it was unable to get the configuration
-   */
-  Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
-
-  /**
-   * Update the peerConfig for the a given peer cluster
-   * @param id a short that identifies the cluster
-   * @param peerConfig new config for the peer cluster
-   * @throws ReplicationException
-   */
-  void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
deleted file mode 100644
index 751e454..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ /dev/null
@@ -1,546 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
- * peers znode contains a list of all peer replication clusters and the current replication state of
- * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
- * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
- * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
- * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
- * For example:
- *
- *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
- *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
- *
- * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
- * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
- * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
- * ReplicationPeer.PeerStateTracker class. For example:
- *
- * /hbase/replication/peers/1/peer-state [Value: ENABLED]
- *
- * Each of these peer znodes has a child znode that indicates which data will be replicated
- * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
- * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
- * class. For example:
- *
- * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
- */
-@InterfaceAudience.Private
-public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
-
-  // Map of peer clusters keyed by their id
-  private Map<String, ReplicationPeerZKImpl> peerClusters;
-  private final ReplicationQueuesClient queuesClient;
-  private Abortable abortable;
-
-  private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
-
-  public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
-      final ReplicationQueuesClient queuesClient, Abortable abortable) {
-    super(zk, conf, abortable);
-    this.abortable = abortable;
-    this.peerClusters = new ConcurrentHashMap<>();
-    this.queuesClient = queuesClient;
-  }
-
-  @Override
-  public void init() throws ReplicationException {
-    try {
-      if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
-        ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not initialize replication peers", e);
-    }
-    addExistingPeers();
-  }
-
-  @Override
-  public void registerPeer(String id, ReplicationPeerConfig peerConfig)
-      throws ReplicationException {
-    try {
-      if (peerExists(id)) {
-        throw new IllegalArgumentException("Cannot add a peer with id=" + id
-            + " because that id already exists.");
-      }
-
-      if(id.contains("-")){
-        throw new IllegalArgumentException("Found invalid peer name:" + id);
-      }
-
-      if (peerConfig.getClusterKey() != null) {
-        try {
-          ZKConfig.validateClusterKey(peerConfig.getClusterKey());
-        } catch (IOException ioe) {
-          throw new IllegalArgumentException(ioe.getMessage());
-        }
-      }
-
-      checkQueuesDeleted(id);
-
-      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
-
-      List<ZKUtilOp> listOfOps = new ArrayList<>(2);
-      ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
-        ReplicationSerDeHelper.toByteArray(peerConfig));
-      // b/w PeerWatcher and ReplicationZookeeper#add method to create the
-      // peer-state znode. This happens while adding a peer
-      // The peer state data is set as "ENABLED" by default.
-      ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
-      listOfOps.add(op1);
-      listOfOps.add(op2);
-      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-      // A peer is enabled by default
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not add peer with id=" + id
-          + ", peerConfif=>" + peerConfig, e);
-    }
-  }
-
-  @Override
-  public void unregisterPeer(String id) throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("Cannot remove peer with id=" + id
-            + " because that id does not exist.");
-      }
-      ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not remove peer with id=" + id, e);
-    }
-  }
-
-  @Override
-  public void enablePeer(String id) throws ReplicationException {
-    changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
-    LOG.info("peer " + id + " is enabled");
-  }
-
-  @Override
-  public void disablePeer(String id) throws ReplicationException {
-    changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
-    LOG.info("peer " + id + " is disabled");
-  }
-
-  @Override
-  public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("peer " + id + " doesn't exist");
-      }
-      try {
-        ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
-        if (rpc == null) {
-          throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
-        }
-        return rpc.getTableCFsMap();
-      } catch (Exception e) {
-        throw new ReplicationException(e);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
-    }
-  }
-
-  @Override
-  public void setPeerTableCFsConfig(String id,
-                                    Map<TableName, ? extends Collection<String>>  tableCFs)
-      throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
-            + " does not exist.");
-      }
-      ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
-      if (rpc == null) {
-        throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
-      }
-      rpc.setTableCFsMap(tableCFs);
-      ZKUtil.setData(this.zookeeper, getPeerNode(id),
-          ReplicationSerDeHelper.toByteArray(rpc));
-      LOG.info("Peer tableCFs with id= " + id + " is now " +
-        ReplicationSerDeHelper.convertToString(tableCFs));
-    } catch (KeeperException e) {
-      throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
-    }
-  }
-
-  @Override
-  public boolean getStatusOfPeer(String id) {
-    ReplicationPeer replicationPeer = this.peerClusters.get(id);
-    if (replicationPeer == null) {
-      throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
-    }
-    return replicationPeer.getPeerState() == PeerState.ENABLED;
-  }
-
-  @Override
-  public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("peer " + id + " doesn't exist");
-      }
-      String peerStateZNode = getPeerStateNode(id);
-      try {
-        return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
-      } catch (KeeperException e) {
-        throw new ReplicationException(e);
-      } catch (DeserializationException e) {
-        throw new ReplicationException(e);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Unable to get status of the peer with id=" + id +
-          " from backing store", e);
-    } catch (InterruptedException e) {
-      throw new ReplicationException(e);
-    }
-  }
-
-  @Override
-  public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
-    Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
-    List<String> ids = null;
-    try {
-      ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-      for (String id : ids) {
-        ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
-        if (peerConfig == null) {
-          LOG.warn("Failed to get replication peer configuration of clusterid=" + id
-            + " znode content, continuing.");
-          continue;
-        }
-        peers.put(id, peerConfig);
-      }
-    } catch (KeeperException e) {
-      this.abortable.abort("Cannot get the list of peers ", e);
-    } catch (ReplicationException e) {
-      this.abortable.abort("Cannot get the list of peers ", e);
-    }
-    return peers;
-  }
-
-  @Override
-  public ReplicationPeer getConnectedPeer(String peerId) {
-    return peerClusters.get(peerId);
-  }
-
-  @Override
-  public Set<String> getConnectedPeerIds() {
-    return peerClusters.keySet(); // this is not thread-safe
-  }
-
-  /**
-   * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
-   */
-  @Override
-  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
-      throws ReplicationException {
-    String znode = getPeerNode(peerId);
-    byte[] data = null;
-    try {
-      data = ZKUtil.getData(this.zookeeper, znode);
-    } catch (InterruptedException e) {
-      LOG.warn("Could not get configuration for peer because the thread " +
-          "was interrupted. peerId=" + peerId);
-      Thread.currentThread().interrupt();
-      return null;
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error getting configuration for peer with id="
-          + peerId, e);
-    }
-    if (data == null) {
-      LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
-      return null;
-    }
-
-    try {
-      return ReplicationSerDeHelper.parsePeerFrom(data);
-    } catch (DeserializationException e) {
-      LOG.warn("Failed to parse cluster key from peerId=" + peerId
-          + ", specifically the content from the following znode: " + znode);
-      return null;
-    }
-  }
-
-  @Override
-  public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
-      throws ReplicationException {
-    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
-
-    if (peerConfig == null) {
-      return null;
-    }
-
-    Configuration otherConf;
-    try {
-      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
-    } catch (IOException e) {
-      LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
-      return null;
-    }
-
-    if (!peerConfig.getConfiguration().isEmpty()) {
-      CompoundConfiguration compound = new CompoundConfiguration();
-      compound.add(otherConf);
-      compound.addStringMap(peerConfig.getConfiguration());
-      return new Pair<>(peerConfig, compound);
-    }
-
-    return new Pair<>(peerConfig, otherConf);
-  }
-
-  @Override
-  public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
-      throws ReplicationException {
-    ReplicationPeer peer = getConnectedPeer(id);
-    if (peer == null){
-      throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
-    }
-    ReplicationPeerConfig existingConfig = peer.getPeerConfig();
-    if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() &&
-        !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){
-      throw new ReplicationException("Changing the cluster key on an existing peer is not allowed."
-          + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '"
-          + newConfig.getClusterKey() +
-      "'");
-    }
-    String existingEndpointImpl = existingConfig.getReplicationEndpointImpl();
-    if (newConfig.getReplicationEndpointImpl() != null &&
-        !newConfig.getReplicationEndpointImpl().isEmpty() &&
-        !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){
-      throw new ReplicationException("Changing the replication endpoint implementation class " +
-          "on an existing peer is not allowed. Existing class '"
-          + existingConfig.getReplicationEndpointImpl()
-          + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
-    }
-    //Update existingConfig's peer config and peer data with the new values, but don't touch config
-    // or data that weren't explicitly changed
-    existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
-    existingConfig.getPeerData().putAll(newConfig.getPeerData());
-    existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
-    existingConfig.setNamespaces(newConfig.getNamespaces());
-    existingConfig.setBandwidth(newConfig.getBandwidth());
-
-    try {
-      ZKUtil.setData(this.zookeeper, getPeerNode(id),
-          ReplicationSerDeHelper.toByteArray(existingConfig));
-    }
-    catch(KeeperException ke){
-      throw new ReplicationException("There was a problem trying to save changes to the " +
-          "replication peer " + id, ke);
-    }
-  }
-
-  /**
-   * List all registered peer clusters and set a watch on their znodes.
-   */
-  @Override
-  public List<String> getAllPeerIds() {
-    List<String> ids = null;
-    try {
-      ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Cannot get the list of peers ", e);
-    }
-    return ids;
-  }
-
-  /**
-   * A private method used during initialization. This method attempts to add all registered
-   * peer clusters. This method does not set a watch on the peer cluster znodes.
-   */
-  private void addExistingPeers() throws ReplicationException {
-    List<String> znodes = null;
-    try {
-      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error getting the list of peer clusters.", e);
-    }
-    if (znodes != null) {
-      for (String z : znodes) {
-        createAndAddPeer(z);
-      }
-    }
-  }
-
-  @Override
-  public boolean peerConnected(String peerId) throws ReplicationException {
-    return createAndAddPeer(peerId);
-  }
-
-  @Override
-  public void peerDisconnected(String peerId) {
-    ReplicationPeer rp = this.peerClusters.get(peerId);
-    if (rp != null) {
-      ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
-    }
-  }
-
-  /**
-   * Attempt to connect to a new remote slave cluster.
-   * @param peerId a short that identifies the cluster
-   * @return true if a new connection was made, false if no new connection was made.
-   */
-  public boolean createAndAddPeer(String peerId) throws ReplicationException {
-    if (peerClusters == null) {
-      return false;
-    }
-    if (this.peerClusters.containsKey(peerId)) {
-      return false;
-    }
-
-    ReplicationPeerZKImpl peer = null;
-    try {
-      peer = createPeer(peerId);
-    } catch (Exception e) {
-      throw new ReplicationException("Error adding peer with id=" + peerId, e);
-    }
-    if (peer == null) {
-      return false;
-    }
-    ReplicationPeerZKImpl previous =
-      ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
-    if (previous == null) {
-      LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
-    } else {
-      LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
-        ", new cluster=" + peer.getPeerConfig().getClusterKey());
-    }
-    return true;
-  }
-
-  /**
-   * Update the state znode of a peer cluster.
-   * @param id
-   * @param state
-   */
-  private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
-      throws ReplicationException {
-    try {
-      if (!peerExists(id)) {
-        throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
-            + " does not exist.");
-      }
-      String peerStateZNode = getPeerStateNode(id);
-      byte[] stateBytes =
-          (state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
-              : DISABLED_ZNODE_BYTES;
-      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
-        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
-      } else {
-        ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
-      }
-      LOG.info("Peer with id= " + id + " is now " + state.name());
-    } catch (KeeperException e) {
-      throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
-    }
-  }
-
-  /**
-   * Helper method to connect to a peer
-   * @param peerId peer's identifier
-   * @return object representing the peer
-   * @throws ReplicationException
-   */
-  private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
-    Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
-    if (pair == null) {
-      return null;
-    }
-    Configuration peerConf = pair.getSecond();
-
-    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper,
-        peerConf, peerId, pair.getFirst(), abortable);
-    try {
-      peer.startStateTracker(this.getPeerStateNode(peerId));
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error starting the peer state tracker for peerId=" +
-          peerId, e);
-    }
-
-    try {
-      peer.startPeerConfigTracker(this.getPeerNode(peerId));
-    } catch (KeeperException e) {
-      throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
-          peerId, e);
-    }
-
-    return peer;
-  }
-
-  private void checkQueuesDeleted(String peerId) throws ReplicationException {
-    if (queuesClient == null) return;
-    try {
-      List<String> replicators = queuesClient.getListOfReplicators();
-      if (replicators == null || replicators.isEmpty()) {
-        return;
-      }
-      for (String replicator : replicators) {
-        List<String> queueIds = queuesClient.getAllQueues(replicator);
-        for (String queueId : queueIds) {
-          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-          if (queueInfo.getPeerId().equals(peerId)) {
-            throw new ReplicationException("undeleted queue for peerId: " + peerId
-                + ", replicator: " + replicator + ", queueId: " + queueId);
-          }
-        }
-      }
-      // Check for hfile-refs queue
-      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
-          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
-        throw new ReplicationException("Undeleted queue for peerId: " + peerId
-            + ", found in hfile-refs node path " + hfileRefsZNode);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
deleted file mode 100644
index 1403f6d..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ServerName;
-
-/**
- * This class is responsible for the parsing logic for a znode representing a queue.
- * It will extract the peerId if it's recovered as well as the dead region servers
- * that were part of the queue's history.
- */
-@InterfaceAudience.Private
-public class ReplicationQueueInfo {
-  private static final Log LOG = LogFactory.getLog(ReplicationQueueInfo.class);
-
-  private final String peerId;
-  private final String peerClusterZnode;
-  private boolean queueRecovered;
-  // List of all the dead region servers that had this queue (if recovered)
-  private List<String> deadRegionServers = new ArrayList<>();
-
-  /**
-   * The passed znode will be either the id of the peer cluster or
-   * the handling story of that queue in the form of id-servername-*
-   */
-  public ReplicationQueueInfo(String znode) {
-    this.peerClusterZnode = znode;
-    String[] parts = znode.split("-", 2);
-    this.queueRecovered = parts.length != 1;
-    this.peerId = this.queueRecovered ?
-        parts[0] : peerClusterZnode;
-    if (parts.length >= 2) {
-      // extract dead servers
-      extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
-    }
-  }
-
-  /**
-   * Parse dead server names from znode string servername can contain "-" such as
-   * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
-   * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-&lt;server name>-...
-   */
-  private static void
-      extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
-
-    if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
-
-    // valid server name delimiter "-" has to be after "," in a server name
-    int seenCommaCnt = 0;
-    int startIndex = 0;
-    int len = deadServerListStr.length();
-
-    for (int i = 0; i < len; i++) {
-      switch (deadServerListStr.charAt(i)) {
-      case ',':
-        seenCommaCnt += 1;
-        break;
-      case '-':
-        if(seenCommaCnt>=2) {
-          if (i > startIndex) {
-            String serverName = deadServerListStr.substring(startIndex, i);
-            if(ServerName.isFullServerName(serverName)){
-              result.add(serverName);
-            } else {
-              LOG.error("Found invalid server name:" + serverName);
-            }
-            startIndex = i + 1;
-          }
-          seenCommaCnt = 0;
-        }
-        break;
-      default:
-        break;
-      }
-    }
-
-    // add tail
-    if(startIndex < len - 1){
-      String serverName = deadServerListStr.substring(startIndex, len);
-      if(ServerName.isFullServerName(serverName)){
-        result.add(serverName);
-      } else {
-        LOG.error("Found invalid server name at the end:" + serverName);
-      }
-    }
-
-    LOG.debug("Found dead servers:" + result);
-  }
-
-  public List<String> getDeadRegionServers() {
-    return Collections.unmodifiableList(this.deadRegionServers);
-  }
-
-  public String getPeerId() {
-    return this.peerId;
-  }
-
-  public String getPeerClusterZnode() {
-    return this.peerClusterZnode;
-  }
-
-  public boolean isQueueRecovered() {
-    return queueRecovered;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
deleted file mode 100644
index be5a590..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.SortedSet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Pair;
-
-/**
- * This provides an interface for maintaining a region server's replication queues. These queues
- * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled)
- * that still need to be replicated to remote clusters.
- */
-@InterfaceAudience.Private
-public interface ReplicationQueues {
-
-  /**
-   * Initialize the region server replication queue interface.
-   * @param serverName The server name of the region server that owns the replication queues this
-   *          interface manages.
-   */
-  void init(String serverName) throws ReplicationException;
-
-  /**
-   * Remove a replication queue.
-   * @param queueId a String that identifies the queue.
-   */
-  void removeQueue(String queueId);
-
-  /**
-   * Add a new WAL file to the given queue. If the queue does not exist it is created.
-   * @param queueId a String that identifies the queue.
-   * @param filename name of the WAL
-   */
-  void addLog(String queueId, String filename) throws ReplicationException;
-
-  /**
-   * Remove an WAL file from the given queue.
-   * @param queueId a String that identifies the queue.
-   * @param filename name of the WAL
-   */
-  void removeLog(String queueId, String filename);
-
-  /**
-   * Set the current position for a specific WAL in a given queue.
-   * @param queueId a String that identifies the queue
-   * @param filename name of the WAL
-   * @param position the current position in the file
-   */
-  void setLogPosition(String queueId, String filename, long position);
-
-  /**
-   * Get the current position for a specific WAL in a given queue.
-   * @param queueId a String that identifies the queue
-   * @param filename name of the WAL
-   * @return the current position in the file
-   */
-  long getLogPosition(String queueId, String filename) throws ReplicationException;
-
-  /**
-   * Remove all replication queues for this region server.
-   */
-  void removeAllQueues();
-
-  /**
-   * Get a list of all WALs in the given queue.
-   * @param queueId a String that identifies the queue
-   * @return a list of WALs, null if no such queue exists for this server
-   */
-  List<String> getLogsInQueue(String queueId);
-
-  /**
-   * Get a list of all queues for this region server.
-   * @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues
-   */
-  List<String> getAllQueues();
-
-  /**
-   * Get queueIds from a dead region server, whose queues has not been claimed by other region
-   * servers.
-   * @return empty if the queue exists but no children, null if the queue does not exist.
-  */
-  List<String> getUnClaimedQueueIds(String regionserver);
-
-  /**
-   * Take ownership for the queue identified by queueId and belongs to a dead region server.
-   * @param regionserver the id of the dead region server
-   * @param queueId the id of the queue
-   * @return the new PeerId and A SortedSet of WALs in its queue, and null if no unclaimed queue.
-   */
-  Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
-
-  /**
-   * Remove the znode of region server if the queue is empty.
-   * @param regionserver
-   */
-  void removeReplicatorIfQueueIsEmpty(String regionserver);
-
-  /**
-   * Get a list of all region servers that have outstanding replication queues. These servers could
-   * be alive, dead or from a previous run of the cluster.
-   * @return a list of server names
-   */
-  List<String> getListOfReplicators();
-
-  /**
-   * Checks if the provided znode is the same as this region server's
-   * @param regionserver the id of the region server
-   * @return if this is this rs's znode
-   */
-  boolean isThisOurRegionServer(String regionserver);
-
-  /**
-   * Add a peer to hfile reference queue if peer does not exist.
-   * @param peerId peer cluster id to be added
-   * @throws ReplicationException if fails to add a peer id to hfile reference queue
-   */
-  void addPeerToHFileRefs(String peerId) throws ReplicationException;
-
-  /**
-   * Remove a peer from hfile reference queue.
-   * @param peerId peer cluster id to be removed
-   */
-  void removePeerFromHFileRefs(String peerId);
-
-  /**
-   * Add new hfile references to the queue.
-   * @param peerId peer cluster id to which the hfiles need to be replicated
-   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
-   *          will be added in the queue }
-   * @throws ReplicationException if fails to add a hfile reference
-   */
-  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
-
-  /**
-   * Remove hfile references from the queue.
-   * @param peerId peer cluster id from which this hfile references needs to be removed
-   * @param files list of hfile references to be removed
-   */
-  void removeHFileRefs(String peerId, List<String> files);
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
deleted file mode 100644
index 12fc6a1..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-
-/**
- * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various
- * ReplicationQueues Implementations with different constructor arguments by reflection.
- */
-@InterfaceAudience.Private
-public class ReplicationQueuesArguments {
-
-  private ZooKeeperWatcher zk;
-  private Configuration conf;
-  private Abortable abort;
-
-  public ReplicationQueuesArguments(Configuration conf, Abortable abort) {
-    this.conf = conf;
-    this.abort = abort;
-  }
-
-  public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) {
-    this(conf, abort);
-    setZk(zk);
-  }
-
-  public ZooKeeperWatcher getZk() {
-    return zk;
-  }
-
-  public void setZk(ZooKeeperWatcher zk) {
-    this.zk = zk;
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  public Abortable getAbortable() {
-    return abort;
-  }
-
-  public void setAbortable(Abortable abort) {
-    this.abort = abort;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
deleted file mode 100644
index 6d8900e..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * This provides an interface for clients of replication to view replication queues. These queues
- * keep track of the sources(WALs/HFile references) that still need to be replicated to remote
- * clusters.
- */
-@InterfaceAudience.Private
-public interface ReplicationQueuesClient {
-
-  /**
-   * Initialize the replication queue client interface.
-   */
-  public void init() throws ReplicationException;
-
-  /**
-   * Get a list of all region servers that have outstanding replication queues. These servers could
-   * be alive, dead or from a previous run of the cluster.
-   * @return a list of server names
-   * @throws KeeperException zookeeper exception
-   */
-  List<String> getListOfReplicators() throws KeeperException;
-
-  /**
-   * Get a list of all WALs in the given queue on the given region server.
-   * @param serverName the server name of the region server that owns the queue
-   * @param queueId a String that identifies the queue
-   * @return a list of WALs, null if this region server is dead and has no outstanding queues
-   * @throws KeeperException zookeeper exception
-   */
-  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
-
-  /**
-   * Get a list of all queues for the specified region server.
-   * @param serverName the server name of the region server that owns the set of queues
-   * @return a list of queueIds, null if this region server is not a replicator.
-   */
-  List<String> getAllQueues(String serverName) throws KeeperException;
-
-  /**
-   * Load all wals in all replication queues from ZK. This method guarantees to return a
-   * snapshot which contains all WALs in the zookeeper at the start of this call even there
-   * is concurrent queue failover. However, some newly created WALs during the call may
-   * not be included.
-   */
-   Set<String> getAllWALs() throws KeeperException;
-
-  /**
-   * Get the change version number of replication hfile references node. This can be used as
-   * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
-   * @return change version number of hfile references node
-   */
-  int getHFileRefsNodeChangeVersion() throws KeeperException;
-
-  /**
-   * Get list of all peers from hfile reference queue.
-   * @return a list of peer ids
-   * @throws KeeperException zookeeper exception
-   */
-  List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;
-
-  /**
-   * Get a list of all hfile references in the given peer.
-   * @param peerId a String that identifies the peer
-   * @return a list of hfile references, null if not found any
-   * @throws KeeperException zookeeper exception
-   */
-  List<String> getReplicableHFiles(String peerId) throws KeeperException;
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
deleted file mode 100644
index 834f831..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-
-/**
- * Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct
- * various ReplicationQueuesClient Implementations with different constructor arguments by
- * reflection.
- */
-@InterfaceAudience.Private
-public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments {
-  public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,
-     ZooKeeperWatcher zk) {
-    super(conf, abort, zk);
-  }
-  public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) {
-    super(conf, abort);
-  }
-}