You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/12/28 00:54:30 UTC

[26/26] hbase git commit: HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly

HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/264890a9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/264890a9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/264890a9

Branch: refs/heads/HBASE-19397
Commit: 264890a91d83b30fcbda91cc0350968972ede288
Parents: 28b93f4
Author: zhangduo <zh...@apache.org>
Authored: Wed Dec 27 22:03:51 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Dec 28 08:52:14 2017 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationFactory.java   |   9 +-
 .../hbase/replication/ReplicationQueues.java    | 160 -------
 .../replication/ReplicationQueuesArguments.java |  70 ---
 .../replication/ReplicationQueuesZKImpl.java    | 407 -----------------
 .../hbase/replication/ReplicationTableBase.java | 442 -------------------
 .../replication/ReplicationTrackerZKImpl.java   |  21 +-
 .../replication/ZKReplicationQueueStorage.java  |  22 +
 .../replication/TestReplicationStateBasic.java  | 131 +++---
 .../replication/TestReplicationStateZKImpl.java |  41 +-
 .../regionserver/DumpReplicationQueues.java     |  15 +-
 .../RecoveredReplicationSource.java             |  17 +-
 .../RecoveredReplicationSourceShipper.java      |  22 +-
 .../replication/regionserver/Replication.java   |  38 +-
 .../regionserver/ReplicationSource.java         |  22 +-
 .../ReplicationSourceInterface.java             |  11 +-
 .../regionserver/ReplicationSourceManager.java  | 260 ++++++-----
 .../regionserver/ReplicationSyncUp.java         |  29 +-
 .../hbase/master/cleaner/TestLogsCleaner.java   |  12 +-
 .../cleaner/TestReplicationHFileCleaner.java    |  26 +-
 .../cleaner/TestReplicationZKNodeCleaner.java   |  22 +-
 .../replication/ReplicationSourceDummy.java     |   6 +-
 .../replication/TestReplicationSyncUpTool.java  |   6 +-
 .../TestReplicationSourceManager.java           |  98 ++--
 .../TestReplicationSourceManagerZkImpl.java     |  58 +--
 24 files changed, 381 insertions(+), 1564 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 6c1c213..5e70e57 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -17,12 +17,11 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A factory class for instantiating replication objects that deal with replication state.
