Posted to commits@hbase.apache.org by zh...@apache.org on 2016/08/08 08:35:30 UTC

hbase git commit: HBASE-12770 Don't transfer all the queued hlogs of a dead server to the same alive server

Repository: hbase
Updated Branches:
  refs/heads/master 30d7eeaef -> e5f9df1e2


HBASE-12770 Don't transfer all the queued hlogs of a dead server to the same alive server

Signed-off-by: zhangduo <zh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5f9df1e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5f9df1e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5f9df1e

Branch: refs/heads/master
Commit: e5f9df1e2394994323b4a5bfe2d7ba58aa669acd
Parents: 30d7eea
Author: Phil Yang <ud...@gmail.com>
Authored: Thu Jul 21 16:33:44 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Aug 8 16:30:23 2016 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationQueues.java    |  25 +-
 .../replication/ReplicationQueuesZKImpl.java    | 263 ++++++++++---------
 .../TableBasedReplicationQueuesImpl.java        |  53 ++--
 .../regionserver/ReplicationSourceManager.java  |  26 +-
 .../replication/TestReplicationStateBasic.java  |  17 +-
 .../TestReplicationStateHBaseImpl.java          |  32 ++-
 .../TestReplicationSourceManager.java           |   8 +-
 .../TestReplicationSourceManagerZkImpl.java     |  28 +-
 8 files changed, 264 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 0de0cc8..0ae27d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -19,10 +19,10 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.SortedSet;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * This provides an interface for maintaining a region server's replication queues. These queues
@@ -94,12 +94,25 @@ public interface ReplicationQueues {
   List<String> getAllQueues();
 
   /**
-   * Take ownership for the set of queues belonging to a dead region server.
+   * Get queueIds from a dead region server whose queues have not been claimed by other region
+   * servers.
+   * @return an empty list if the queue exists but has no children, null if the queue does not
+   *         exist.
+   */
+  List<String> getUnClaimedQueueIds(String regionserver);
+
+  /**
+   * Take ownership of the queue identified by queueId, which belongs to a dead region server.
    * @param regionserver the id of the dead region server
-   * @return A Map of the queues that have been claimed, including a Set of WALs in
-   *         each queue. Returns an empty map if no queues were failed-over.
+   * @param queueId the id of the queue
+   * @return the new PeerId and a SortedSet of WALs in its queue, or null if there is no
+   *         unclaimed queue.
+   */
+  Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
+
+  /**
+   * Remove the znode of a region server if its queue is empty.
+   * @param regionserver the id of the region server
    */
-  Map<String, Set<String>> claimQueues(String regionserver);
+  void removeReplicatorIfQueueIsEmpty(String regionserver);
 
   /**
    * Get a list of all region servers that have outstanding replication queues. These servers could

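For reference, a minimal sketch of how a caller (such as the failover logic in
ReplicationSourceManager further down in this patch) might drive the reworked interface,
claiming a dead server's queues one id at a time; rq is assumed to be an initialized
ReplicationQueues instance and deadRs the dead server's znode name, neither taken from the
commit itself:

    List<String> queueIds = rq.getUnClaimedQueueIds(deadRs);
    while (queueIds != null && !queueIds.isEmpty()) {
      // claim a single queue; other live servers can claim the remaining ones
      Pair<String, SortedSet<String>> claimed = rq.claimQueue(deadRs, queueIds.get(0));
      if (claimed != null) {
        // claimed.getFirst() is the new peer id, claimed.getSecond() the sorted WAL names
      }
      queueIds = rq.getUnClaimedQueueIds(deadRs);
    }
    // once nothing is left to claim, drop the dead server's empty znode
    rq.removeReplicatorIfQueueIsEmpty(deadRs);
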
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 655aaae..c1e85cb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -19,11 +19,9 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
@@ -36,6 +34,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -179,21 +178,66 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   }
 
   @Override
-  public Map<String, Set<String>> claimQueues(String regionserverZnode) {
-    Map<String, Set<String>> newQueues = new HashMap<>();
-    // check whether there is multi support. If yes, use it.
+  public List<String> getUnClaimedQueueIds(String regionserver) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+    String rsZnodePath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+    List<String> queues = null;
+    try {
+      queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
+    }
+    if (queues != null) {
+      queues.remove(RS_LOCK_ZNODE);
+    }
+    return queues;
+  }
+
+  @Override
+  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
     if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
-      LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
-      newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
+      LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
+      return moveQueueUsingMulti(regionserver, queueId);
     } else {
-      LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
-      if (!lockOtherRS(regionserverZnode)) {
-        return newQueues;
+      LOG.info("Moving " + regionserver + "/" + queueId + "'s wals to my queue");
+      if (!lockOtherRS(regionserver)) {
+        LOG.info("Can not take the lock now");
+        return null;
       }
-      newQueues = copyQueuesFromRS(regionserverZnode);
-      deleteAnotherRSQueues(regionserverZnode);
+      Pair<String, SortedSet<String>> newQueues;
+      try {
+        newQueues = copyQueueFromLockedRS(regionserver, queueId);
+        removeQueueFromLockedRS(regionserver, queueId);
+      } finally {
+        unlockOtherRS(regionserver);
+      }
+      return newQueues;
+    }
+  }
+
+  private void removeQueueFromLockedRS(String znode, String peerId) {
+    String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+    String peerPath = ZKUtil.joinZNode(nodePath, peerId);
+    try {
+      ZKUtil.deleteNodeRecursively(this.zookeeper, peerPath);
+    } catch (KeeperException e) {
+      LOG.warn("Remove copied queue failed", e);
+    }
+  }
+
+  @Override
+  public void removeReplicatorIfQueueIsEmpty(String regionserver) {
+    String rsPath = ZKUtil.joinZNode(this.queuesZNode, regionserver);
+    try {
+      List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
+      if (list != null && list.size() == 0){
+        ZKUtil.deleteNode(this.zookeeper, rsPath);
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Got error while removing replicator", e);
     }
-    return newQueues;
   }
 
   @Override
@@ -276,36 +320,13 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
     return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
   }
 
-  /**
-   * Delete all the replication queues for a given region server.
-   * @param regionserverZnode The znode of the region server to delete.
-   */
-  private void deleteAnotherRSQueues(String regionserverZnode) {
-    String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
+  private void unlockOtherRS(String znode){
+    String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
+    String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
     try {
-      List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
-      for (String cluster : clusters) {
-        // No need to delete, it will be deleted later.
-        if (cluster.equals(RS_LOCK_ZNODE)) {
-          continue;
-        }
-        String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
-        ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
-      }
-      // Finish cleaning up
-      ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
+      ZKUtil.deleteNode(this.zookeeper, p);
     } catch (KeeperException e) {
-      if (e instanceof KeeperException.NoNodeException
-          || e instanceof KeeperException.NotEmptyException) {
-        // Testing a special case where another region server was able to
-        // create a lock just after we deleted it, but then was also able to
-        // delete the RS znode before us or its lock znode is still there.
-        if (e.getPath().equals(fullpath)) {
-          return;
-        }
-      }
-      this.abortable.abort("Failed to delete replication queues for region server: "
-          + regionserverZnode, e);
+      this.abortable.abort("Remove lock failed", e);
     }
   }
 
@@ -313,38 +334,30 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
    * It "atomically" copies all the wals queues from another region server and returns them all
    * sorted per peer cluster (appended with the dead server's znode).
    * @param znode pertaining to the region server to copy the queues from
-   * @return WAL queues sorted per peer cluster
    */
-  private Map<String, Set<String>> copyQueuesFromRSUsingMulti(String znode) {
-    Map<String, Set<String>> queues = new HashMap<>();
-    // hbase/replication/rs/deadrs
-    String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
-    List<String> peerIdsToProcess = null;
-    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
     try {
-      peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
-      if (peerIdsToProcess == null) return queues; // node already processed
-      for (String peerId : peerIdsToProcess) {
-        ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
-        if (!peerExists(replicationQueueInfo.getPeerId())) {
-          // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
-          // this will cause the whole multi op fail.
-          // NodeFailoverWorker will skip the orphaned queues.
-          LOG.warn("Peer " + peerId
-              + " didn't exist, will move its queue to avoid the failure of multi op");
-        }
-        String newPeerId = peerId + "-" + znode;
-        String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
-        // check the logs queue for the old peer cluster
-        String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
-        List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
-        if (wals == null || wals.size() == 0) {
-          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
-          continue; // empty log queue.
-        }
+      // hbase/replication/rs/deadrs
+      String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
+      List<ZKUtilOp> listOfOps = new ArrayList<>();
+      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+      if (!peerExists(replicationQueueInfo.getPeerId())) {
+        // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
+        // this will cause the whole multi op fail.
+        // NodeFailoverWorker will skip the orphaned queues.
+        LOG.warn("Peer " + peerId +
+            " didn't exist, will move its queue to avoid the failure of multi op");
+      }
+      String newPeerId = peerId + "-" + znode;
+      String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
+      // check the logs queue for the old peer cluster
+      String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+      SortedSet<String> logQueue = new TreeSet<>();
+      if (wals == null || wals.size() == 0) {
+        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+      } else {
         // create the new cluster znode
-        Set<String> logQueue = new HashSet<String>();
-        queues.put(newPeerId, logQueue);
         ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
         listOfOps.add(op);
         // get the offset of the logs and set it to new znodes
@@ -354,98 +367,86 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
           LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
           String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
           listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
-          // add ops for deleting
           listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
           logQueue.add(wal);
         }
         // add delete op for peer
         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+
+        if (LOG.isTraceEnabled())
+          LOG.trace(" The multi list size is: " + listOfOps.size());
       }
-      // add delete op for dead rs, this will update the cversion of the parent.
-      // The reader will make optimistic locking with this to get a consistent
-      // snapshot
-      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
-      if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
-      if (LOG.isTraceEnabled()) LOG.trace("Atomically moved the dead regionserver logs. ");
+      if (LOG.isTraceEnabled())
+        LOG.trace("Atomically moved the dead regionserver logs. ");
+      return new Pair<>(newPeerId, logQueue);
     } catch (KeeperException e) {
       // Multi call failed; it looks like some other regionserver took away the logs.
       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
-      queues.clear();
     } catch (InterruptedException e) {
       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
-      queues.clear();
       Thread.currentThread().interrupt();
     }
-    return queues;
+    return null;
   }
 
   /**
-   * This methods copies all the wals queues from another region server and returns them all sorted
+   * This method moves all the wals queues from another region server and returns them all sorted
    * per peer cluster (appended with the dead server's znode)
    * @param znode server names to copy
-   * @return all wals for all peers of that cluster, null if an error occurred
+   * @return all wals for the peer of that cluster, null if an error occurred
    */
-  private Map<String, Set<String>> copyQueuesFromRS(String znode) {
+  private Pair<String, SortedSet<String>> copyQueueFromLockedRS(String znode, String peerId) {
     // TODO this method isn't atomic enough, we could start copying and then
     // TODO fail for some reason and we would end up with znodes we don't want.
-    Map<String, Set<String>> queues = new HashMap<>();
     try {
       String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
-      List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
-      // We have a lock znode in there, it will count as one.
-      if (clusters == null || clusters.size() <= 1) {
-        return queues;
+      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+      String clusterPath = ZKUtil.joinZNode(nodePath, peerId);
+      if (!peerExists(replicationQueueInfo.getPeerId())) {
+        LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
+        // Protection against moving orphaned queues
+        return null;
       }
-      // The lock isn't a peer cluster, remove it
-      clusters.remove(RS_LOCK_ZNODE);
-      for (String cluster : clusters) {
-        ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
-        if (!peerExists(replicationQueueInfo.getPeerId())) {
-          LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
-          // Protection against moving orphaned queues
-          continue;
-        }
-        // We add the name of the recovered RS to the new znode, we can even
-        // do that for queues that were recovered 10 times giving a znode like
-        // number-startcode-number-otherstartcode-number-anotherstartcode-etc
-        String newCluster = cluster + "-" + znode;
-        String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
-        String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
-        List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
-        // That region server didn't have anything to replicate for this cluster
-        if (wals == null || wals.size() == 0) {
-          continue;
-        }
-        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
+      // We add the name of the recovered RS to the new znode, we can even
+      // do that for queues that were recovered 10 times giving a znode like
+      // number-startcode-number-otherstartcode-number-anotherstartcode-etc
+      String newCluster = peerId + "-" + znode;
+      String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
+
+      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
+      // That region server didn't have anything to replicate for this cluster
+      if (wals == null || wals.size() == 0) {
+        return null;
+      }
+      ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
           HConstants.EMPTY_BYTE_ARRAY);
-        Set<String> logQueue = new HashSet<String>();
-        queues.put(newCluster, logQueue);
-        for (String wal : wals) {
-          String z = ZKUtil.joinZNode(clusterPath, wal);
-          byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
-          long position = 0;
-          try {
-            position = ZKUtil.parseWALPositionFrom(positionBytes);
-          } catch (DeserializationException e) {
-            LOG.warn("Failed parse of wal position from the following znode: " + z
-                + ", Exception: " + e);
-          }
-          LOG.debug("Creating " + wal + " with data " + position);
-          String child = ZKUtil.joinZNode(newClusterZnode, wal);
-          // Position doesn't actually change, we are just deserializing it for
-          // logging, so just use the already serialized version
-          ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
-          logQueue.add(wal);
+      SortedSet<String> logQueue = new TreeSet<>();
+      for (String wal : wals) {
+        String z = ZKUtil.joinZNode(clusterPath, wal);
+        byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
+        long position = 0;
+        try {
+          position = ZKUtil.parseWALPositionFrom(positionBytes);
+        } catch (DeserializationException e) {
+          LOG.warn("Failed parse of wal position from the following znode: " + z
+              + ", Exception: " + e);
         }
+        LOG.debug("Creating " + wal + " with data " + position);
+        String child = ZKUtil.joinZNode(newClusterZnode, wal);
+        // Position doesn't actually change, we are just deserializing it for
+        // logging, so just use the already serialized version
+        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, child, positionBytes);
+        logQueue.add(wal);
       }
+      return new Pair<>(newCluster, logQueue);
     } catch (KeeperException e) {
-      this.abortable.abort("Copy queues from rs", e);
+      LOG.warn("Got exception in copyQueueFromLockedRS: ", e);
     } catch (InterruptedException e) {
       LOG.warn(e);
       Thread.currentThread().interrupt();
     }
-    return queues;
+    return null;
   }
 
   /**

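To illustrate the znode move performed above (the paths are examples, not taken from the commit):
a queue that lived under the dead server's znode is recreated under the claiming server's znode,
with the dead server's name appended to the peer id, and the original queue is deleted, e.g.

    before:  /hbase/replication/rs/<dead-rs>/<peerId>/<wal>
    after:   /hbase/replication/rs/<live-rs>/<peerId>-<dead-rs>/<wal>
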
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
index 3ee6fde..28b9bdf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
@@ -48,6 +49,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 /**
  * This class provides an implementation of the ReplicationQueues interface using an HBase table
@@ -227,31 +230,54 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
     return getAllQueues(serverName);
   }
 
+  @Override public List<String> getUnClaimedQueueIds(String regionserver) {
+    if (isThisOurRegionServer(regionserver)) {
+      return null;
+    }
+    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
+      List<String> res = new ArrayList<>();
+      for (Result queue : queuesToClaim) {
+        String rowKey = Bytes.toString(queue.getRow());
+        res.add(rowKey);
+      }
+      return res.isEmpty() ? null : res;
+    } catch (IOException e) {
+      String errMsg = "Failed getUnClaimedQueueIds";
+      abortable.abort(errMsg, e);
+    }
+    return null;
+  }
+
+  @Override public void removeReplicatorIfQueueIsEmpty(String regionserver) {
+    // Do nothing here
+  }
+
   @Override
-  public Map<String, Set<String>> claimQueues(String regionserver) {
-    Map<String, Set<String>> queues = new HashMap<>();
+  public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
     if (isThisOurRegionServer(regionserver)) {
-      return queues;
+      return null;
     }
-    ResultScanner queuesToClaim = null;
-    try {
-      queuesToClaim = getQueuesBelongingToServer(regionserver);
+
+    try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)){
       for (Result queue : queuesToClaim) {
+        String rowKey = Bytes.toString(queue.getRow());
+        if (!rowKey.equals(queueId)){
+          continue;
+        }
         if (attemptToClaimQueue(queue, regionserver)) {
-          String rowKey = Bytes.toString(queue.getRow());
           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
           if (replicationState.peerExists(replicationQueueInfo.getPeerId())) {
-            Set<String> sortedLogs = new HashSet<String>();
+            SortedSet<String> sortedLogs = new TreeSet<>();
             List<String> logs = getLogsInQueue(queue.getRow());
             for (String log : logs) {
               sortedLogs.add(log);
             }
-            queues.put(rowKey, sortedLogs);
             LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver);
+            return new Pair<>(rowKey, sortedLogs);
           } else {
             // Delete orphaned queues
             removeQueue(Bytes.toString(queue.getRow()));
-            LOG.info(serverName + " has deleted abandoned queue " + rowKey + " from " +
+            LOG.info(serverName + " has deleted abandoned queue " + queueId + " from " +
               regionserver);
           }
         }
@@ -259,13 +285,8 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
     } catch (IOException | KeeperException e) {
       String errMsg = "Failed claiming queues for regionserver=" + regionserver;
       abortable.abort(errMsg, e);
-      queues.clear();
-    } finally {
-      if (queuesToClaim != null) {
-        queuesToClaim.close();
-      }
     }
-    return queues;
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 586aace..3cb7a84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 
 /**
@@ -647,10 +648,27 @@ public class ReplicationSourceManager implements ReplicationListener {
         LOG.info("Not transferring queue since we are shutting down");
         return;
       }
-      Map<String, Set<String>> newQueues = null;
-
-      newQueues = this.rq.claimQueues(rsZnode);
-
+      Map<String, Set<String>> newQueues = new HashMap<>();
+      List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
+      while (peers != null && !peers.isEmpty()) {
+        Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
+            peers.get(rand.nextInt(peers.size())));
+        long sleep = sleepBeforeFailover/2;
+        if (peer != null) {
+          newQueues.put(peer.getFirst(), peer.getSecond());
+          sleep = sleepBeforeFailover;
+        }
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted while waiting before transferring a queue.");
+          Thread.currentThread().interrupt();
+        }
+        peers = rq.getUnClaimedQueueIds(rsZnode);
+      }
+      if (peers != null) {
+        rq.removeReplicatorIfQueueIsEmpty(rsZnode);
+      }
       // Copying over the failed queue is completed.
       if (newQueues.isEmpty()) {
         // We either didn't get the lock or the failed region server didn't have any outstanding

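The effect of the loop above: each surviving region server claims one randomly chosen queue at a
time and sleeps between attempts (the full sleepBeforeFailover after a successful claim, half of
it otherwise), so the queues of a dead server are spread across several live servers instead of
all being transferred to the single server that wins the race, which is the behaviour
HBASE-12770 addresses.
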
http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 933c621..fcab105 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -124,7 +122,8 @@ public abstract class TestReplicationStateBasic {
     assertEquals(0, rq1.getAllQueues().size());
     assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
     assertNull(rq1.getLogsInQueue("bogus"));
-    assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
+    assertNull(rq1.getUnClaimedQueueIds(
+        ServerName.valueOf("bogus", 1234, -1L).toString()));
 
     rq1.setLogPosition("bogus", "bogus", 5L);
 
@@ -143,15 +142,21 @@ public abstract class TestReplicationStateBasic {
     assertEquals(1, rq2.getAllQueues().size());
     assertEquals(5, rq3.getAllQueues().size());
 
-    assertEquals(0, rq3.claimQueues(server1).size());
+    assertEquals(0, rq3.getUnClaimedQueueIds(server1).size());
+    rq3.removeReplicatorIfQueueIsEmpty(server1);
     assertEquals(2, rq3.getListOfReplicators().size());
 
-    Map<String, Set<String>> queues = rq2.claimQueues(server3);
+    List<String> queues = rq2.getUnClaimedQueueIds(server3);
     assertEquals(5, queues.size());
+    for(String queue: queues) {
+      rq2.claimQueue(server3, queue);
+    }
+    rq2.removeReplicatorIfQueueIsEmpty(server3);
     assertEquals(1, rq2.getListOfReplicators().size());
 
     // Try to claim our own queues
-    assertEquals(0, rq2.claimQueues(server2).size());
+    assertNull(rq2.getUnClaimedQueueIds(server2));
+    rq2.removeReplicatorIfQueueIsEmpty(server2);
 
     assertEquals(6, rq2.getAllQueues().size());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
index 7ec6df8..35c4e24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -40,8 +41,6 @@ import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import static junit.framework.TestCase.assertNull;
 import static org.junit.Assert.assertEquals;
@@ -267,18 +266,19 @@ public class TestReplicationStateHBaseImpl {
     }
     try {
       // Test claiming queues
-      Map<String, Set<String>> claimedQueuesFromRq2 = rq1.claimQueues(server2);
+      List<String> claimedQueuesFromRq2 = rq1.getUnClaimedQueueIds(server2);
       // Check to make sure that list of peers with outstanding queues is decremented by one
       // after claimQueues
+      // Check to make sure that we claimed the proper number of queues
+      assertEquals(2, claimedQueuesFromRq2.size());
+      assertTrue(claimedQueuesFromRq2.contains("Queue1-" + server2));
+      assertTrue(claimedQueuesFromRq2.contains("Queue2-" + server2));
+      assertEquals(2, rq1.claimQueue(server2, "Queue1-" + server2).getSecond().size());
+      assertEquals(1, rq1.claimQueue(server2, "Queue2-" + server2).getSecond().size());
+      rq1.removeReplicatorIfQueueIsEmpty(server2);
       assertEquals(rq1.getListOfReplicators().size(), 2);
       assertEquals(rq2.getListOfReplicators().size(), 2);
       assertEquals(rq3.getListOfReplicators().size(), 2);
-      // Check to make sure that we claimed the proper number of queues
-      assertEquals(2, claimedQueuesFromRq2.size());
-      assertTrue(claimedQueuesFromRq2.containsKey("Queue1-" + server2));
-      assertTrue(claimedQueuesFromRq2.containsKey("Queue2-" + server2));
-      assertEquals(2, claimedQueuesFromRq2.get("Queue1-" + server2).size());
-      assertEquals(1, claimedQueuesFromRq2.get("Queue2-" + server2).size());
       assertEquals(5, rq1.getAllQueues().size());
       // Check that all the logs in the other queue were claimed
       assertEquals(2, rq1.getLogsInQueue("Queue1-" + server2).size());
@@ -294,7 +294,11 @@ public class TestReplicationStateHBaseImpl {
       rq1.addLog("UnclaimableQueue", "WALLogFile1.1");
       rq1.addLog("UnclaimableQueue", "WALLogFile1.2");
       assertEquals(6, rq1.getAllQueues().size());
-      Map<String, Set<String>> claimedQueuesFromRq1 = rq3.claimQueues(server1);
+      List<String> claimedQueuesFromRq1 = rq3.getUnClaimedQueueIds(server1);
+      for(String queue : claimedQueuesFromRq1) {
+        rq3.claimQueue(server1, queue);
+      }
+      rq3.removeReplicatorIfQueueIsEmpty(server1);
       assertEquals(rq1.getListOfReplicators().size(), 1);
       assertEquals(rq2.getListOfReplicators().size(), 1);
       assertEquals(rq3.getListOfReplicators().size(), 1);
@@ -302,12 +306,12 @@ public class TestReplicationStateHBaseImpl {
       // Replication Peers
       assertEquals(6, rq3.getAllQueues().size());
       // Test claiming non-existing queues
-      Map<String, Set<String>> noQueues = rq3.claimQueues("NotARealServer");
-      assertEquals(0, noQueues.size());
+      List<String> noQueues = rq3.getUnClaimedQueueIds("NotARealServer");
+      assertNull(noQueues);
       assertEquals(6, rq3.getAllQueues().size());
       // Test claiming own queues
-      noQueues = rq3.claimQueues(server3);
-      assertEquals(0, noQueues.size());
+      noQueues = rq3.getUnClaimedQueueIds(server3);
+      Assert.assertNull(noQueues);
       assertEquals(6, rq3.getAllQueues().size());
       // Check that rq3 still remain on list of replicators
       assertEquals(1, rq3.getListOfReplicators().size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 7696e95..4ee783d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -517,7 +518,12 @@ public abstract class TestReplicationSourceManager {
     @Override
     public void run() {
       try {
-        logZnodesMap = rq.claimQueues(deadRsZnode);
+        logZnodesMap = new HashMap<>();
+        List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
+        for(String queue:queues){
+          Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
+          logZnodesMap.put(pair.getFirst(), pair.getSecond());
+        }
         server.abort("Done with testing", null);
       } catch (Exception e) {
         LOG.error("Got exception while running NodeFailoverWorker", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5f9df1e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index 75ed835..a9d0766 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -36,8 +36,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -87,22 +85,28 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
       ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
         s1.getZooKeeper()));
     rq1.init(s1.getServerName().toString());
-    Map<String, Set<String>> testMap =
-      rq1.claimQueues(server.getServerName().getServerName());
+    String serverName = server.getServerName().getServerName();
+    List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
+    rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
+    rq1.removeReplicatorIfQueueIsEmpty(serverName);
     ReplicationQueues rq2 =
       ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
         s2.getZooKeeper()));
     rq2.init(s2.getServerName().toString());
-    testMap = rq2.claimQueues(s1.getServerName().getServerName());
+    serverName = s1.getServerName().getServerName();
+    unclaimed = rq2.getUnClaimedQueueIds(serverName);
+    rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
+    rq2.removeReplicatorIfQueueIsEmpty(serverName);
     ReplicationQueues rq3 =
       ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
         s3.getZooKeeper()));
     rq3.init(s3.getServerName().toString());
-    testMap = rq3.claimQueues(s2.getServerName().getServerName());
-
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.keySet().iterator().next());
+    serverName = s2.getServerName().getServerName();
+    unclaimed = rq3.getUnClaimedQueueIds(serverName);
+    String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
+    rq3.removeReplicatorIfQueueIsEmpty(serverName);
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
     List<String> result = replicationQueueInfo.getDeadRegionServers();
-
     // verify
     assertTrue(result.contains(server.getServerName().getServerName()));
     assertTrue(result.contains(s1.getServerName().getServerName()));
@@ -137,7 +141,11 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
         new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
 
     int v0 = client.getQueuesZNodeCversion();
-    rq1.claimQueues(s0.getServerName().getServerName());
+    List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName());
+    for(String queue : queues) {
+      rq1.claimQueue(s0.getServerName().getServerName(), queue);
+    }
+    rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName());
     int v1 = client.getQueuesZNodeCversion();
     // cversion should increase by 1 since a child node is deleted
     assertEquals(v0 + 1, v1);