You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/23 21:42:44 UTC

[3/4] hbase git commit: HBASE-17442 Move most of the replication related classes from hbase-client to new hbase-replication package. (Guanghao Zhang).

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
deleted file mode 100644
index 1981131..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-@InterfaceAudience.Private
-public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
-    ReplicationQueuesClient {
-
-  Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class);
-
-  public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
-    this(args.getZk(), args.getConf(), args.getAbortable());
-  }
-
-  public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
-      Abortable abortable) {
-    super(zk, conf, abortable);
-  }
-
-  @Override
-  public void init() throws ReplicationException {
-    try {
-      if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
-        ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Internal error while initializing a queues client", e);
-    }
-  }
-
-  @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
-    String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
-    znode = ZKUtil.joinZNode(znode, queueId);
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get list of wals for queueId=" + queueId
-          + " and serverName=" + serverName, e);
-      throw e;
-    }
-    return result;
-  }
-
-  @Override
-  public List<String> getAllQueues(String serverName) throws KeeperException {
-    String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
-      throw e;
-    }
-    return result;
-  }
-
-  @Override
-  public Set<String> getAllWALs() throws KeeperException {
-    /**
-     * Load all wals in all replication queues from ZK. This method guarantees to return a
-     * snapshot which contains all WALs in the zookeeper at the start of this call even there
-     * is concurrent queue failover. However, some newly created WALs during the call may
-     * not be included.
-     */
-    for (int retry = 0; ; retry++) {
-      int v0 = getQueuesZNodeCversion();
-      List<String> rss = getListOfReplicators();
-      if (rss == null || rss.isEmpty()) {
-        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
-        return ImmutableSet.of();
-      }
-      Set<String> wals = Sets.newHashSet();
-      for (String rs : rss) {
-        List<String> listOfPeers = getAllQueues(rs);
-        // if rs just died, this will be null
-        if (listOfPeers == null) {
-          continue;
-        }
-        for (String id : listOfPeers) {
-          List<String> peersWals = getLogsInQueue(rs, id);
-          if (peersWals != null) {
-            wals.addAll(peersWals);
-          }
-        }
-      }
-      int v1 = getQueuesZNodeCversion();
-      if (v0 == v1) {
-        return wals;
-      }
-      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
-        v0, v1, retry));
-    }
-  }
-
-  public int getQueuesZNodeCversion() throws KeeperException {
-    try {
-      Stat stat = new Stat();
-      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
-      return stat.getCversion();
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get stat of replication rs node", e);
-      throw e;
-    }
-  }
-
-  @Override
-  public int getHFileRefsNodeChangeVersion() throws KeeperException {
-    Stat stat = new Stat();
-    try {
-      ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get stat of replication hfile references node.", e);
-      throw e;
-    }
-    return stat.getCversion();
-  }
-
-  @Override
-  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
-      throw e;
-    }
-    return result;
-  }
-
-  @Override
-  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
-    String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
-      throw e;
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
deleted file mode 100644
index 4733706..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * This class provides an implementation of the
- * interface using ZooKeeper. The
- * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
- * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
- * the regionserver name (a concatenation of the region server’s hostname, client port and start
- * code). For example:
- *
- * /hbase/replication/rs/hostname.example.org,6020,1234
- *
- * Within this znode, the region server maintains a set of WAL replication queues. These queues are
- * represented by child znodes named using there give queue id. For example:
- *
- * /hbase/replication/rs/hostname.example.org,6020,1234/1
- * /hbase/replication/rs/hostname.example.org,6020,1234/2
- *
- * Each queue has one child znode for every WAL that still needs to be replicated. The value of
- * these WAL child znodes is the latest position that has been replicated. This position is updated
- * every time a WAL entry is replicated. For example:
- *
- * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
- */
-@InterfaceAudience.Private
-public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
-
-  /** Znode containing all replication queues for this region server. */
-  private String myQueuesZnode;
-
-  private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
-
-  public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
-    this(args.getZk(), args.getConf(), args.getAbortable());
-  }
-
-  public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
-      Abortable abortable) {
-    super(zk, conf, abortable);
-  }
-
-  @Override
-  public void init(String serverName) throws ReplicationException {
-    this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
-    try {
-      if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
-        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Could not initialize replication queues.", e);
-    }
-    if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
-      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
-      try {
-        if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
-          ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
-        }
-      } catch (KeeperException e) {
-        throw new ReplicationException("Could not initialize hfile references replication queue.",
-            e);
-      }
-    }
-  }
-
-  @Override
-  public void removeQueue(String queueId) {
-    try {
-      ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
-    }
-  }
-
-  @Override
-  public void addLog(String queueId, String filename) throws ReplicationException {
-    String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
-    znode = ZKUtil.joinZNode(znode, filename);
-    try {
-      ZKUtil.createWithParents(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      throw new ReplicationException(
-          "Could not add log because znode could not be created. queueId=" + queueId
-              + ", filename=" + filename);
-    }
-  }
-
-  @Override
-  public void removeLog(String queueId, String filename) {
-    try {
-      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
-      znode = ZKUtil.joinZNode(znode, filename);
-      ZKUtil.deleteNode(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
-          + filename + ")", e);
-    }
-  }
-
-  @Override
-  public void setLogPosition(String queueId, String filename, long position) {
-    try {
-      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
-      znode = ZKUtil.joinZNode(znode, filename);
-      // Why serialize String of Long and not Long as bytes?
-      ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to write replication wal position (filename=" + filename
-          + ", position=" + position + ")", e);
-    }
-  }
-
-  @Override
-  public long getLogPosition(String queueId, String filename) throws ReplicationException {
-    String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
-    String znode = ZKUtil.joinZNode(clusterZnode, filename);
-    byte[] bytes = null;
-    try {
-      bytes = ZKUtil.getData(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Internal Error: could not get position in log for queueId="
-          + queueId + ", filename=" + filename, e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return 0;
-    }
-    try {
-      return ZKUtil.parseWALPositionFrom(bytes);
-    } catch (DeserializationException de) {
-      LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
-          + " znode content, continuing.");
-    }
-    // if we can not parse the position, start at the beginning of the wal file
-    // again
-    return 0;
-  }
-
-  @Override
-  public boolean isThisOurRegionServer(String regionserver) {
-    return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
-  }
-
-  @Override
-  public List<String> getUnClaimedQueueIds(String regionserver) {
-    if (isThisOurRegionServer(regionserver)) {
-      return null;
-    }
-    String rsZnodePath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
-    List<String> queues = null;
-    try {
-      queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
-    }
-    return queues;
-  }
-
-  @Override
-  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
-    LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
-    return moveQueueUsingMulti(regionserver, queueId);
-  }
-
-  @Override
-  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
-    String rsPath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
-    try {
-      List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
-      if (list != null && list.isEmpty()){
-        ZKUtil.deleteNode(this.zookeeper, rsPath);
-      }
-    } catch (KeeperException e) {
-      LOG.warn("Got error while removing replicator", e);
-    }
-  }
-
-  @Override
-  public void removeAllQueues() {
-    try {
-      ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
-    } catch (KeeperException e) {
-      // if the znode is already expired, don't bother going further
-      if (e instanceof KeeperException.SessionExpiredException) {
-        return;
-      }
-      this.abortable.abort("Failed to delete replication queues for region server: "
-          + this.myQueuesZnode, e);
-    }
-  }
-
-  @Override
-  public List<String> getLogsInQueue(String queueId) {
-    String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
-    }
-    return result;
-  }
-
-  @Override
-  public List<String> getAllQueues() {
-    List<String> listOfQueues = null;
-    try {
-      listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get a list of queues for region server: "
-          + this.myQueuesZnode, e);
-    }
-    return listOfQueues == null ? new ArrayList<>() : listOfQueues;
-  }
-
-  /**
-   * It "atomically" copies one peer's wals queue from another dead region server and returns them
-   * all sorted. The new peer id is equal to the old peer id appended with the dead server's znode.
-   * @param znode pertaining to the region server to copy the queues from
-   * @peerId peerId pertaining to the queue need to be copied
-   */
-  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
-    try {
-      // hbase/replication/rs/deadrs
-      String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
-      List<ZKUtilOp> listOfOps = new ArrayList<>();
-      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
-
-      String newPeerId = peerId + "-" + znode;
-      String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
-      // check the logs queue for the old peer cluster
-      String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
-      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
-
-      if (!peerExists(replicationQueueInfo.getPeerId())) {
-        LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
-                " didn't exist, will move its queue to avoid the failure of multi op");
-        for (String wal : wals) {
-          String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
-          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
-        }
-        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-        ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-        return null;
-      }
-
-      SortedSet<String> logQueue = new TreeSet<>();
-      if (wals == null || wals.isEmpty()) {
-        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-      } else {
-        // create the new cluster znode
-        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
-        listOfOps.add(op);
-        // get the offset of the logs and set it to new znodes
-        for (String wal : wals) {
-          String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
-          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
-          LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
-          String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
-          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
-          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
-          logQueue.add(wal);
-        }
-        // add delete op for peer
-        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-
-        if (LOG.isTraceEnabled())
-          LOG.trace(" The multi list size is: " + listOfOps.size());
-      }
-      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-
-      LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
-      return new Pair<>(newPeerId, logQueue);
-    } catch (KeeperException e) {
-      // Multi call failed; it looks like some other regionserver took away the logs.
-      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
-    } catch (InterruptedException e) {
-      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
-      Thread.currentThread().interrupt();
-    }
-    return null;
-  }
-
-  @Override
-  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
-    boolean debugEnabled = LOG.isDebugEnabled();
-    if (debugEnabled) {
-      LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
-    }
-
-    int size = pairs.size();
-    List<ZKUtilOp> listOfOps = new ArrayList<>(size);
-
-    for (int i = 0; i < size; i++) {
-      listOfOps.add(ZKUtilOp.createAndFailSilent(
-        ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
-        HConstants.EMPTY_BYTE_ARRAY));
-    }
-    if (debugEnabled) {
-      LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
-          + " is " + listOfOps.size());
-    }
-    try {
-      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
-    } catch (KeeperException e) {
-      throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
-    }
-  }
-
-  @Override
-  public void removeHFileRefs(String peerId, List<String> files) {
-    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
-    boolean debugEnabled = LOG.isDebugEnabled();
-    if (debugEnabled) {
-      LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
-    }
-
-    int size = files.size();
-    List<ZKUtilOp> listOfOps = new ArrayList<>(size);
-
-    for (int i = 0; i < size; i++) {
-      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i))));
-    }
-    if (debugEnabled) {
-      LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
-          + " is " + listOfOps.size());
-    }
-    try {
-      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
-    } catch (KeeperException e) {
-      LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
-    }
-  }
-
-  @Override
-  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
-    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
-    try {
-      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
-        LOG.info("Adding peer " + peerId + " to hfile reference queue.");
-        ZKUtil.createWithParents(this.zookeeper, peerZnode);
-      }
-    } catch (KeeperException e) {
-      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
-          e);
-    }
-  }
-
-  @Override
-  public void removePeerFromHFileRefs(String peerId) {
-    final String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
-    try {
-      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Peer " + peerZnode + " not found in hfile reference queue.");
-        }
-        return;
-      } else {
-        LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
-        ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
-      }
-    } catch (KeeperException e) {
-      LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
-        e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
deleted file mode 100644
index 47b780d..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-
-/**
- * This is a base class for maintaining replication state in zookeeper.
- */
-@InterfaceAudience.Private
-public abstract class ReplicationStateZKBase {
-
-  /**
-   * The name of the znode that contains the replication status of a remote slave (i.e. peer)
-   * cluster.
-   */
-  protected final String peerStateNodeName;
-  /** The name of the base znode that contains all replication state. */
-  protected final String replicationZNode;
-  /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
-  protected final String peersZNode;
-  /** The name of the znode that contains all replication queues */
-  protected final String queuesZNode;
-  /** The name of the znode that contains queues of hfile references to be replicated */
-  protected final String hfileRefsZNode;
-  /** The cluster key of the local cluster */
-  protected final String ourClusterKey;
-  /** The name of the znode that contains tableCFs */
-  protected final String tableCFsNodeName;
-
-  protected final ZooKeeperWatcher zookeeper;
-  protected final Configuration conf;
-  protected final Abortable abortable;
-
-  // Public for testing
-  public static final byte[] ENABLED_ZNODE_BYTES =
-      toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
-  public static final byte[] DISABLED_ZNODE_BYTES =
-      toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
-  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
-      "zookeeper.znode.replication.hfile.refs";
-  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
-
-  public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
-      Abortable abortable) {
-    this.zookeeper = zookeeper;
-    this.conf = conf;
-    this.abortable = abortable;
-
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
-    String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
-    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
-    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
-      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
-    this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
-    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
-    this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
-    this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.znodePaths.baseZNode,
-      replicationZNodeName);
-    this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
-    this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
-    this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
-  }
-
-  public List<String> getListOfReplicators() {
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to get list of replicators", e);
-    }
-    return result;
-  }
-
-  /**
-   * @param state
-   * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
-   *         use as content of a peer-state znode under a peer cluster id as in
-   *         /hbase/replication/peers/PEER_ID/peer-state.
-   */
-  protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
-    ReplicationProtos.ReplicationState msg =
-        ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
-    // There is no toByteArray on this pb Message?
-    // 32 bytes is default which seems fair enough here.
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
-      msg.writeTo(cos);
-      cos.flush();
-      baos.flush();
-      return ProtobufUtil.prependPBMagic(baos.toByteArray());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  protected boolean peerExists(String id) throws KeeperException {
-    return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
-  }
-
-  /**
-   * Determine if a ZK path points to a peer node.
-   * @param path path to be checked
-   * @return true if the path points to a peer node, otherwise false
-   */
-  protected boolean isPeerPath(String path) {
-    return path.split("/").length == peersZNode.split("/").length + 1;
-  }
-
-  @VisibleForTesting
-  protected String getTableCFsNode(String id) {
-    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
-  }
-
-  @VisibleForTesting
-  protected String getPeerStateNode(String id) {
-    return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
-  }
-  @VisibleForTesting
-  protected String getPeerNode(String id) {
-    return ZKUtil.joinZNode(this.peersZNode, id);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
deleted file mode 100644
index fa1d2d6..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/*
- * Abstract class that provides an interface to the Replication Table. Which is currently
- * being used for WAL offset tracking.
- * The basic schema of this table will store each individual queue as a
- * seperate row. The row key will be a unique identifier of the creating server's name and the
- * queueId. Each queue must have the following two columns:
- *  COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue
- *  COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's that have owned this
- *    queue. The most recent previous owner is the leftmost entry.
- * They will also have columns mapping [WAL filename : offset]
- * The most flexible method of interacting with the Replication Table is by calling
- * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up
- * to the caller to close the returned table.
- */
-@InterfaceAudience.Private
-abstract class ReplicationTableBase {
-
-  /** Name of the HBase Table used for tracking replication*/
-  public static final TableName REPLICATION_TABLE_NAME =
-    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
-
-  // Column family and column names for Queues in the Replication Table
-  public static final byte[] CF_QUEUE = Bytes.toBytes("q");
-  public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o");
-  public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h");
-
-  // Column Descriptor for the Replication Table
-  private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
-    new HColumnDescriptor(CF_QUEUE).setMaxVersions(1)
-      .setInMemory(true)
-      .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
-        // TODO: Figure out which bloom filter to use
-      .setBloomFilterType(BloomType.NONE);
-
-  // The value used to delimit the queueId and server name inside of a queue's row key. Currently a
-  // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
-  // See HBASE-11394.
-  public static final String ROW_KEY_DELIMITER = "-";
-
-  // The value used to delimit server names in the queue history list
-  public static final String QUEUE_HISTORY_DELIMITER = "|";
-
-  /*
-  * Make sure that HBase table operations for replication have a high number of retries. This is
-  * because the server is aborted if any HBase table operation fails. Each RPC will be attempted
-  * 3600 times before exiting. This provides each operation with 2 hours of retries
-  * before the server is aborted.
-  */
-  private static final int CLIENT_RETRIES = 3600;
-  private static final int RPC_TIMEOUT = 2000;
-  private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
-
-  // We only need a single thread to initialize the Replication Table
-  private static final int NUM_INITIALIZE_WORKERS = 1;
-
-  protected final Configuration conf;
-  protected final Abortable abortable;
-  private final Connection connection;
-  private final Executor executor;
-  private volatile CountDownLatch replicationTableInitialized;
-
-  public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
-    this.conf = new Configuration(conf);
-    this.abortable = abort;
-    decorateConf();
-    this.connection = ConnectionFactory.createConnection(this.conf);
-    this.executor = setUpExecutor();
-    this.replicationTableInitialized = new CountDownLatch(1);
-    createReplicationTableInBackground();
-  }
-
-  /**
-   * Modify the connection's config so that operations run on the Replication Table have longer and
-   * a larger number of retries
-   */
-  private void decorateConf() {
-    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
-  }
-
-  /**
-   * Sets up the thread pool executor used to build the Replication Table in the background
-   * @return the configured executor
-   */
-  private Executor setUpExecutor() {
-    ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS,
-        NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
-    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
-    tfb.setNameFormat("ReplicationTableExecutor-%d");
-    tfb.setDaemon(true);
-    tempExecutor.setThreadFactory(tfb.build());
-    return tempExecutor;
-  }
-
-  /**
-   * Get whether the Replication Table has been successfully initialized yet
-   * @return whether the Replication Table is initialized
-   */
-  public boolean getInitializationStatus() {
-    return replicationTableInitialized.getCount() == 0;
-  }
-
-  /**
-   * Increases the RPC and operations timeouts for the Replication Table
-   */
-  private Table setReplicationTableTimeOuts(Table replicationTable) {
-    replicationTable.setRpcTimeout(RPC_TIMEOUT);
-    replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
-    return replicationTable;
-  }
-
-  /**
-   * Build the row key for the given queueId. This will uniquely identify it from all other queues
-   * in the cluster.
-   * @param serverName The owner of the queue
-   * @param queueId String identifier of the queue
-   * @return String representation of the queue's row key
-   */
-  protected String buildQueueRowKey(String serverName, String queueId) {
-    return queueId + ROW_KEY_DELIMITER + serverName;
-  }
-
-  /**
-   * Parse the original queueId from a row key
-   * @param rowKey String representation of a queue's row key
-   * @return the original queueId
-   */
-  protected String getRawQueueIdFromRowKey(String rowKey) {
-    return rowKey.split(ROW_KEY_DELIMITER)[0];
-  }
-
-  /**
-   * Returns a queue's row key given either its raw or reclaimed queueId
-   *
-   * @param queueId queueId of the queue
-   * @return byte representation of the queue's row key
-   */
-  protected byte[] queueIdToRowKey(String serverName, String queueId) {
-    // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
-    // then this is not a reclaimed queue.
-    if (!queueId.contains(ROW_KEY_DELIMITER)) {
-      return Bytes.toBytes(buildQueueRowKey(serverName, queueId));
-      // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the
-      // queue's row key
-    } else {
-      return Bytes.toBytes(queueId);
-    }
-  }
-
-  /**
-   * Creates a "|" delimited record of the queue's past region server owners.
-   *
-   * @param originalHistory the queue's original owner history
-   * @param oldServer the name of the server that used to own the queue
-   * @return the queue's new owner history
-   */
-  protected String buildClaimedQueueHistory(String originalHistory, String oldServer) {
-    return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory;
-  }
-
-  /**
-   * Get a list of all region servers that have outstanding replication queues. These servers could
-   * be alive, dead or from a previous run of the cluster.
-   * @return a list of server names
-   */
-  protected List<String> getListOfReplicators() {
-    // scan all of the queues and return a list of all unique OWNER values
-    Set<String> peerServers = new HashSet<>();
-    ResultScanner allQueuesInCluster = null;
-    try (Table replicationTable = getOrBlockOnReplicationTable()){
-      Scan scan = new Scan();
-      scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
-      allQueuesInCluster = replicationTable.getScanner(scan);
-      for (Result queue : allQueuesInCluster) {
-        peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER)));
-      }
-    } catch (IOException e) {
-      String errMsg = "Failed getting list of replicators";
-      abortable.abort(errMsg, e);
-    } finally {
-      if (allQueuesInCluster != null) {
-        allQueuesInCluster.close();
-      }
-    }
-    return new ArrayList<>(peerServers);
-  }
-
-  protected List<String> getAllQueues(String serverName) {
-    List<String> allQueues = new ArrayList<>();
-    ResultScanner queueScanner = null;
-    try {
-      queueScanner = getQueuesBelongingToServer(serverName);
-      for (Result queue : queueScanner) {
-        String rowKey =  Bytes.toString(queue.getRow());
-        // If the queue does not have a Owner History, then we must be its original owner. So we
-        // want to return its queueId in raw form
-        if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) {
-          allQueues.add(getRawQueueIdFromRowKey(rowKey));
-        } else {
-          allQueues.add(rowKey);
-        }
-      }
-      return allQueues;
-    } catch (IOException e) {
-      String errMsg = "Failed getting list of all replication queues for serverName=" + serverName;
-      abortable.abort(errMsg, e);
-      return null;
-    } finally {
-      if (queueScanner != null) {
-        queueScanner.close();
-      }
-    }
-  }
-
-  protected List<String> getLogsInQueue(String serverName, String queueId) {
-    String rowKey = queueId;
-    if (!queueId.contains(ROW_KEY_DELIMITER)) {
-      rowKey = buildQueueRowKey(serverName, queueId);
-    }
-    return getLogsInQueue(Bytes.toBytes(rowKey));
-  }
-
-  protected List<String> getLogsInQueue(byte[] rowKey) {
-    String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      Get getQueue = new Get(rowKey);
-      Result queue = replicationTable.get(getQueue);
-      if (queue == null || queue.isEmpty()) {
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return null;
-      }
-      return readWALsFromResult(queue);
-    } catch (IOException e) {
-      abortable.abort(errMsg, e);
-      return null;
-    }
-  }
-
-  /**
-   * Read all of the WAL's from a queue into a list
-   *
-   * @param queue HBase query result containing the queue
-   * @return a list of all the WAL filenames
-   */
-  protected List<String> readWALsFromResult(Result queue) {
-    List<String> wals = new ArrayList<>();
-    Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
-    for (byte[] cQualifier : familyMap.keySet()) {
-      // Ignore the meta data fields of the queue
-      if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
-          COL_QUEUE_OWNER_HISTORY)) {
-        continue;
-      }
-      wals.add(Bytes.toString(cQualifier));
-    }
-    return wals;
-  }
-
-  /**
-   * Get the queue id's and meta data (Owner and History) for the queues belonging to the named
-   * server
-   *
-   * @param server name of the server
-   * @return a ResultScanner over the QueueIds belonging to the server
-   * @throws IOException
-   */
-  protected ResultScanner getQueuesBelongingToServer(String server) throws IOException {
-    Scan scan = new Scan();
-    SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
-      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
-    scan.setFilter(filterMyQueues);
-    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
-    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      ResultScanner results = replicationTable.getScanner(scan);
-      return results;
-    }
-  }
-
-  /**
-   * Attempts to acquire the Replication Table. This operation will block until it is assigned by
-   * the CreateReplicationWorker thread. It is up to the caller of this method to close the
-   * returned Table
-   * @return the Replication Table when it is created
-   * @throws IOException
-   */
-  protected Table getOrBlockOnReplicationTable() throws IOException {
-    // Sleep until the Replication Table becomes available
-    try {
-      replicationTableInitialized.await();
-    } catch (InterruptedException e) {
-      String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " +
-          e.getMessage();
-      throw new InterruptedIOException(errMsg);
-    }
-    return getAndSetUpReplicationTable();
-  }
-
-  /**
-   * Creates a new copy of the Replication Table and sets up the proper Table time outs for it
-   *
-   * @return the Replication Table
-   * @throws IOException
-   */
-  private Table getAndSetUpReplicationTable() throws IOException {
-    Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME);
-    setReplicationTableTimeOuts(replicationTable);
-    return replicationTable;
-  }
-
-  /**
-   * Builds the Replication Table in a background thread. Any method accessing the Replication Table
-   * should do so through getOrBlockOnReplicationTable()
-   *
-   * @return the Replication Table
-   * @throws IOException if the Replication Table takes too long to build
-   */
-  private void createReplicationTableInBackground() throws IOException {
-    executor.execute(new CreateReplicationTableWorker());
-  }
-
-  /**
-   * Attempts to build the Replication Table. Will continue blocking until we have a valid
-   * Table for the Replication Table.
-   */
-  private class CreateReplicationTableWorker implements Runnable {
-
-    private Admin admin;
-
-    @Override
-    public void run() {
-      try {
-        admin = connection.getAdmin();
-        if (!replicationTableExists()) {
-          createReplicationTable();
-        }
-        int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number",
-            CLIENT_RETRIES);
-        RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT);
-        RetryCounter retryCounter = counterFactory.create();
-        while (!replicationTableExists()) {
-          retryCounter.sleepUntilNextRetry();
-          if (!retryCounter.shouldRetry()) {
-            throw new IOException("Unable to acquire the Replication Table");
-          }
-        }
-        replicationTableInitialized.countDown();
-      } catch (IOException | InterruptedException e) {
-        abortable.abort("Failed building Replication Table", e);
-      }
-    }
-
-    /**
-     * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
-     * in TableBasedReplicationQueuesImpl
-     *
-     * @throws IOException
-     */
-    private void createReplicationTable() throws IOException {
-      HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
-      replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
-      try {
-        admin.createTable(replicationTableDescriptor);
-      } catch (TableExistsException e) {
-        // In this case we can just continue as normal
-      }
-    }
-
-    /**
-     * Checks whether the Replication Table exists yet
-     *
-     * @return whether the Replication Table exists
-     * @throws IOException
-     */
-    private boolean replicationTableExists() {
-      try {
-        return admin.tableExists(REPLICATION_TABLE_NAME);
-      } catch (IOException e) {
-        return false;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
deleted file mode 100644
index 51d7473..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTracker.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * This is the interface for a Replication Tracker. A replication tracker provides the facility to
- * subscribe and track events that reflect a change in replication state. These events are used by
- * the ReplicationSourceManager to coordinate replication tasks such as addition/deletion of queues
- * and queue failover. These events are defined in the ReplicationListener interface. If a class
- * would like to listen to replication events it must implement the ReplicationListener interface
- * and register itself with a Replication Tracker.
- */
-@InterfaceAudience.Private
-public interface ReplicationTracker {
-
-  /**
-   * Register a replication listener to receive replication events.
-   * @param listener
-   */
-  public void registerListener(ReplicationListener listener);
-
-  public void removeListener(ReplicationListener listener);
-
-  /**
-   * Returns a list of other live region servers in the cluster.
-   * @return List of region servers.
-   */
-  public List<String> getListOfRegionServers();
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
deleted file mode 100644
index 9865d83..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is
- * responsible for handling replication events that are defined in the ReplicationListener
- * interface.
- */
-@InterfaceAudience.Private
-public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
-
-  private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class);
-  // All about stopping
-  private final Stoppable stopper;
-  // listeners to be notified
-  private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
-  // List of all the other region servers in this cluster
-  private final ArrayList<String> otherRegionServers = new ArrayList<>();
-  private final ReplicationPeers replicationPeers;
-
-  public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper,
-      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
-      Stoppable stopper) {
-    super(zookeeper, conf, abortable);
-    this.replicationPeers = replicationPeers;
-    this.stopper = stopper;
-    this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
-    this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
-  }
-
-  @Override
-  public void registerListener(ReplicationListener listener) {
-    listeners.add(listener);
-  }
-
-  @Override
-  public void removeListener(ReplicationListener listener) {
-    listeners.remove(listener);
-  }
-
-  /**
-   * Return a snapshot of the current region servers.
-   */
-  @Override
-  public List<String> getListOfRegionServers() {
-    refreshOtherRegionServersList();
-
-    List<String> list = null;
-    synchronized (otherRegionServers) {
-      list = new ArrayList<>(otherRegionServers);
-    }
-    return list;
-  }
-
-  /**
-   * Watcher used to be notified of the other region server's death in the local cluster. It
-   * initiates the process to transfer the queues if it is able to grab the lock.
-   */
-  public class OtherRegionServerWatcher extends ZooKeeperListener {
-
-    /**
-     * Construct a ZooKeeper event listener.
-     */
-    public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
-      super(watcher);
-    }
-
-    /**
-     * Called when a new node has been created.
-     * @param path full path of the new node
-     */
-    public void nodeCreated(String path) {
-      refreshListIfRightPath(path);
-    }
-
-    /**
-     * Called when a node has been deleted
-     * @param path full path of the deleted node
-     */
-    public void nodeDeleted(String path) {
-      if (stopper.isStopped()) {
-        return;
-      }
-      boolean cont = refreshListIfRightPath(path);
-      if (!cont) {
-        return;
-      }
-      LOG.info(path + " znode expired, triggering replicatorRemoved event");
-      for (ReplicationListener rl : listeners) {
-        rl.regionServerRemoved(getZNodeName(path));
-      }
-    }
-
-    /**
-     * Called when an existing node has a child node added or removed.
-     * @param path full path of the node whose children have changed
-     */
-    public void nodeChildrenChanged(String path) {
-      if (stopper.isStopped()) {
-        return;
-      }
-      refreshListIfRightPath(path);
-    }
-
-    private boolean refreshListIfRightPath(String path) {
-      if (!path.startsWith(this.watcher.znodePaths.rsZNode)) {
-        return false;
-      }
-      return refreshOtherRegionServersList();
-    }
-  }
-
-  /**
-   * Watcher used to follow the creation and deletion of peer clusters.
-   */
-  public class PeersWatcher extends ZooKeeperListener {
-
-    /**
-     * Construct a ZooKeeper event listener.
-     */
-    public PeersWatcher(ZooKeeperWatcher watcher) {
-      super(watcher);
-    }
-
-    /**
-     * Called when a node has been deleted
-     * @param path full path of the deleted node
-     */
-    public void nodeDeleted(String path) {
-      List<String> peers = refreshPeersList(path);
-      if (peers == null) {
-        return;
-      }
-      if (isPeerPath(path)) {
-        String id = getZNodeName(path);
-        LOG.info(path + " znode expired, triggering peerRemoved event");
-        for (ReplicationListener rl : listeners) {
-          rl.peerRemoved(id);
-        }
-      }
-    }
-
-    /**
-     * Called when an existing node has a child node added or removed.
-     * @param path full path of the node whose children have changed
-     */
-    public void nodeChildrenChanged(String path) {
-      List<String> peers = refreshPeersList(path);
-      if (peers == null) {
-        return;
-      }
-      LOG.info(path + " znode expired, triggering peerListChanged event");
-      for (ReplicationListener rl : listeners) {
-        rl.peerListChanged(peers);
-      }
-    }
-  }
-
-  /**
-   * Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also
-   * reset the watches.
-   * @param path path to check against
-   * @return A list of peers' identifiers if the event concerns this watcher, else null.
-   */
-  private List<String> refreshPeersList(String path) {
-    if (!path.startsWith(getPeersZNode())) {
-      return null;
-    }
-    return this.replicationPeers.getAllPeerIds();
-  }
-
-  private String getPeersZNode() {
-    return this.peersZNode;
-  }
-
-  /**
-   * Extracts the znode name of a peer cluster from a ZK path
-   * @param fullPath Path to extract the id from
-   * @return the id or an empty string if path is invalid
-   */
-  private String getZNodeName(String fullPath) {
-    String[] parts = fullPath.split("/");
-    return parts.length > 0 ? parts[parts.length - 1] : "";
-  }
-
-  /**
-   * Reads the list of region servers from ZK and atomically clears our local view of it and
-   * replaces it with the updated list.
-   * @return true if the local list of the other region servers was updated with the ZK data (even
-   *         if it was empty), false if the data was missing in ZK
-   */
-  private boolean refreshOtherRegionServersList() {
-    List<String> newRsList = getRegisteredRegionServers();
-    if (newRsList == null) {
-      return false;
-    } else {
-      synchronized (otherRegionServers) {
-        otherRegionServers.clear();
-        otherRegionServers.addAll(newRsList);
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Get a list of all the other region servers in this cluster and set a watch
-   * @return a list of server nanes
-   */
-  private List<String> getRegisteredRegionServers() {
-    List<String> result = null;
-    try {
-      result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Get list of registered region servers", e);
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
deleted file mode 100644
index 3507547..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes
- * the ReplicationTableBase to access the Replication Table.
- */
-@InterfaceAudience.Private
-public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase
-  implements ReplicationQueuesClient {
-
-  public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args)
-    throws IOException {
-    super(args.getConf(), args.getAbortable());
-  }
-  public TableBasedReplicationQueuesClientImpl(Configuration conf,
-                                               Abortable abortable) throws IOException {
-    super(conf, abortable);
-  }
-
-  @Override
-  public void init() throws ReplicationException{
-    // no-op
-  }
-
-  @Override
-  public List<String> getListOfReplicators() {
-    return super.getListOfReplicators();
-  }
-
-  @Override
-  public List<String> getLogsInQueue(String serverName, String queueId) {
-    return super.getLogsInQueue(serverName, queueId);
-  }
-
-  @Override
-  public List<String> getAllQueues(String serverName) {
-    return super.getAllQueues(serverName);
-  }
-
-  @Override
-  public Set<String> getAllWALs() {
-    Set<String> allWals = new HashSet<>();
-    ResultScanner allQueues = null;
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      allQueues = replicationTable.getScanner(new Scan());
-      for (Result queue : allQueues) {
-        for (String wal : readWALsFromResult(queue)) {
-          allWals.add(wal);
-        }
-      }
-    } catch (IOException e) {
-      String errMsg = "Failed getting all WAL's in Replication Table";
-      abortable.abort(errMsg, e);
-    } finally {
-      if (allQueues != null) {
-        allQueues.close();
-      }
-    }
-    return allWals;
-  }
-
-  @Override
-  public int getHFileRefsNodeChangeVersion() throws KeeperException {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
-    // TODO
-    throw new NotImplementedException();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
deleted file mode 100644
index bf55e8c..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- * This class provides an implementation of the ReplicationQueues interface using an HBase table
- * "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table.
- */
-@InterfaceAudience.Private
-public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
-  implements ReplicationQueues {
-
-  private static final Log LOG = LogFactory.getLog(TableBasedReplicationQueuesImpl.class);
-
-  // Common byte values used in replication offset tracking
-  private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L);
-  private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes("");
-
-  private String serverName = null;
-  private byte[] serverNameBytes = null;
-
-  // TODO: Only use this variable temporarily. Eventually we want to use HBase to store all
-  // TODO: replication information
-  private ReplicationStateZKBase replicationState;
-
-  public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException {
-    this(args.getConf(), args.getAbortable(), args.getZk());
-  }
-
-  public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw)
-    throws IOException {
-    super(conf, abort);
-    replicationState = new ReplicationStateZKBase(zkw, conf, abort) {};
-  }
-
-  @Override
-  public void init(String serverName) throws ReplicationException {
-    this.serverName = serverName;
-    this.serverNameBytes = Bytes.toBytes(serverName);
-  }
-
-  @Override
-  public List<String> getListOfReplicators() {
-    return super.getListOfReplicators();
-  }
-
-  @Override
-  public void removeQueue(String queueId) {
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      if (checkQueueExists(queueId)) {
-        Delete deleteQueue = new Delete(rowKey);
-        safeQueueUpdate(deleteQueue);
-      } else {
-        LOG.info("No logs were registered for queue id=" + queueId + " so no rows were removed " +
-            "from the replication table while removing the queue");
-      }
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed removing queue queueId=" + queueId;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public void addLog(String queueId, String filename) throws ReplicationException {
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      if (!checkQueueExists(queueId)) {
-        // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values
-        Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
-        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes);
-        putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES);
-        putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
-        replicationTable.put(putNewQueue);
-      } else {
-        // Otherwise simply add the new log and offset as a new column
-        Put putNewLog = new Put(queueIdToRowKey(queueId));
-        putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
-        safeQueueUpdate(putNewLog);
-      }
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public void removeLog(String queueId, String filename) {
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      Delete delete = new Delete(rowKey);
-      delete.addColumns(CF_QUEUE, Bytes.toBytes(filename));
-      safeQueueUpdate(delete);
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public void setLogPosition(String queueId, String filename, long position) {
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      // Check that the log exists. addLog() must have been called before setLogPosition().
-      Get checkLogExists = new Get(rowKey);
-      checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename));
-      if (!replicationTable.exists(checkLogExists)) {
-        String errMsg = "Could not set position of non-existent log from queueId=" + queueId +
-          ", filename=" + filename;
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return;
-      }
-      // Update the log offset if it exists
-      Put walAndOffset = new Put(rowKey);
-      walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position));
-      safeQueueUpdate(walAndOffset);
-    } catch (IOException | ReplicationException e) {
-      String errMsg = "Failed writing log position queueId=" + queueId + "filename=" +
-        filename + " position=" + position;
-      abortable.abort(errMsg, e);
-    }
-  }
-
-  @Override
-  public long getLogPosition(String queueId, String filename) throws ReplicationException {
-    try {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      Get getOffset = new Get(rowKey);
-      getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename));
-      Result result = getResultIfOwner(getOffset);
-      if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) {
-        throw new ReplicationException("Could not read empty result while getting log position " +
-          "queueId=" + queueId + ", filename=" + filename);
-      }
-      return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename)));
-    } catch (IOException e) {
-      throw new ReplicationException("Could not get position in log for queueId=" + queueId +
-        ", filename=" + filename);
-    }
-  }
-
-  @Override
-  public void removeAllQueues() {
-    List<String> myQueueIds = getAllQueues();
-    for (String queueId : myQueueIds) {
-      removeQueue(queueId);
-    }
-  }
-
-  @Override
-  public List<String> getLogsInQueue(String queueId) {
-    String errMsg = "Failed getting logs in queue queueId=" + queueId;
-    byte[] rowKey = queueIdToRowKey(queueId);
-    List<String> logs = new ArrayList<>();
-    try {
-      Get getQueue = new Get(rowKey);
-      Result queue = getResultIfOwner(getQueue);
-      if (queue == null || queue.isEmpty()) {
-        String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
-            Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
-        abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
-        return null;
-      }
-      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
-      for(byte[] cQualifier : familyMap.keySet()) {
-        if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
-            COL_QUEUE_OWNER_HISTORY)) {
-          continue;
-        }
-        logs.add(Bytes.toString(cQualifier));
-      }
-    } catch (IOException e) {
-      abortable.abort(errMsg, e);
-      return null;
-    }
-    return logs;
-  }
-
-  @Override
-  public List<String> getAllQueues() {
-    return getAllQueues(serverName);
-  }
-
-  @Override public List<String> getUnClaimedQueueIds(String regionserver) {
-    if (isThisOurRegionServer(regionserver)) {
-      return null;
-    }
-    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
-      List<String> res = new ArrayList<>();
-      for (Result queue : queuesToClaim) {
-        String rowKey = Bytes.toString(queue.getRow());
-        res.add(rowKey);
-      }
-      return res.isEmpty() ? null : res;
-    } catch (IOException e) {
-      String errMsg = "Failed getUnClaimedQueueIds";
-      abortable.abort(errMsg, e);
-    }
-    return null;
-  }
-
-  @Override public void removeReplicatorIfQueueIsEmpty(String regionserver) {
-    // Do nothing here
-  }
-
-  @Override
-  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
-    if (isThisOurRegionServer(regionserver)) {
-      return null;
-    }
-
-    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)){
-      for (Result queue : queuesToClaim) {
-        String rowKey = Bytes.toString(queue.getRow());
-        if (!rowKey.equals(queueId)){
-          continue;
-        }
-        if (attemptToClaimQueue(queue, regionserver)) {
-          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
-          if (replicationState.peerExists(replicationQueueInfo.getPeerId())) {
-            SortedSet<String> sortedLogs = new TreeSet<>();
-            List<String> logs = getLogsInQueue(queue.getRow());
-            for (String log : logs) {
-              sortedLogs.add(log);
-            }
-            LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver);
-            return new Pair<>(rowKey, sortedLogs);
-          } else {
-            // Delete orphaned queues
-            removeQueue(Bytes.toString(queue.getRow()));
-            LOG.info(serverName + " has deleted abandoned queue " + queueId + " from " +
-              regionserver);
-          }
-        }
-      }
-    } catch (IOException | KeeperException e) {
-      String errMsg = "Failed claiming queues for regionserver=" + regionserver;
-      abortable.abort(errMsg, e);
-    }
-    return null;
-  }
-
-  @Override
-  public boolean isThisOurRegionServer(String regionserver) {
-    return this.serverName.equals(regionserver);
-  }
-
-  @Override
-  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public void removePeerFromHFileRefs(String peerId) {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public void removeHFileRefs(String peerId, List<String> files) {
-    // TODO
-    throw new NotImplementedException();
-  }
-
-  private String buildQueueRowKey(String queueId) {
-    return buildQueueRowKey(serverName, queueId);
-  }
-
-  /**
-   * Convenience method that gets the row key of the queue specified by queueId
-   * @param queueId queueId of a queue in this server
-   * @return the row key of the queue in the Replication Table
-   */
-  private byte[] queueIdToRowKey(String queueId) {
-    return queueIdToRowKey(serverName, queueId);
-  }
-
-  /**
-   * See safeQueueUpdate(RowMutations mutate)
-   *
-   * @param put Row mutation to perform on the queue
-   */
-  private void safeQueueUpdate(Put put) throws ReplicationException, IOException {
-    RowMutations mutations = new RowMutations(put.getRow());
-    mutations.add(put);
-    safeQueueUpdate(mutations);
-  }
-
-  /**
-   * See safeQueueUpdate(RowMutations mutate)
-   *
-   * @param delete Row mutation to perform on the queue
-   */
-  private void safeQueueUpdate(Delete delete) throws ReplicationException,
-    IOException{
-    RowMutations mutations = new RowMutations(delete.getRow());
-    mutations.add(delete);
-    safeQueueUpdate(mutations);
-  }
-
-  /**
-   * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column
-   * of the queue. Abort the server if this checkAndPut fails: which means we have somehow lost
-   * ownership of the column or an IO Exception has occurred during the transaction.
-   *
-   * @param mutate Mutation to perform on a given queue
-   */
-  private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(),
-          CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
-      if (!updateSuccess) {
-        throw new ReplicationException("Failed to update Replication Table because we lost queue " +
-            " ownership");
-      }
-    }
-  }
-
-  /**
-   * Check if the queue specified by queueId is stored in HBase
-   *
-   * @param queueId Either raw or reclaimed format of the queueId
-   * @return Whether the queue is stored in HBase
-   * @throws IOException
-   */
-  private boolean checkQueueExists(String queueId) throws IOException {
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      byte[] rowKey = queueIdToRowKey(queueId);
-      return replicationTable.exists(new Get(rowKey));
-    }
-  }
-
-  /**
-   * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the
-   * recently killed server is still the OWNER before we claim it.
-   *
-   * @param queue The queue that we are trying to claim
-   * @param originalServer The server that originally owned the queue
-   * @return Whether we successfully claimed the queue
-   * @throws IOException
-   */
-  private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{
-    Put putQueueNameAndHistory = new Put(queue.getRow());
-    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName));
-    String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE,
-      COL_QUEUE_OWNER_HISTORY)), originalServer);
-    putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY,
-        Bytes.toBytes(newOwnerHistory));
-    RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
-    claimAndRenameQueue.add(putQueueNameAndHistory);
-    // Attempt to claim ownership for this queue by checking if the current OWNER is the original
-    // server. If it is not then another RS has already claimed it. If it is we set ourselves as the
-    // new owner and update the queue's history
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      boolean success = replicationTable.checkAndMutate(queue.getRow(),
-          CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer),
-          claimAndRenameQueue);
-      return success;
-    }
-  }
-
-  /**
-   * Attempts to run a Get on some queue. Will only return a non-null result if we currently own
-   * the queue.
-   *
-   * @param get The Get that we want to query
-   * @return The result of the Get if this server is the owner of the queue. Else it returns null.
-   * @throws IOException
-   */
-  private Result getResultIfOwner(Get get) throws IOException {
-    Scan scan = new Scan(get);
-    // Check if the Get currently contains all columns or only specific columns
-    if (scan.getFamilyMap().size() > 0) {
-      // Add the OWNER column if the scan is already only over specific columns
-      scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
-    }
-    scan.setMaxResultSize(1);
-    SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
-      CompareFilter.CompareOp.EQUAL, serverNameBytes);
-    scan.setFilter(checkOwner);
-    ResultScanner scanner = null;
-    try (Table replicationTable = getOrBlockOnReplicationTable()) {
-      scanner = replicationTable.getScanner(scan);
-      Result result = scanner.next();
-      return (result == null || result.isEmpty()) ? null : result;
-    } finally {
-      if (scanner != null) {
-        scanner.close();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index dafe421..1de3652 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
@@ -1772,23 +1771,18 @@ public class ZKUtil {
    */
   private static void getReplicationZnodesDump(ZooKeeperWatcher zkw, StringBuilder sb)
       throws KeeperException {
-    String replicationZNodeName = zkw.getConfiguration().get("zookeeper.znode.replication",
-      "replication");
-    String replicationZnode = joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
+    String replicationZnode = zkw.znodePaths.replicationZNode;
     if (ZKUtil.checkExists(zkw, replicationZnode) == -1) return;
     // do a ls -r on this znode
     sb.append("\n").append(replicationZnode).append(": ");
     List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
     for (String child : children) {
       String znode = joinZNode(replicationZnode, child);
-      if (child.equals(zkw.getConfiguration().get("zookeeper.znode.replication.peers", "peers"))) {
+      if (znode.equals(zkw.znodePaths.peersZNode)) {
         appendPeersZnodes(zkw, znode, sb);
-      } else if (child.equals(zkw.getConfiguration().
-          get("zookeeper.znode.replication.rs", "rs"))) {
+      } else if (znode.equals(zkw.znodePaths.queuesZNode)) {
         appendRSZnodes(zkw, znode, sb);
-      } else if (child.equals(zkw.getConfiguration().get(
-        ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
-        ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT))) {
+      } else if (znode.equals(zkw.znodePaths.hfileRefsZNode)) {
         appendHFileRefsZnodes(zkw, znode, sb);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
index 4c66b8f..5608eb5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
@@ -79,6 +79,15 @@ public class ZNodePaths {
   // znode of indicating master maintenance mode
   public final String masterMaintZNode;
 
+  // znode containing all replication state.
+  public final String replicationZNode;
+  // znode containing a list of all remote slave (i.e. peer) clusters.
+  public final String peersZNode;
+  // znode containing all replication queues
+  public final String queuesZNode;
+  // znode containing queues of hfile references to be replicated
+  public final String hfileRefsZNode;
+
   public ZNodePaths(Configuration conf) {
     baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
     ImmutableMap.Builder<Integer, String> builder = ImmutableMap.builder();
@@ -113,6 +122,15 @@ public class ZNodePaths {
       conf.get("zookeeper.znode.namespace", "namespace"));
     masterMaintZNode = ZKUtil.joinZNode(baseZNode,
       conf.get("zookeeper.znode.masterMaintenance", "master-maintenance"));
+    replicationZNode =
+        ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.replication", "replication"));
+    peersZNode =
+        ZKUtil.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.peers", "peers"));
+    queuesZNode =
+        ZKUtil.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
+    hfileRefsZNode =
+        ZKUtil.joinZNode(replicationZNode,
+          conf.get("zookeeper.znode.replication.hfile.refs", "hfile-refs"));
   }
 
   @Override
@@ -125,7 +143,9 @@ public class ZNodePaths {
         + ", balancerZNode=" + balancerZNode + ", regionNormalizerZNode=" + regionNormalizerZNode
         + ", switchZNode=" + switchZNode + ", tableLockZNode=" + tableLockZNode
         + ", recoveringRegionsZNode=" + recoveringRegionsZNode + ", namespaceZNode="
-        + namespaceZNode + ", masterMaintZNode=" + masterMaintZNode + "]";
+        + namespaceZNode + ", masterMaintZNode=" + masterMaintZNode + ", replicationZNode="
+        + replicationZNode + ", peersZNode=" + peersZNode + ", queuesZNode=" + queuesZNode
+        + ", hfileRefsZNode=" + hfileRefsZNode + "]";
   }
 
   /**