@@ -30,12 +29,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 @InterfaceAudience.Private
 public class ReplicationFactory {
 
-  public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
-      throws Exception {
-    return (ReplicationQueues) ConstructorUtils.invokeConstructor(ReplicationQueuesZKImpl.class,
-      args);
-  }
-
   public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
       Abortable abortable) {
     return getReplicationPeers(zk, conf, null, abortable);

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
deleted file mode 100644
index 7f440b1..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.SortedSet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Pair;
-
-/**
- * This provides an interface for maintaining a region server's replication queues. These queues
- * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled)
- * that still need to be replicated to remote clusters.
- */
-@InterfaceAudience.Private
-public interface ReplicationQueues {
-
-  /**
-   * Initialize the region server replication queue interface.
-   * @param serverName The server name of the region server that owns the replication queues this
-   *          interface manages.
-   */
-  void init(String serverName) throws ReplicationException;
-
-  /**
-   * Remove a replication queue.
-   * @param queueId a String that identifies the queue.
-   */
-  void removeQueue(String queueId);
-
-  /**
-   * Add a new WAL file to the given queue. If the queue does not exist it is created.
-   * @param queueId a String that identifies the queue.
-   * @param filename name of the WAL
-   */
-  void addLog(String queueId, String filename) throws ReplicationException;
-
-  /**
-   * Remove an WAL file from the given queue.
-   * @param queueId a String that identifies the queue.
-   * @param filename name of the WAL
-   */
-  void removeLog(String queueId, String filename);
-
-  /**
-   * Set the current position for a specific WAL in a given queue.
-   * @param queueId a String that identifies the queue
-   * @param filename name of the WAL
-   * @param position the current position in the file
-   */
-  void setLogPosition(String queueId, String filename, long position);
-
-  /**
-   * Get the current position for a specific WAL in a given queue.
-   * @param queueId a String that identifies the queue
-   * @param filename name of the WAL
-   * @return the current position in the file
-   */
-  long getLogPosition(String queueId, String filename) throws ReplicationException;
-
-  /**
-   * Remove all replication queues for this region server.
-   */
-  void removeAllQueues();
-
-  /**
-   * Get a list of all WALs in the given queue.
-   * @param queueId a String that identifies the queue
-   * @return a list of WALs, null if no such queue exists for this server
-   */
-  List<String> getLogsInQueue(String queueId);
-
-  /**
-   * Get a list of all queues for this region server.
-   * @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues
-   */
-  List<String> getAllQueues();
-
-  /**
-   * Get queueIds from a dead region server, whose queues has not been claimed by other region
-   * servers.
-   * @return empty if the queue exists but no children, null if the queue does not exist.
-  */
-  List<String> getUnClaimedQueueIds(String regionserver);
-
-  /**
-   * Take ownership for the queue identified by queueId and belongs to a dead region server.
-   * @param regionserver the id of the dead region server
-   * @param queueId the id of the queue
-   * @return the new PeerId and A SortedSet of WALs in its queue, and null if no unclaimed queue.
-   */
-  Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
-
-  /**
-   * Remove the znode of region server if the queue is empty.
-   * @param regionserver
-   */
-  void removeReplicatorIfQueueIsEmpty(String regionserver);
-
-  /**
-   * Get a list of all region servers that have outstanding replication queues. These servers could
-   * be alive, dead or from a previous run of the cluster.
-   * @return a list of server names
-   */
-  List<String> getListOfReplicators();
-
-  /**
-   * Checks if the provided znode is the same as this region server's
-   * @param regionserver the id of the region server
-   * @return if this is this rs's znode
-   */
-  boolean isThisOurRegionServer(String regionserver);
-
-  /**
-   * Add a peer to hfile reference queue if peer does not exist.
-   * @param peerId peer cluster id to be added
-   * @throws ReplicationException if fails to add a peer id to hfile reference queue
-   */
-  void addPeerToHFileRefs(String peerId) throws ReplicationException;
-
-  /**
-   * Remove a peer from hfile reference queue.
-   * @param peerId peer cluster id to be removed
-   */
-  void removePeerFromHFileRefs(String peerId);
-
-  /**
-   * Add new hfile references to the queue.
-   * @param peerId peer cluster id to which the hfiles need to be replicated
-   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
-   *          will be added in the queue }
-   * @throws ReplicationException if fails to add a hfile reference
-   */
-  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
-
-  /**
-   * Remove hfile references from the queue.
-   * @param peerId peer cluster id from which this hfile references needs to be removed
-   * @param files list of hfile references to be removed
-   */
-  void removeHFileRefs(String peerId, List<String> files);
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
deleted file mode 100644
index c2a5df3..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various
- * ReplicationQueues Implementations with different constructor arguments by reflection.
- */
-@InterfaceAudience.Private
-public class ReplicationQueuesArguments {
-
-  private ZKWatcher zk;
-  private Configuration conf;
-  private Abortable abort;
-
-  public ReplicationQueuesArguments(Configuration conf, Abortable abort) {
-    this.conf = conf;
-    this.abort = abort;
-  }
-
-  public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZKWatcher zk) {
-    this(conf, abort);
-    setZk(zk);
-  }
-
-  public ZKWatcher getZk() {
-    return zk;
-  }
-
-  public void setZk(ZKWatcher zk) {
-    this.zk = zk;
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  public Abortable getAbortable() {
-    return abort;
-  }
-
-  public void setAbortable(Abortable abort) {
-    this.abort = abort;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
deleted file mode 100644
index 7551cb7..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides an implementation of the
- * interface using ZooKeeper. The
- * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
- * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
- * the regionserver name (a concatenation of the region server’s hostname, client port and start
- * code). For example:
- *
- * /hbase/replication/rs/hostname.example.org,6020,1234
- *
- * Within this znode, the region server maintains a set of WAL replication queues. These queues are
- * represented by child znodes named using there give queue id. For example:
- *
- * /hbase/replication/rs/hostname.example.org,6020,1234/1
- * /hbase/replication/rs/hostname.example.org,6020,1234/2
- *
- * Each queue has one child znode for every WAL that still needs to be replicated. The value of
- * these WAL child znodes is the latest position that has been replicated. This position is updated
- * every time a WAL entry is replicated. For example:
- *
- * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
- */
-@InterfaceAudience.Private
-public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
-
-  /** Znode containing all replication queues for this region server. */
-  private String myQueuesZnode;
-
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);
-
-  public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
-    this(args.getZk(), args.getConf(), args.getAbortable());
-  }
-
-  public ReplicationQueuesZKImpl(final ZKWatcher zk, Configuration conf,
-                                 Abortable abortable) {
-    super(zk, conf, abortable);
-  }
-
-  @Override
-  public void init(String serverName) throws ReplicationException {
-    this.myQueuesZnode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
-    try {
-      if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
-        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not initialize replication queues.", e);
-    }
-    if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
-      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
-      try {
-        if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
-          ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
-        }
-      } catch (KeeperException e) {
-        throw new ReplicationException("Could not initialize hfile references replication queue.",
-            e);
-      }
-    }
-  }
-
-  @Override
-  public void removeQueue(String queueId) {
-    try {
-      ZKUtil.deleteNodeRecursively(this.zookeeper,
-        ZNodePaths.joinZNode(this.myQueuesZnode, queueId));
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
-    }
-  }
-
-  @Override
-  public void addLog(String queueId, String filename) throws ReplicationException {
-    String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
-    znode = ZNodePaths.joinZNode(znode, filename);
-    try {
-      ZKUtil.createWithParents(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      throw new ReplicationException(
-          "Could not add log because znode could not be created. queueId=" + queueId
-              + ", filename=" + filename);
-    }
-  }
-
-  @Override
-  public void removeLog(String queueId, String filename) {
-    try {
-      String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
-      znode = ZNodePaths.joinZNode(znode, filename);
-      ZKUtil.deleteNode(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
-          + filename + ")", e);
-    }
-  }
-
-  @Override
-  public void setLogPosition(String queueId, String filename, long position) {
-    try {
-      String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
-      znode = ZNodePaths.joinZNode(znode, filename);
-      // Why serialize String of Long and not Long as bytes?
-      ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to write replication wal position (filename=" + filename
-          + ", position=" + position + ")", e);
-    }
-  }
-
-  @Override
-  public long getLogPosition(String queueId, String filename) throws ReplicationException {
-    String clusterZnode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
-    String znode = ZNodePaths.joinZNode(clusterZnode, filename);
-    byte[] bytes = null;
-    try {
-      bytes = ZKUtil.getData(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Internal Error: could not get position in log for queueId="
-          + queueId + ", filename=" + filename, e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return 0;
-    }
-    try {
-      return ZKUtil.parseWALPositionFrom(bytes);
-    } catch (DeserializationException de) {
-      LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
-          + " znode content, continuing.");
-    }
-    // if we can not parse the position, start at the beginning of the wal file
-    // again
-    return 0;
-  }
-
-  @Override
-  public boolean isThisOurRegionServer(String regionserver) {
-    return ZNodePaths.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
-  }
-
-  @Override
-  public List<String> getUnClaimedQueueIds(String regionserver) {
-    if (isThisOurRegionServer(regionserver)) {
-      return null;
-    }
-    String rsZnodePath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
-    List<String> queues = null;
-    try {
-      queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
-    }
-    return queues;
-  }
-
-  @Override
-  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
-    LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
-    return moveQueueUsingMulti(regionserver, queueId);
-  }
-
-  @Override
-  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
-    String rsPath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
-    try {
-      List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
-      if (list != null && list.isEmpty()){
-        ZKUtil.deleteNode(this.zookeeper, rsPath);
-      }
-    } catch (KeeperException e) {
-      LOG.warn("Got error while removing replicator", e);
-    }
-  }
-
-  @Override
-  public void removeAllQueues() {
-    try {
-      ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
-    } catch (KeeperException e) {
-      // if the znode is already expired, don't bother going further
-      if (e instanceof KeeperException.SessionExpiredException) {
-        return;
-      }
-      this.abortable.abort("Failed to delete replication queues for region server: "
-          + this.myQueuesZnode, e);
-    }
-  }
-
-  @Override
-  public List<String> getLogsInQueue(String queueId) {
-    String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
-    }
-    return result;
-  }
-
-  @Override
-  public List<String> getAllQueues() {
-    List<String> listOfQueues = null;
-    try {
-      listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get a list of queues for region server: "
-          + this.myQueuesZnode, e);
-    }
-    return listOfQueues == null ? new ArrayList<>() : listOfQueues;
-  }
-
-  /**
-   * It "atomically" copies one peer's wals queue from another dead region server and returns them
-   * all sorted. The new peer id is equal to the old peer id appended with the dead server's znode.
-   * @param znode pertaining to the region server to copy the queues from
-   * @peerId peerId pertaining to the queue need to be copied
-   */
-  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
-    try {
-      // hbase/replication/rs/deadrs
-      String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode);
-      List<ZKUtilOp> listOfOps = new ArrayList<>();
-      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
-
-      String newPeerId = peerId + "-" + znode;
-      String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId);
-      // check the logs queue for the old peer cluster
-      String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId);
-      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
-
-      if (!peerExists(replicationQueueInfo.getPeerId())) {
-        LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
-                " didn't exist, will move its queue to avoid the failure of multi op");
-        for (String wal : wals) {
-          String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
-          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
-        }
-        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-        ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-        return null;
-      }
-
-      SortedSet<String> logQueue = new TreeSet<>();
-      if (wals == null || wals.isEmpty()) {
-        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-      } else {
-        // create the new cluster znode
-        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
-        listOfOps.add(op);
-        // get the offset of the logs and set it to new znodes
-        for (String wal : wals) {
-          String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
-          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
-          LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
-          String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal);
-          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
-          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
-          logQueue.add(wal);
-        }
-        // add delete op for peer
-        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-
-        if (LOG.isTraceEnabled())
-          LOG.trace(" The multi list size is: " + listOfOps.size());
-      }
-      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-
-      LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
-      return new Pair<>(newPeerId, logQueue);
-    } catch (KeeperException e) {
-      // Multi call failed; it looks like some other regionserver took away the logs.
-      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
-    } catch (InterruptedException e) {
-      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
-      Thread.currentThread().interrupt();
-    }
-    return null;
-  }
-
-  @Override
-  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
-    boolean debugEnabled = LOG.isDebugEnabled();
-    if (debugEnabled) {
-      LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
-    }
-
-    int size = pairs.size();
-    List<ZKUtilOp> listOfOps = new ArrayList<>(size);
-
-    for (int i = 0; i < size; i++) {
-      listOfOps.add(ZKUtilOp.createAndFailSilent(
-        ZNodePaths.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
-        HConstants.EMPTY_BYTE_ARRAY));
-    }
-    if (debugEnabled) {
-      LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
-          + " is " + listOfOps.size());
-    }
-    try {
-      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
-    }
-  }
-
-  @Override
-  public void removeHFileRefs(String peerId, List<String> files) {
-    String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
-    boolean debugEnabled = LOG.isDebugEnabled();
-    if (debugEnabled) {
-      LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
-    }
-
-    int size = files.size();
-    List<ZKUtilOp> listOfOps = new ArrayList<>(size);
-
-    for (int i = 0; i < size; i++) {
-      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZNodePaths.joinZNode(peerZnode, files.get(i))));
-    }
-    if (debugEnabled) {
-      LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
-          + " is " + listOfOps.size());
-    }
-    try {
-      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
-    } catch (KeeperException e) {
-      LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
-    }
-  }
-
-  @Override
-  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
-    String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
-    try {
-      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
-        LOG.info("Adding peer " + peerId + " to hfile reference queue.");
-        ZKUtil.createWithParents(this.zookeeper, peerZnode);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
-          e);
-    }
-  }
-
-  @Override
-  public void removePeerFromHFileRefs(String peerId) {
-    final String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
-    try {
-      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Peer " + peerZnode + " not found in hfile reference queue.");
-        }
-        return;
-      } else {
-        LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
-        ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
-      }
-    } catch (KeeperException e) {
-      LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
-        e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
deleted file mode 100644
index 86124bc..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/*
- * Abstract class that provides an interface to the Replication Table. Which is currently
- * being used for WAL offset tracking.
- * The basic schema of this table will store each individual queue as a
- * seperate row. The row key will be a unique identifier of the creating server's name and the
- * queueId. Each queue must have the following two columns:
- *  COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue
- *  COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's that have owned this
- *    queue. The most recent previous owner is the leftmost entry.
- * They will also have columns mapping [WAL filename : offset]
- * The most flexible method of interacting with the Replication Table is by calling
- * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up
- * to the caller to close the returned table.
- */
-@InterfaceAudience.Private
-abstract class ReplicationTableBase {
-
-  /** Name of the HBase Table used for tracking replication*/
-  public static final TableName REPLICATION_TABLE_NAME =
-    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
-
-  // Column family and column names for Queues in the Replication Table
-  public static final byte[] CF_QUEUE = Bytes.toBytes("q");
-  public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o");
-  public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h");
-
-  // Column Descriptor for the Replication Table
-  private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
-    new HColumnDescriptor(CF_QUEUE).setMaxVersions(1)
-      .setInMemory(true)
-      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
-        // TODO: Figure out which bloom filter to use
-      .setBloomFilterType(BloomType.NONE);
-
-  // The value used to delimit the queueId and server name inside of a queue's row key. Currently a
-  // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
-  // See HBASE-11394.
-  public static final String ROW_KEY_DELIMITER = "-";
-
-  // The value used to delimit server names in the queue history list
-  public static final String QUEUE_HISTORY_DELIMITER = "|";
-
-  /*
-  * Make sure that HBase table operations for replication have a high number of retries. This is
-  * because the server is aborted if any HBase table operation fails. Each RPC will be attempted
-  * 3600 times before exiting. This provides each operation with 2 hours of retries
-  * before the server is aborted.
-  */
-  private static final int CLIENT_RETRIES = 3600;
-  private static final int RPC_TIMEOUT = 2000;
-  private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
-
-  // We only need a single thread to initialize the Replication Table
-  private static final int NUM_INITIALIZE_WORKERS = 1;
-
-  protected final Configuration conf;
-  protected final Abortable abortable;
-  private final Connection connection;
-  private final Executor executor;
-  private volatile CountDownLatch replicationTableInitialized;
-
-  public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
-    this.conf = new Configuration(conf);
-    this.abortable = abort;
-    decorateConf();
-    this.connection = ConnectionFactory.createConnection(this.conf);
-    this.executor = setUpExecutor();
-    this.replicationTableInitialized = new CountDownLatch(1);
-    createReplicationTableInBackground();
-  }
-
-  /**
-   * Modify the connection's config so that operations run on the Replication Table have longer and
-   * a larger number of retries
-   */
-  private void decorateConf() {
-    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
-  }
-
-  /**
-   * Sets up the thread pool executor used to build the Replication Table in the background
-   * @return the configured executor
-   */
-  private Executor setUpExecutor() {
-    ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS,
-        NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
-    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-    tfb.setNameFormat("ReplicationTableExecutor-%d");
-    tfb.setDaemon(true);
-    tempExecutor.setThreadFactory(tfb.build());
-    return tempExecutor;
-  }
-
-  /**
-   * Get whether the Replication Table has been successfully initialized yet
-   * @return whether the Replication Table is initialized
-   */
-  public boolean getInitializationStatus() {
-    return replicationTableInitialized.getCount() == 0;
-  }
-
-  /**
-   * Increases the RPC and operations timeouts for the Replication Table
-   */
-  private Table setReplicationTableTimeOuts(Table replicationTable) {
-    replicationTable.setRpcTimeout(RPC_TIMEOUT);
-    replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
-    return replicationTable;
-  }
-
-  /**
-   * Build the row key for the given queueId. This will uniquely identify it from all other queues
-   * in the cluster.
-   * @param serverName The owner of the queue
-   * @param queueId String identifier of the queue
-   * @return String representation of the queue's row key
-   */
-  protected String buildQueueRowKey(String serverName, String queueId) {
-    return queueId + ROW_KEY_DELIMITER + serverName;
-  }
-
-  /**
-   * Parse the original queueId from a row key
-   * @param rowKey String representation of a queue's row key
-   * @return the original queueId
-   */
-  protected String getRawQueueIdFromRowKey(String rowKey) {
-    return rowKey.split(ROW_KEY_DELIMITER)[0];
-  }
-
-  /**
-   * Returns a queue's row key given either its raw or reclaimed queueId
-   *
-   * @param queueId queueId of the queue
-   * @return byte representation of the queue's row key
-   */
-  protected byte[] queueIdToRowKey(String serverName, String queueId) {
-    // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
-    // then this is not a reclaimed queue.
-    if (!queueId.contains(ROW_KEY_DELIMITER)) {
-      return Bytes.toBytes(buildQueueRowKey(serverName, queueId));
-      // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the
-      // queue's row key
-    } else {
-      return Bytes.toBytes(queueId);
-    }
-  }
-
-  /**
-   * Creates a "|" delimited record of the queue's past region server owners.
-   *
-   * @param originalHistory the queue's original owner history
-   * @param oldServer the name of the server that used to own the queue
-   * @return the queue's new owner history
-   */
-  protected String buildClaimedQueueHistory(String originalHistory, String oldServer) {
-    return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory;
-  }
-
-  /**
-   * Get a list of all region servers that have outstanding replication queues. These servers could
-   * be alive, dead or from a previous run of the cluster.
-   * @return a list of server names
-   */
-  protected List<String> getListOfReplicators() {
-    // scan all of the queues and return a list of all unique OWNER values
-    Set<String> peerServers = new HashSet<>();
-    ResultScanner allQueuesInCluster = null;
-    try (Table replicationTable = getOrBlockOnReplicationTable()){
-      Scan scan = new Scan();
-      scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
-      allQueuesInCluster = replicationTable.getScanner(scan);
-      for (Result queue : allQueuesInCluster) {
-        peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER)));
-      }
-    } catch (IOException e) {
-      String errMsg = "Failed getting list of replicators";
-      abortable.abort(errMsg, e);
-    } finally {
-      if (allQueuesInCluster != null) {
-        allQueuesInCluster.close();
-      }
-    }
-    return new ArrayList<>(peerServers);
-  }
-
-  protected List<String> getAllQueues(String serverName) {
-    List<String> allQueues = new ArrayList<>();
-    ResultScanner queueScanner = null;
-    try {
-      queueScanner = getQueuesBelongingToServer(serverName);
-      for (Result queue : queueScanner) {
-        String rowKey =  Bytes.toString(queue.getRow());
-        // If the queue does not have a Owner History, then we must be its original owner. So we
-        // want to return its queueId in raw form
-        if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) {
-          allQueues.add(getRawQueueIdFromRowKey(rowKey));
-        } else {
-          allQueues.add(rowKey);
-        }
-      }
-      return allQueues;
-    } catch (IOException e) {
-      String errMsg = "Failed getting list of all replication queues for serverName=" + serverName;
-      abortable.abort(errMsg, e);
-      return null;
-    } finally {
-      if (queueScanner != null) {
-        queueScanner.close();
-      }
-    }
-  }
-
-  protected List<String> getLogsInQueue(String serverName, String queueId) {
-    String rowKey = queueId;
-    if (!queueId.contains(ROW_KEY_DELIMITER)) {
-      rowKey = buildQueueRowKey(serverName, queueId);
-    }
-    return getLogsInQueue(Bytes.toBytes(rowKey));
-  }
-
-  protected List<String> getLogsInQueue(byte[] rowKey) {
-    String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      Get getQueue = new Get(rowKey);
-      Result queue = replicationTable.get(getQueue);
-      if (queue == null || queue.isEmpty()) {
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return null;
-      }
-      return readWALsFromResult(queue);
-    } catch (IOException e) {
-      abortable.abort(errMsg, e);
-      return null;
-    }
-  }
-
-  /**
-   * Read all of the WAL's from a queue into a list
-   *
-   * @param queue HBase query result containing the queue
-   * @return a list of all the WAL filenames
-   */
-  protected List<String> readWALsFromResult(Result queue) {
-    List<String> wals = new ArrayList<>();
-    Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
-    for (byte[] cQualifier : familyMap.keySet()) {
-      // Ignore the meta data fields of the queue
-      if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
-          COL_QUEUE_OWNER_HISTORY)) {
-        continue;
-      }
-      wals.add(Bytes.toString(cQualifier));
-    }
-    return wals;
-  }
-
-  /**
-   * Get the queue id's and meta data (Owner and History) for the queues belonging to the named
-   * server
-   *
-   * @param server name of the server
-   * @return a ResultScanner over the QueueIds belonging to the server
-   * @throws IOException
-   */
-  protected ResultScanner getQueuesBelongingToServer(String server) throws IOException {
-    Scan scan = new Scan();
-    SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
-    CompareOperator.EQUAL, Bytes.toBytes(server));
-    scan.setFilter(filterMyQueues);
-    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
-    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      ResultScanner results = replicationTable.getScanner(scan);
-      return results;
-    }
-  }
-
-  /**
-   * Attempts to acquire the Replication Table. This operation will block until it is assigned by
-   * the CreateReplicationWorker thread. It is up to the caller of this method to close the
-   * returned Table
-   * @return the Replication Table when it is created
-   * @throws IOException
-   */
-  protected Table getOrBlockOnReplicationTable() throws IOException {
-    // Sleep until the Replication Table becomes available
-    try {
-      replicationTableInitialized.await();
-    } catch (InterruptedException e) {
-      String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " +
-          e.getMessage();
-      throw new InterruptedIOException(errMsg);
-    }
-    return getAndSetUpReplicationTable();
-  }
-
-  /**
-   * Creates a new copy of the Replication Table and sets up the proper Table time outs for it
-   *
-   * @return the Replication Table
-   * @throws IOException
-   */
-  private Table getAndSetUpReplicationTable() throws IOException {
-    Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME);
-    setReplicationTableTimeOuts(replicationTable);
-    return replicationTable;
-  }
-
-  /**
-   * Builds the Replication Table in a background thread. Any method accessing the Replication Table
-   * should do so through getOrBlockOnReplicationTable()
-   *
-   * @return the Replication Table
-   * @throws IOException if the Replication Table takes too long to build
-   */
-  private void createReplicationTableInBackground() throws IOException {
-    executor.execute(new CreateReplicationTableWorker());
-  }
-
-  /**
-   * Attempts to build the Replication Table. Will continue blocking until we have a valid
-   * Table for the Replication Table.
-   */
-  private class CreateReplicationTableWorker implements Runnable {
-
-    private Admin admin;
-
-    @Override
-    public void run() {
-      try {
-        admin = connection.getAdmin();
-        if (!replicationTableExists()) {
-          createReplicationTable();
-        }
-        int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number",
-            CLIENT_RETRIES);
-        RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT);
-        RetryCounter retryCounter = counterFactory.create();
-        while (!replicationTableExists()) {
-          retryCounter.sleepUntilNextRetry();
-          if (!retryCounter.shouldRetry()) {
-            throw new IOException("Unable to acquire the Replication Table");
-          }
-        }
-        replicationTableInitialized.countDown();
-      } catch (IOException | InterruptedException e) {
-        abortable.abort("Failed building Replication Table", e);
-      }
-    }
-
-    /**
-     * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
-     * in TableBasedReplicationQueuesImpl
-     *
-     * @throws IOException
-     */
-    private void createReplicationTable() throws IOException {
-      HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
-      replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
-      try {
-        admin.createTable(replicationTableDescriptor);
-      } catch (TableExistsException e) {
-        // In this case we can just continue as normal
-      }
-    }
-
-    /**
-     * Checks whether the Replication Table exists yet
-     *
-     * @return whether the Replication Table exists
-     * @throws IOException
-     */
-    private boolean replicationTableExists() {
-      try {
-        return admin.tableExists(REPLICATION_TABLE_NAME);
-      } catch (IOException e) {
-        return false;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
index 2c522f6..5659e4b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -54,6 +53,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
     super(zookeeper, conf, abortable);
     this.stopper = stopper;
     this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
+    // watch the changes
+    refreshOtherRegionServersList(true);
   }
 
   @Override
@@ -71,7 +72,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
    */
   @Override
   public List<String> getListOfRegionServers() {
-    refreshOtherRegionServersList();
+    refreshOtherRegionServersList(false);
 
     List<String> list = null;
     synchronized (otherRegionServers) {
@@ -137,7 +138,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
       if (!path.startsWith(this.watcher.znodePaths.rsZNode)) {
         return false;
       }
-      return refreshOtherRegionServersList();
+      return refreshOtherRegionServersList(true);
     }
   }
 
@@ -157,8 +158,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
    * @return true if the local list of the other region servers was updated with the ZK data (even
    *         if it was empty), false if the data was missing in ZK
    */
-  private boolean refreshOtherRegionServersList() {
-    List<String> newRsList = getRegisteredRegionServers();
+  private boolean refreshOtherRegionServersList(boolean watch) {
+    List<String> newRsList = getRegisteredRegionServers(watch);
     if (newRsList == null) {
       return false;
     } else {
@@ -174,10 +175,14 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
    * Get a list of all the other region servers in this cluster and set a watch
    * @return a list of server nanes
    */
-  private List<String> getRegisteredRegionServers() {
+  private List<String> getRegisteredRegionServers(boolean watch) {
     List<String> result = null;
     try {
-      result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
+      if (watch) {
+        result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
+      } else {
+        result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
+      }
     } catch (KeeperException e) {
       this.abortable.abort("Get list of registered region servers", e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 0275d52..41f50d8 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -54,6 +54,28 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
 
 /**
  * ZK based replication queue storage.
+ * <p>
+ * The base znode for each regionserver is the regionserver name. For example:
+ *
+ * <pre>
+ * /hbase/replication/rs/hostname.example.org,6020,1234
+ * </pre>
+ *
+ * Within this znode, the region server maintains a set of WAL replication queues. These queues are
+ * represented by child znodes named using there give queue id. For example:
+ *
+ * <pre>
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1
+ * /hbase/replication/rs/hostname.example.org,6020,1234/2
+ * </pre>
+ *
+ * Each queue has one child znode for every WAL that still needs to be replicated. The value of
+ * these WAL child znodes is the latest position that has been replicated. This position is updated
+ * every time a WAL entry is replicated. For example:
+ *
+ * <pre>
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
+ * </pre>
  */
 @InterfaceAudience.Private
 class ZKReplicationQueueStorage extends ZKReplicationStorageBase

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 8905d43..4afda5d 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -42,9 +42,8 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class TestReplicationStateBasic {
 
-  protected ReplicationQueues rq1;
-  protected ReplicationQueues rq2;
-  protected ReplicationQueues rq3;
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
+
   protected ReplicationQueueStorage rqs;
   protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
   protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
@@ -63,8 +62,6 @@ public abstract class TestReplicationStateBasic {
   protected static final int ZK_MAX_COUNT = 300;
   protected static final int ZK_SLEEP_INTERVAL = 100; // millis
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
-
   @Test
   public void testReplicationQueueStorage() throws ReplicationException {
     // Test methods with empty state
@@ -76,15 +73,13 @@ public abstract class TestReplicationStateBasic {
      * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
      * server2: zero queues
      */
-    rq1.init(server1.getServerName());
-    rq2.init(server2.getServerName());
-    rq1.addLog("qId1", "trash");
-    rq1.removeLog("qId1", "trash");
-    rq1.addLog("qId2", "filename1");
-    rq1.addLog("qId3", "filename2");
-    rq1.addLog("qId3", "filename3");
-    rq2.addLog("trash", "trash");
-    rq2.removeQueue("trash");
+    rqs.addWAL(server1, "qId1", "trash");
+    rqs.removeWAL(server1, "qId1", "trash");
+    rqs.addWAL(server1,"qId2", "filename1");
+    rqs.addWAL(server1,"qId3", "filename2");
+    rqs.addWAL(server1,"qId3", "filename3");
+    rqs.addWAL(server2,"trash", "trash");
+    rqs.removeQueue(server2,"trash");
 
     List<ServerName> reps = rqs.getListOfReplicators();
     assertEquals(2, reps.size());
@@ -105,62 +100,55 @@ public abstract class TestReplicationStateBasic {
     assertTrue(list.contains("qId3"));
   }
 
+  private void removeAllQueues(ServerName serverName) throws ReplicationException {
+    for (String queue: rqs.getAllQueues(serverName)) {
+      rqs.removeQueue(serverName, queue);
+    }
+  }
   @Test
   public void testReplicationQueues() throws ReplicationException {
-    rq1.init(server1.getServerName());
-    rq2.init(server2.getServerName());
-    rq3.init(server3.getServerName());
     // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
     rp.init();
 
-    // 3 replicators should exist
-    assertEquals(3, rq1.getListOfReplicators().size());
-    rq1.removeQueue("bogus");
-    rq1.removeLog("bogus", "bogus");
-    rq1.removeAllQueues();
-    assertEquals(0, rq1.getAllQueues().size());
-    assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
-    assertNull(rq1.getLogsInQueue("bogus"));
-    assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString()));
-
-    rq1.setLogPosition("bogus", "bogus", 5L);
+    rqs.removeQueue(server1, "bogus");
+    rqs.removeWAL(server1, "bogus", "bogus");
+    removeAllQueues(server1);
+    assertEquals(0, rqs.getAllQueues(server1).size());
+    assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
+    assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
+    assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());
 
     populateQueues();
 
-    assertEquals(3, rq1.getListOfReplicators().size());
-    assertEquals(0, rq2.getLogsInQueue("qId1").size());
-    assertEquals(5, rq3.getLogsInQueue("qId5").size());
-    assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
-    rq3.setLogPosition("qId5", "filename4", 354L);
-    assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
+    assertEquals(3, rqs.getListOfReplicators().size());
+    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
+    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
+    assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
+    rqs.setWALPosition(server3, "qId5", "filename4", 354L);
+    assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
 
-    assertEquals(5, rq3.getLogsInQueue("qId5").size());
-    assertEquals(0, rq2.getLogsInQueue("qId1").size());
-    assertEquals(0, rq1.getAllQueues().size());
-    assertEquals(1, rq2.getAllQueues().size());
-    assertEquals(5, rq3.getAllQueues().size());
+    assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
+    assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
+    assertEquals(0, rqs.getAllQueues(server1).size());
+    assertEquals(1, rqs.getAllQueues(server2).size());
+    assertEquals(5, rqs.getAllQueues(server3).size());
 
-    assertEquals(0, rq3.getUnClaimedQueueIds(server1.getServerName()).size());
-    rq3.removeReplicatorIfQueueIsEmpty(server1.getServerName());
-    assertEquals(2, rq3.getListOfReplicators().size());
+    assertEquals(0, rqs.getAllQueues(server1).size());
+    rqs.removeReplicatorIfQueueIsEmpty(server1);
+    assertEquals(2, rqs.getListOfReplicators().size());
 
-    List<String> queues = rq2.getUnClaimedQueueIds(server3.getServerName());
+    List<String> queues = rqs.getAllQueues(server3);
     assertEquals(5, queues.size());
     for (String queue : queues) {
-      rq2.claimQueue(server3.getServerName(), queue);
+      rqs.claimQueue(server3, queue, server2);
     }
-    rq2.removeReplicatorIfQueueIsEmpty(server3.getServerName());
-    assertEquals(1, rq2.getListOfReplicators().size());
-
-    // Try to claim our own queues
-    assertNull(rq2.getUnClaimedQueueIds(server2.getServerName()));
-    rq2.removeReplicatorIfQueueIsEmpty(server2.getServerName());
-
-    assertEquals(6, rq2.getAllQueues().size());
+    rqs.removeReplicatorIfQueueIsEmpty(server3);
+    assertEquals(1, rqs.getListOfReplicators().size());
 
-    rq2.removeAllQueues();
-
-    assertEquals(0, rq2.getListOfReplicators().size());
+    assertEquals(6, rqs.getAllQueues(server2).size());
+    removeAllQueues(server2);
+    rqs.removeReplicatorIfQueueIsEmpty(server2);
+    assertEquals(0, rqs.getListOfReplicators().size());
   }
 
   @Test
@@ -197,7 +185,6 @@ public abstract class TestReplicationStateBasic {
   @Test
   public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
     rp.init();
-    rq1.init(server1.getServerName());
 
     List<Pair<Path, Path>> files1 = new ArrayList<>(3);
     files1.add(new Pair<>(null, new Path("file_1")));
@@ -206,8 +193,8 @@ public abstract class TestReplicationStateBasic {
     assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
     assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
     rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
-    rq1.addPeerToHFileRefs(ID_ONE);
-    rq1.addHFileRefs(ID_ONE, files1);
+    rqs.addPeerToHFileRefs(ID_ONE);
+    rqs.addHFileRefs(ID_ONE, files1);
     assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
     assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
     List<String> hfiles2 = new ArrayList<>(files1.size());
@@ -215,43 +202,41 @@ public abstract class TestReplicationStateBasic {
       hfiles2.add(p.getSecond().getName());
     }
     String removedString = hfiles2.remove(0);
-    rq1.removeHFileRefs(ID_ONE, hfiles2);
+    rqs.removeHFileRefs(ID_ONE, hfiles2);
     assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
     hfiles2 = new ArrayList<>(1);
     hfiles2.add(removedString);
-    rq1.removeHFileRefs(ID_ONE, hfiles2);
+    rqs.removeHFileRefs(ID_ONE, hfiles2);
     assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
     rp.unregisterPeer(ID_ONE);
   }
 
   @Test
   public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
-    rq1.init(server1.getServerName());
-
     rp.init();
     rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
-    rq1.addPeerToHFileRefs(ID_ONE);
+    rqs.addPeerToHFileRefs(ID_ONE);
     rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
-    rq1.addPeerToHFileRefs(ID_TWO);
+    rqs.addPeerToHFileRefs(ID_TWO);
 
     List<Pair<Path, Path>> files1 = new ArrayList<>(3);
     files1.add(new Pair<>(null, new Path("file_1")));
     files1.add(new Pair<>(null, new Path("file_2")));
     files1.add(new Pair<>(null, new Path("file_3")));
-    rq1.addHFileRefs(ID_ONE, files1);
-    rq1.addHFileRefs(ID_TWO, files1);
+    rqs.addHFileRefs(ID_ONE, files1);
+    rqs.addHFileRefs(ID_TWO, files1);
     assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
     assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
     assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
 
     rp.unregisterPeer(ID_ONE);
-    rq1.removePeerFromHFileRefs(ID_ONE);
+    rqs.removePeerFromHFileRefs(ID_ONE);
     assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
     assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
     assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
 
     rp.unregisterPeer(ID_TWO);
-    rq1.removePeerFromHFileRefs(ID_TWO);
+    rqs.removePeerFromHFileRefs(ID_TWO);
     assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
     assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
   }
@@ -363,15 +348,15 @@ public abstract class TestReplicationStateBasic {
    * 3, 4, 5 log files respectively
    */
   protected void populateQueues() throws ReplicationException {
-    rq1.addLog("trash", "trash");
-    rq1.removeQueue("trash");
+    rqs.addWAL(server1, "trash", "trash");
+    rqs.removeQueue(server1, "trash");
 
-    rq2.addLog("qId1", "trash");
-    rq2.removeLog("qId1", "trash");
+    rqs.addWAL(server2, "qId1", "trash");
+    rqs.removeWAL(server2, "qId1", "trash");
 
     for (int i = 1; i < 6; i++) {
       for (int j = 0; j < i; j++) {
-        rq3.addLog("qId" + i, "filename" + j);
+        rqs.addWAL(server3, "qId" + i, "filename" + j);
       }
       // Add peers for the corresponding queues so they are not orphans
       rp.registerPeer("qId" + i,

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 5fe7c55..ac869d9 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +36,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +49,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
   private static HBaseZKTestingUtility utility;
   private static ZKWatcher zkw;
   private static String replicationZNode;
-  private ReplicationQueuesZKImpl rqZK;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -84,23 +78,9 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
   @Before
   public void setUp() {
     zkTimeoutCount = 0;
-    WarnOnlyAbortable abortable = new WarnOnlyAbortable();
-    try {
-      rq1 = ReplicationFactory
-          .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
-      rq2 = ReplicationFactory
-          .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
-      rq3 = ReplicationFactory
-          .getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
-      rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-    } catch (Exception e) {
-      // This should not occur, because getReplicationQueues() only throws for
-      // TableBasedReplicationQueuesImpl
-      fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
-    }
-    rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
+    rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
+    rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable());
     OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
-    rqZK = new ReplicationQueuesZKImpl(zkw, conf, abortable);
   }
 
   @After
@@ -113,23 +93,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
     utility.shutdownMiniZKCluster();
   }
 
-  @Test
-  public void testIsPeerPath_PathToParentOfPeerNode() {
-    assertFalse(rqZK.isPeerPath(rqZK.peersZNode));
-  }
-
-  @Test
-  public void testIsPeerPath_PathToChildOfPeerNode() {
-    String peerChild = ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child");
-    assertFalse(rqZK.isPeerPath(peerChild));
-  }
-
-  @Test
-  public void testIsPeerPath_ActualPeerPath() {
-    String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1");
-    assertTrue(rqZK.isPeerPath(peerPath));
-  }
-
   private static class WarnOnlyAbortable implements Abortable {
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 1faaae3..ce03f1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -50,8 +50,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -308,14 +306,10 @@ public class DumpReplicationQueues extends Configured implements Tool {
       boolean hdfs) throws Exception {
     ReplicationQueueStorage queueStorage;
     ReplicationPeers replicationPeers;
-    ReplicationQueues replicationQueues;
     ReplicationTracker replicationTracker;
-    ReplicationQueuesArguments replicationArgs =
-        new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), zkw);
     StringBuilder sb = new StringBuilder();
 
     queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
-    replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs);
     replicationPeers =
         ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection);
     replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
@@ -329,7 +323,6 @@ public class DumpReplicationQueues extends Configured implements Tool {
     }
     for (ServerName regionserver : regionservers) {
       List<String> queueIds = queueStorage.getAllQueues(regionserver);
-      replicationQueues.init(regionserver.getServerName());
       if (!liveRegionServers.contains(regionserver.getServerName())) {
         deadRegionServers.add(regionserver.getServerName());
       }
@@ -339,17 +332,17 @@ public class DumpReplicationQueues extends Configured implements Tool {
         if (!peerIds.contains(queueInfo.getPeerId())) {
           deletedQueues.add(regionserver + "/" + queueId);
           sb.append(
-            formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
+            formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
         } else {
           sb.append(
-            formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
+            formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
         }
       }
     }
     return sb.toString();
   }
 
-  private String formatQueue(ServerName regionserver, ReplicationQueues replicationQueues,
+  private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
       ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
       boolean hdfs) throws Exception {
     StringBuilder sb = new StringBuilder();
@@ -371,7 +364,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
     peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
 
     for (String wal : wals) {
-      long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
+      long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
       sb.append("    Replication position for " + wal + ": " + (position > 0 ? position : "0"
           + " (not started or nothing to replicate)") + "\n");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index bd191e3..e0c45d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -29,15 +28,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class that handles the recovered source of a replication stream, which is transfered from
@@ -52,10 +51,10 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
+      ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
       String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
-    super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode,
+    super.init(conf, fs, manager, queueStorage, replicationPeers, server, peerClusterZnode,
       clusterId, replicationEndpoint, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
@@ -64,7 +63,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
   protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
     final RecoveredReplicationSourceShipper worker =
         new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
-            this.replicationQueues);
+            this.queueStorage);
     ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
     if (extant != null) {
       LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 630b90b..fb365bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -23,13 +23,13 @@ import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *  Used by a {@link RecoveredReplicationSource}.
@@ -40,14 +40,14 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
       LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class);
 
   protected final RecoveredReplicationSource source;
-  private final ReplicationQueues replicationQueues;
+  private final ReplicationQueueStorage replicationQueues;
 
   public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
       PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
-      ReplicationQueues replicationQueues) {
+      ReplicationQueueStorage queueStorage) {
     super(conf, walGroupId, queue, source);
     this.source = source;
-    this.replicationQueues = replicationQueues;
+    this.replicationQueues = queueStorage;
   }
 
   @Override
@@ -116,11 +116,11 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
     long startPosition = 0;
     String peerClusterZnode = source.getPeerClusterZnode();
     try {
-      startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
-        this.queue.peek().getName());
+      startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
+        peerClusterZnode, this.queue.peek().getName());
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
-            + startPosition);
+        LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " +
+          startPosition);
       }
     } catch (ReplicationException e) {
       terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 571ee75..6d871cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -29,11 +28,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,26 +37,32 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 /**
  * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
@@ -74,7 +74,7 @@ public class Replication implements
       LoggerFactory.getLogger(Replication.class);
   private boolean replicationForBulkLoadData;
   private ReplicationSourceManager replicationManager;
-  private ReplicationQueues replicationQueues;
+  private ReplicationQueueStorage queueStorage;
   private ReplicationPeers replicationPeers;
   private ReplicationTracker replicationTracker;
   private Configuration conf;
@@ -127,10 +127,8 @@ public class Replication implements
     }
 
     try {
-      this.replicationQueues =
-          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server,
-            server.getZooKeeper()));
-      this.replicationQueues.init(this.server.getServerName().toString());
+      this.queueStorage =
+          ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
       this.replicationPeers =
           ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
       this.replicationPeers.init();
@@ -147,7 +145,7 @@ public class Replication implements
       throw new IOException("Could not read cluster id", ke);
     }
     this.replicationManager =
-        new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf,
+        new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
             this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider);
     this.statsThreadPeriod =
         this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);

http://git-wip-us.apache.org/repos/asf/hbase/blob/264890a9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 19ea240..fef9054 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -41,9 +40,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
@@ -52,7 +48,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -60,6 +56,10 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 
 
@@ -83,7 +83,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
   // per group queue size, keep no more than this number of logs in each wal group
   protected int queueSizePerGroup;
-  protected ReplicationQueues replicationQueues;
+  protected ReplicationQueueStorage queueStorage;
   private ReplicationPeers replicationPeers;
 
   protected Configuration conf;
@@ -148,7 +148,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    */
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
+      ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
       String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.server = server;
@@ -161,7 +161,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
-    this.replicationQueues = replicationQueues;
+    this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
     this.manager = manager;
     this.fs = fs;
@@ -229,7 +229,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       List<String> tableCfs = tableCFMap.get(tableName);
       if (tableCFMap.containsKey(tableName)
           && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
-        this.replicationQueues.addHFileRefs(peerId, pairs);
+        this.queueStorage.addHFileRefs(peerId, pairs);
         metrics.incrSizeOfHFileRefsQueue(pairs.size());
       } else {
         LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
@@ -238,7 +238,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     } else {
       // user has explicitly not defined any table cfs for replication, means replicate all the
       // data
-      this.replicationQueues.addHFileRefs(peerId, pairs);
+      this.queueStorage.addHFileRefs(peerId, pairs);
       metrics.incrSizeOfHFileRefsQueue(pairs.size());
     }
   }