You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/06/11 04:56:34 UTC

[48/50] hbase git commit: HBASE-15958 Implement ClaimQueues on top of HBase

HBASE-15958 Implement ClaimQueues on top of HBase

Builds on HBASE-15883.
Implements the claimQueues procedure on top of an HBase table (the Replication Table).
Also adds unit tests for claimQueues.
Peer tracking is still performed by ZooKeeper.
Also changes how queueIds are tracked, so the Replication Table no longer has to be scanned to find a queue's row key.
As a consequence, the queue naming scheme differs slightly from ReplicationQueuesZKImpl: a reclaimed queue's new queueId is simply its row key in the Replication Table.

Signed-off-by: Elliott Clark <ec...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/babdedc1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/babdedc1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/babdedc1

Branch: refs/heads/hbase-12439
Commit: babdedc1b0f0159eb526fb5c9ee08525de7ce404
Parents: 108d39a
Author: Joseph Hwang <jz...@fb.com>
Authored: Thu May 19 17:14:33 2016 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Thu Jun 9 15:05:54 2016 -0700

----------------------------------------------------------------------
 .../hbase/replication/ReplicationQueues.java    |   8 +-
 .../replication/ReplicationQueuesArguments.java |   4 +-
 .../replication/ReplicationQueuesHBaseImpl.java | 485 ++++++++++++-------
 .../replication/ReplicationQueuesZKImpl.java    |  26 +-
 .../regionserver/ReplicationSourceManager.java  |   7 +-
 .../replication/TestReplicationStateBasic.java  |   6 +-
 .../TestReplicationStateHBaseImpl.java          | 302 +++++++++---
 .../replication/TestReplicationStateZKImpl.java |   1 -
 .../TestReplicationSourceManager.java           |  12 +-
 9 files changed, 579 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/babdedc1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 809b122..0de0cc8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -19,8 +19,8 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
-import java.util.SortedMap;
-import java.util.SortedSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -96,10 +96,10 @@ public interface ReplicationQueues {
   /**
    * Take ownership for the set of queues belonging to a dead region server.
    * @param regionserver the id of the dead region server
-   * @return A SortedMap of the queues that have been claimed, including a SortedSet of WALs in
+   * @return A Map of the queues that have been claimed, including a Set of WALs in
    *         each queue. Returns an empty map if no queues were failed-over.
    */
-  SortedMap<String, SortedSet<String>> claimQueues(String regionserver);
+  Map<String, Set<String>> claimQueues(String regionserver);
 
   /**
    * Get a list of all region servers that have outstanding replication queues. These servers could

http://git-wip-us.apache.org/repos/asf/hbase/blob/babdedc1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
index 4907b73..4fdc4e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
@@ -56,11 +56,11 @@ public class ReplicationQueuesArguments {
     this.conf = conf;
   }
 
-  public Abortable getAbort() {
+  public Abortable getAbortable() {
     return abort;
   }
 
-  public void setAbort(Abortable abort) {
+  public void setAbortable(Abortable abort) {
     this.abort = abort;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/babdedc1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
index 29f0632..34a5289 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.hbase.Abortable;
@@ -41,25 +43,41 @@ import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
+import java.util.Set;
+
+/**
+ * This class provides an implementation of the ReplicationQueues interface using an HBase table
+ * "Replication Table". The basic schema of this table will store each individual queue as a
+ * seperate row. The row key will be a unique identifier of the creating server's name and the
+ * queueId. Each queue must have the following two columns:
+ *  COL_OWNER: tracks which server is currently responsible for tracking the queue
+ *  COL_QUEUE_ID: tracks the queue's id as stored in ReplicationSource
+ * They will also have columns mapping [WAL filename : offset]
+ * One key difference from the ReplicationQueuesZkImpl is that when queues are reclaimed we
+ * simply return its HBase row key as its new "queueId"
+ */
 
 @InterfaceAudience.Private
-public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
+public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
+    implements ReplicationQueues {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationQueuesHBaseImpl.class);
 
   /** Name of the HBase Table used for tracking replication*/
   public static final TableName REPLICATION_TABLE_NAME =
@@ -68,7 +86,12 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
   // Column family and column names for the Replication Table
   private static final byte[] CF = Bytes.toBytes("r");
   private static final byte[] COL_OWNER = Bytes.toBytes("o");
-  private static final byte[] COL_QUEUE_ID = Bytes.toBytes("q");
+  private static final byte[] COL_OWNER_HISTORY = Bytes.toBytes("h");
+
+  // The value used to delimit the queueId and server name inside of a queue's row key. Currently a
+  // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
+  // See HBASE-11394.
+  private static String ROW_KEY_DELIMITER = "-";
 
   // Column Descriptor for the Replication Table
   private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
@@ -80,7 +103,8 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
       .setCacheDataInL1(true);
 
   // Common byte values used in replication offset tracking
-  private static final byte[] INITIAL_OFFSET = Bytes.toBytes(0L);
+  private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L);
+  private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes("");
 
   /*
    * Make sure that HBase table operations for replication have a high number of retries. This is
@@ -92,104 +116,92 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
   private static final int RPC_TIMEOUT = 2000;
   private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
 
-  private final Configuration conf;
-  private final Admin admin;
-  private final Connection connection;
-  private final Table replicationTable;
-  private final Abortable abortable;
+  private Configuration modifiedConf;
+  private Admin admin;
+  private Connection connection;
+  private Table replicationTable;
   private String serverName = null;
   private byte[] serverNameBytes = null;
 
-  public ReplicationQueuesHBaseImpl(ReplicationQueuesArguments args) throws IOException {
-    this(args.getConf(), args.getAbort());
+  public ReplicationQueuesHBaseImpl(ReplicationQueuesArguments args) {
+    this(args.getConf(), args.getAbortable(), args.getZk());
   }
 
-  public ReplicationQueuesHBaseImpl(Configuration conf, Abortable abort) throws IOException {
-    this.conf = new Configuration(conf);
-    // Modify the connection's config so that the Replication Table it returns has a much higher
-    // number of client retries
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
-    this.connection = ConnectionFactory.createConnection(conf);
-    this.admin = connection.getAdmin();
-    this.abortable = abort;
-    replicationTable = createAndGetReplicationTable();
-    replicationTable.setRpcTimeout(RPC_TIMEOUT);
-    replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
+  public ReplicationQueuesHBaseImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw) {
+    super(zkw, conf, abort);
+    modifiedConf = new Configuration(conf);
+    modifiedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
   }
 
   @Override
   public void init(String serverName) throws ReplicationException {
-    this.serverName = serverName;
-    this.serverNameBytes = Bytes.toBytes(serverName);
+    try {
+      this.serverName = serverName;
+      this.serverNameBytes = Bytes.toBytes(serverName);
+      // Modify the connection's config so that the Replication Table it returns has a much higher
+      // number of client retries
+      this.connection = ConnectionFactory.createConnection(modifiedConf);
+      this.admin = connection.getAdmin();
+      replicationTable = createAndGetReplicationTable();
+      replicationTable.setRpcTimeout(RPC_TIMEOUT);
+      replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
+    } catch (IOException e) {
+      throw new ReplicationException(e);
+    }
   }
 
   @Override
   public void removeQueue(String queueId) {
+
     try {
-      byte[] rowKey = this.queueIdToRowKey(queueId);
-      // The rowkey will be null if the queue cannot be found in the Replication Table
-      if (rowKey == null) {
-        String errMsg = "Could not remove non-existent queue with queueId=" + queueId;
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return;
-      }
+      byte[] rowKey = queueIdToRowKey(queueId);
       Delete deleteQueue = new Delete(rowKey);
       safeQueueUpdate(deleteQueue);
-    } catch (IOException e) {
-      abortable.abort("Could not remove queue with queueId=" + queueId, e);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed removing queue queueId=" + queueId;
+      abortable.abort(errMsg, e);
     }
   }
 
   @Override
   public void addLog(String queueId, String filename) throws ReplicationException {
     try {
-      // Check if the queue info (Owner, QueueId) is currently stored in the Replication Table
-      if (this.queueIdToRowKey(queueId) == null) {
-        // Each queue will have an Owner, QueueId, and a collection of [WAL:offset] key values.
-        Put putNewQueue = new Put(Bytes.toBytes(buildServerQueueName(queueId)));
-        putNewQueue.addColumn(CF, COL_OWNER, Bytes.toBytes(serverName));
-        putNewQueue.addColumn(CF, COL_QUEUE_ID, Bytes.toBytes(queueId));
-        putNewQueue.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET);
+      if (!checkQueueExists(queueId)) {
+        // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values
+        Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
+        putNewQueue.addColumn(CF, COL_OWNER, serverNameBytes);
+        putNewQueue.addColumn(CF, COL_OWNER_HISTORY, EMPTY_STRING_BYTES);
+        putNewQueue.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
         replicationTable.put(putNewQueue);
       } else {
         // Otherwise simply add the new log and offset as a new column
-        Put putNewLog = new Put(this.queueIdToRowKey(queueId));
-        putNewLog.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET);
+        Put putNewLog = new Put(queueIdToRowKey(queueId));
+        putNewLog.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
         safeQueueUpdate(putNewLog);
       }
-    } catch (IOException e) {
-      abortable.abort("Could not add queue queueId=" + queueId + " filename=" + filename, e);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename;
+      abortable.abort(errMsg, e);
     }
   }
 
   @Override
   public void removeLog(String queueId, String filename) {
     try {
-      byte[] rowKey = this.queueIdToRowKey(queueId);
-      if (rowKey == null) {
-        String errMsg = "Could not remove log from non-existent queueId=" + queueId + ", filename="
-          + filename;
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return;
-      }
+      byte[] rowKey = queueIdToRowKey(queueId);
       Delete delete = new Delete(rowKey);
       delete.addColumns(CF, Bytes.toBytes(filename));
       safeQueueUpdate(delete);
-    } catch (IOException e) {
-      abortable.abort("Could not remove log from queueId=" + queueId + ", filename=" + filename, e);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename;
+      abortable.abort(errMsg, e);
     }
   }
 
   @Override
   public void setLogPosition(String queueId, String filename, long position) {
     try {
-      byte[] rowKey = this.queueIdToRowKey(queueId);
-      if (rowKey == null) {
-        String errMsg = "Could not set position of log from non-existent queueId=" + queueId +
-          ", filename=" + filename;
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return;
-      }
+      byte[] rowKey = queueIdToRowKey(queueId);
       // Check that the log exists. addLog() must have been called before setLogPosition().
       Get checkLogExists = new Get(rowKey);
       checkLogExists.addColumn(CF, Bytes.toBytes(filename));
@@ -203,24 +215,21 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
       Put walAndOffset = new Put(rowKey);
       walAndOffset.addColumn(CF, Bytes.toBytes(filename), Bytes.toBytes(position));
       safeQueueUpdate(walAndOffset);
-    } catch (IOException e) {
-      abortable.abort("Failed to write replication wal position (filename=" + filename +
-          ", position=" + position + ")", e);
+    } catch (IOException | ReplicationException e) {
+      String errMsg = "Failed writing log position queueId=" + queueId + "filename=" +
+        filename + " position=" + position;
+      abortable.abort(errMsg, e);
     }
   }
 
   @Override
   public long getLogPosition(String queueId, String filename) throws ReplicationException {
     try {
-      byte[] rowKey = this.queueIdToRowKey(queueId);
-      if (rowKey == null) {
-        throw new ReplicationException("Could not get position in log for non-existent queue " +
-            "queueId=" + queueId + ", filename=" + filename);
-      }
+      byte[] rowKey = queueIdToRowKey(queueId);
       Get getOffset = new Get(rowKey);
       getOffset.addColumn(CF, Bytes.toBytes(filename));
-      Result result = replicationTable.get(getOffset);
-      if (result.isEmpty()) {
+      Result result = getResultIfOwner(getOffset);
+      if (result == null || !result.containsColumn(CF, Bytes.toBytes(filename))) {
         throw new ReplicationException("Could not read empty result while getting log position " +
             "queueId=" + queueId + ", filename=" + filename);
       }
@@ -241,53 +250,117 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
 
   @Override
   public List<String> getLogsInQueue(String queueId) {
-    List<String> logs = new ArrayList<String>();
+    byte[] rowKey = queueIdToRowKey(queueId);
+    return getLogsInQueue(rowKey);
+  }
+
+  private List<String> getLogsInQueue(byte[] rowKey) {
+    String errMsg = "Could not get logs in queue queueId=" + Bytes.toString(rowKey);
     try {
-      byte[] rowKey = this.queueIdToRowKey(queueId);
-      if (rowKey == null) {
-        String errMsg = "Could not get logs from non-existent queueId=" + queueId;
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-        return null;
-      }
       Get getQueue = new Get(rowKey);
-      Result queue = replicationTable.get(getQueue);
-      if (queue.isEmpty()) {
+      Result queue = getResultIfOwner(getQueue);
+      // The returned queue could be null if we have lost ownership of it
+      if (queue == null) {
+        abortable.abort(errMsg, new ReplicationException(errMsg));
         return null;
       }
-      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF);
-      for (byte[] cQualifier : familyMap.keySet()) {
-        if (Arrays.equals(cQualifier, COL_OWNER) || Arrays.equals(cQualifier, COL_QUEUE_ID)) {
-          continue;
-        }
-        logs.add(Bytes.toString(cQualifier));
-      }
+      return readWALsFromResult(queue);
     } catch (IOException e) {
-      abortable.abort("Could not get logs from queue queueId=" + queueId, e);
+      abortable.abort(errMsg, e);
       return null;
     }
-    return logs;
   }
 
   @Override
   public List<String> getAllQueues() {
+    List<String> allQueues = new ArrayList<String>();
+    ResultScanner queueScanner = null;
     try {
-      return this.getQueuesBelongingToServer(serverName);
+      queueScanner = this.getQueuesBelongingToServer(serverName);
+      for (Result queue : queueScanner) {
+        String rowKey =  Bytes.toString(queue.getRow());
+        // If the queue does not have a Owner History, then we must be its original owner. So we
+        // want to return its queueId in raw form
+        if (Bytes.toString(queue.getValue(CF, COL_OWNER_HISTORY)).length() == 0) {
+          allQueues.add(getRawQueueIdFromRowKey(rowKey));
+        } else {
+          allQueues.add(rowKey);
+        }
+      }
+      return allQueues;
     } catch (IOException e) {
-      abortable.abort("Could not get all replication queues", e);
+      String errMsg = "Failed getting list of all replication queues";
+      abortable.abort(errMsg, e);
       return null;
+    } finally {
+      if (queueScanner != null) {
+        queueScanner.close();
+      }
     }
   }
 
   @Override
-  public SortedMap<String, SortedSet<String>> claimQueues(String regionserver) {
-    // TODO
-    throw new NotImplementedException();
+  public Map<String, Set<String>> claimQueues(String regionserver) {
+    Map<String, Set<String>> queues = new HashMap<>();
+    if (isThisOurRegionServer(regionserver)) {
+      return queues;
+    }
+    ResultScanner queuesToClaim = null;
+    try {
+      queuesToClaim = this.getQueuesBelongingToServer(regionserver);
+      for (Result queue : queuesToClaim) {
+        if (attemptToClaimQueue(queue, regionserver)) {
+          String rowKey = Bytes.toString(queue.getRow());
+          ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
+          if (peerExists(replicationQueueInfo.getPeerId())) {
+            Set<String> sortedLogs = new HashSet<String>();
+            List<String> logs = getLogsInQueue(queue.getRow());
+            for (String log : logs) {
+              sortedLogs.add(log);
+            }
+            queues.put(rowKey, sortedLogs);
+            LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver);
+          } else {
+            // Delete orphaned queues
+            removeQueue(Bytes.toString(queue.getRow()));
+            LOG.info(serverName + " has deleted abandoned queue " + rowKey + " from " +
+                regionserver);
+          }
+        }
+      }
+    } catch (IOException | KeeperException e) {
+      String errMsg = "Failed claiming queues for regionserver=" + regionserver;
+      abortable.abort(errMsg, e);
+      queues.clear();
+    } finally {
+      if (queuesToClaim != null) {
+        queuesToClaim.close();
+      }
+    }
+    return queues;
   }
 
   @Override
   public List<String> getListOfReplicators() {
-    // TODO
-    throw new NotImplementedException();
+    // scan all of the queues and return a list of all unique OWNER values
+    Set<String> peerServers = new HashSet<String>();
+    ResultScanner allQueuesInCluster = null;
+    try {
+      Scan scan = new Scan();
+      scan.addColumn(CF, COL_OWNER);
+      allQueuesInCluster = replicationTable.getScanner(scan);
+      for (Result queue : allQueuesInCluster) {
+        peerServers.add(Bytes.toString(queue.getValue(CF, COL_OWNER)));
+      }
+    } catch (IOException e) {
+      String errMsg = "Failed getting list of replicators";
+      abortable.abort(errMsg, e);
+    } finally {
+      if (allQueuesInCluster != null) {
+        allQueuesInCluster.close();
+      }
+    }
+    return new ArrayList<String>(peerServers);
   }
 
   @Override
@@ -363,6 +436,7 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
   /**
    * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
    * in ReplicationQueuesHBaseImpl
+   *
    * @throws IOException
    */
   private void createReplicationTable() throws IOException {
@@ -372,41 +446,49 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
   }
 
   /**
-   * Builds the unique identifier for a queue in the Replication table by appending the queueId to
-   * the servername
-   *
-   * @param queueId a String that identifies the queue
-   * @return unique identifier for a queue in the Replication table
+   * Build the row key for the given queueId. This will uniquely identify it from all other queues
+   * in the cluster.
+   * @param serverName The owner of the queue
+   * @param queueId String identifier of the queue
+   * @return String representation of the queue's row key
+   */
+  private String buildQueueRowKey(String serverName, String queueId) {
+    return queueId + ROW_KEY_DELIMITER + serverName;
+  }
+
+  private String buildQueueRowKey(String queueId) {
+    return buildQueueRowKey(serverName, queueId);
+  }
+
+  /**
+   * Parse the original queueId from a row key
+   * @param rowKey String representation of a queue's row key
+   * @return the original queueId
    */
-  private String buildServerQueueName(String queueId) {
-    return serverName + "-" + queueId;
+  private String getRawQueueIdFromRowKey(String rowKey) {
+    return rowKey.split(ROW_KEY_DELIMITER)[0];
   }
-  
+
   /**
    * See safeQueueUpdate(RowMutations mutate)
+   *
    * @param put Row mutation to perform on the queue
    */
-  private void safeQueueUpdate(Put put) {
+  private void safeQueueUpdate(Put put) throws ReplicationException, IOException {
     RowMutations mutations = new RowMutations(put.getRow());
-    try {
-      mutations.add(put);
-    } catch (IOException e){
-      abortable.abort("Failed to update Replication Table because of IOException", e);
-    }
+    mutations.add(put);
     safeQueueUpdate(mutations);
   }
 
   /**
    * See safeQueueUpdate(RowMutations mutate)
+   *
    * @param delete Row mutation to perform on the queue
    */
-  private void safeQueueUpdate(Delete delete) {
+  private void safeQueueUpdate(Delete delete) throws ReplicationException,
+      IOException{
     RowMutations mutations = new RowMutations(delete.getRow());
-    try {
-      mutations.add(delete);
-    } catch (IOException e) {
-      abortable.abort("Failed to update Replication Table because of IOException", e);
-    }
+    mutations.add(delete);
     safeQueueUpdate(mutations);
   }
 
@@ -417,16 +499,30 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
    *
    * @param mutate Mutation to perform on a given queue
    */
-  private void safeQueueUpdate(RowMutations mutate) {
-    try {
-      boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF, COL_OWNER,
-        CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
-      if (!updateSuccess) {
-        String errMsg = "Failed to update Replication Table because we lost queue ownership";
-        abortable.abort(errMsg, new ReplicationException(errMsg));
-      }
-    } catch (IOException e) {
-      abortable.abort("Failed to update Replication Table because of IOException", e);
+  private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{
+    boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF, COL_OWNER,
+      CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
+    if (!updateSuccess) {
+      throw new ReplicationException("Failed to update Replication Table because we lost queue " +
+        " ownership");
+    }
+  }
+
+  /**
+   * Returns a queue's row key given either its raw or reclaimed queueId
+   *
+   * @param queueId queueId of the queue
+   * @return byte representation of the queue's row key
+   */
+  private byte[] queueIdToRowKey(String queueId) {
+    // Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
+    // then this is not a reclaimed queue.
+    if (!queueId.contains(ROW_KEY_DELIMITER)) {
+      return Bytes.toBytes(buildQueueRowKey(queueId));
+      // If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the
+      // queue's row key
+    } else {
+      return Bytes.toBytes(queueId);
     }
   }
 
@@ -434,64 +530,115 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
    * Get the QueueIds belonging to the named server from the ReplicationTable
    *
    * @param server name of the server
-   * @return a list of the QueueIds belonging to the server
+   * @return a ResultScanner over the QueueIds belonging to the server
    * @throws IOException
    */
-  private List<String> getQueuesBelongingToServer(String server) throws IOException {
-    List<String> queues = new ArrayList<String>();
+  private ResultScanner getQueuesBelongingToServer(String server) throws IOException {
     Scan scan = new Scan();
     SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF, COL_OWNER,
       CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
     scan.setFilter(filterMyQueues);
-    scan.addColumn(CF, COL_QUEUE_ID);
     scan.addColumn(CF, COL_OWNER);
+    scan.addColumn(CF, COL_OWNER_HISTORY);
     ResultScanner results = replicationTable.getScanner(scan);
-    for (Result result : results) {
-      queues.add(Bytes.toString(result.getValue(CF, COL_QUEUE_ID)));
+    return results;
+  }
+
+  /**
+   * Check if the queue specified by queueId is stored in HBase
+   *
+   * @param queueId Either raw or reclaimed format of the queueId
+   * @return Whether the queue is stored in HBase
+   * @throws IOException
+   */
+  private boolean checkQueueExists(String queueId) throws IOException {
+    byte[] rowKey = queueIdToRowKey(queueId);
+    return replicationTable.exists(new Get(rowKey));
+  }
+
+  /**
+   * Read all of the WAL's from a queue into a list
+   *
+   * @param queue HBase query result containing the queue
+   * @return a list of all the WAL filenames
+   */
+  private List<String> readWALsFromResult(Result queue) {
+    List<String> wals = new ArrayList<>();
+    Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF);
+    for(byte[] cQualifier : familyMap.keySet()) {
+      // Ignore the meta data fields of the queue
+      if (Arrays.equals(cQualifier, COL_OWNER) || Arrays.equals(cQualifier, COL_OWNER_HISTORY)) {
+        continue;
+      }
+      wals.add(Bytes.toString(cQualifier));
     }
-    results.close();
-    return queues;
+    return wals;
   }
 
   /**
-   * Finds the row key of the HBase row corresponding to the provided queue. This has to be done,
-   * because the row key is [original server name + "-" + queueId0]. And the original server will
-   * make calls to getLog(), getQueue(), etc. with the argument queueId = queueId0.
-   * On the original server we can build the row key by concatenating servername + queueId0.
-   * Yet if the queue is claimed by another server, future calls to getLog(), getQueue(), etc.
-   * will be made with the argument queueId = queueId0 + "-" + pastOwner0 + "-" + pastOwner1 ...
-   * so we need a way to look up rows by their modified queueId's.
+   * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the
+   * recently killed server is still the OWNER before we claim it.
    *
-   * TODO: Consider updating the queueId passed to getLog, getQueue()... inside of ReplicationSource
-   * TODO: and ReplicationSourceManager or the parsing of the passed in queueId's so that we don't
-   * TODO have to scan the table for row keys for each update. See HBASE-15956.
+   * @param queue The queue that we are trying to claim
+   * @param originalServer The server that originally owned the queue
+   * @return Whether we successfully claimed the queue
+   * @throws IOException
+   */
+  private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{
+    Put putQueueNameAndHistory = new Put(queue.getRow());
+    putQueueNameAndHistory.addColumn(CF, COL_OWNER, Bytes.toBytes(serverName));
+    String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF,
+      COL_OWNER_HISTORY)), originalServer);
+    putQueueNameAndHistory.addColumn(CF, COL_OWNER_HISTORY, Bytes.toBytes(newOwnerHistory));
+    RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
+    claimAndRenameQueue.add(putQueueNameAndHistory);
+    // Attempt to claim ownership for this queue by checking if the current OWNER is the original
+    // server. If it is not then another RS has already claimed it. If it is we set ourselves as the
+    // new owner and update the queue's history
+    boolean success = replicationTable.checkAndMutate(queue.getRow(), CF, COL_OWNER,
+      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), claimAndRenameQueue);
+    return success;
+  }
+
+  /**
+   * Creates a "|" delimited record of the queue's past region server owners.
    *
-   * TODO: We can also cache queueId's if ReplicationQueuesHBaseImpl becomes a bottleneck. We
-   * TODO: currently perform scan's over all the rows looking for one with a matching QueueId.
+   * @param originalHistory the queue's original owner history
+   * @param oldServer the name of the server that used to own the queue
+   * @return the queue's new owner history
+   */
+  private String buildClaimedQueueHistory(String originalHistory, String oldServer) {
+    return originalHistory + "|" + oldServer;
+  }
+
+  /**
+   * Attempts to run a Get on some queue. Will only return a non-null result if we currently own
+   * the queue.
    *
-   * @param queueId string representation of the queue id
-   * @return the rowkey of the corresponding queue. This returns null if the corresponding queue
-   * cannot be found.
+   * @param get The get that we want to query
+   * @return The result of the get if this server is the owner of the queue. Else it returns null
    * @throws IOException
    */
-  private byte[] queueIdToRowKey(String queueId) throws IOException {
-    Scan scan = new Scan();
-    scan.addColumn(CF, COL_QUEUE_ID);
-    scan.addColumn(CF, COL_OWNER);
+  private Result getResultIfOwner(Get get) throws IOException {
+    Scan scan = new Scan(get);
+    // Check if the Get currently contains all columns or only specific columns
+    if (scan.getFamilyMap().size() > 0) {
+      // Add the OWNER column if the scan is already only over specific columns
+      scan.addColumn(CF, COL_OWNER);
+    }
     scan.setMaxResultSize(1);
-    // Search for the queue that matches this queueId
-    SingleColumnValueFilter filterByQueueId = new SingleColumnValueFilter(CF, COL_QUEUE_ID,
-        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(queueId));
-    // Make sure that we are the owners of the queue. QueueId's may overlap.
-    SingleColumnValueFilter filterByOwner = new SingleColumnValueFilter(CF, COL_OWNER,
-        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(serverName));
-    // We only want the row key
-    FirstKeyOnlyFilter filterOutColumns = new FirstKeyOnlyFilter();
-    FilterList filterList = new FilterList(filterByQueueId, filterByOwner, filterOutColumns);
-    scan.setFilter(filterList);
-    ResultScanner results = replicationTable.getScanner(scan);
-    Result result = results.next();
-    results.close();
-    return (result == null) ? null : result.getRow();
+    SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF, COL_OWNER,
+      CompareFilter.CompareOp.EQUAL, serverNameBytes);
+    scan.setFilter(checkOwner);
+    ResultScanner scanner = null;
+    try {
+      scanner = replicationTable.getScanner(scan);
+      Result result = scanner.next();
+      return (result == null || result.isEmpty()) ? null : result;
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/babdedc1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index f03efff..a3635e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -19,11 +19,11 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -73,7 +73,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
 
   public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
-    this(args.getZk(), args.getConf(), args.getAbort());
+    this(args.getZk(), args.getConf(), args.getAbortable());
   }
 
   public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
@@ -178,8 +178,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   }
 
   @Override
-  public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
-    SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
+  public Map<String, Set<String>> claimQueues(String regionserverZnode) {
+    Map<String, Set<String>> newQueues = new HashMap<>();
     // check whether there is multi support. If yes, use it.
     if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
       LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
@@ -304,8 +304,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
    * @param znode pertaining to the region server to copy the queues from
    * @return WAL queues sorted per peer cluster
    */
-  private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
-    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+  private Map<String, Set<String>> copyQueuesFromRSUsingMulti(String znode) {
+    Map<String, Set<String>> queues = new HashMap<>();
     // hbase/replication/rs/deadrs
     String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
     List<String> peerIdsToProcess = null;
@@ -330,7 +330,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
           continue; // empty log queue.
         }
         // create the new cluster znode
-        SortedSet<String> logQueue = new TreeSet<String>();
+        Set<String> logQueue = new HashSet<>();
         queues.put(newPeerId, logQueue);
         ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
         listOfOps.add(op);
@@ -373,10 +373,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
    * @param znode server names to copy
    * @return all wals for all peers of that cluster, null if an error occurred
    */
-  private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+  private Map<String, Set<String>> copyQueuesFromRS(String znode) {
     // TODO this method isn't atomic enough, we could start copying and then
     // TODO fail for some reason and we would end up with znodes we don't want.
-    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+    Map<String, Set<String>> queues = new HashMap<>();
     try {
       String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
       List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
@@ -406,7 +406,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         }
         ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
           HConstants.EMPTY_BYTE_ARRAY);
-        SortedSet<String> logQueue = new TreeSet<String>();
+        Set<String> logQueue = new HashSet<>();
         queues.put(newCluster, logQueue);
         for (String wal : wals) {
           String z = ZKUtil.joinZNode(clusterPath, wal);

http://git-wip-us.apache.org/repos/asf/hbase/blob/babdedc1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index e9330f4..433f9c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -642,7 +641,7 @@ public class ReplicationSourceManager implements ReplicationListener {
         LOG.info("Not transferring queue since we are shutting down");
         return;
       }
-      SortedMap<String, SortedSet<String>> newQueues = null;
+      Map<String, Set<String>> newQueues = null; // queue id -> WALs claimed from rsZnode
 
       newQueues = this.rq.claimQueues(rsZnode);
 
@@ -653,9 +652,9 @@ public class ReplicationSourceManager implements ReplicationListener {
         return;
       }
 
-      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
         String peerId = entry.getKey();
-        SortedSet<String> walsSet = entry.getValue();
+        Set<String> walsSet = entry.getValue(); // iteration order unspecified
         try {
           // there is not an actual peer defined corresponding to peerId for the failover.
           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/babdedc1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index de5cc31..b4451f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -22,8 +22,8 @@ import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedMap;
-import java.util.SortedSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -146,7 +146,7 @@ public abstract class TestReplicationStateBasic {
     assertEquals(0, rq3.claimQueues(server1).size());
     assertEquals(2, rq3.getListOfReplicators().size());
 
-    SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
+    Map<String, Set<String>> queues = rq2.claimQueues(server3); // unsorted Map
     assertEquals(5, queues.size());
     assertEquals(1, rq2.getListOfReplicators().size());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/babdedc1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
index 8186213..bd6d070 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
@@ -18,26 +18,30 @@
 
 package org.apache.hadoop.hbase.replication;
 
-import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
 import static junit.framework.TestCase.assertNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -48,10 +52,24 @@ public class TestReplicationStateHBaseImpl {
 
   private static Configuration conf;
   private static HBaseTestingUtility utility;
-  private static Connection connection;
-  private static ReplicationQueues rqH;
+  private static ZooKeeperWatcher zkw; // shared by rq1..rq3 and rp (see setUp)
+  private static String replicationZNode; // deleted recursively in tearDown()
+
+  private static ReplicationQueues rq1; // queues of server1; aborts go to ds1
+  private static ReplicationQueues rq2; // queues of server2; aborts go to ds2
+  private static ReplicationQueues rq3; // queues of server3; aborts go to ds3
+  private static ReplicationPeers rp; // ZooKeeper peers "Queue1".."Queue3" added in setUp()
+
+  private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 123L)
+      .toString();
+  private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L)
+      .toString();
+  private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L)
+      .toString();
 
-  private final String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
+  private static DummyServer ds1;
+  private static DummyServer ds2;
+  private static DummyServer ds3;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -60,51 +78,63 @@ public class TestReplicationStateHBaseImpl {
     conf = utility.getConfiguration();
     conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
         ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
-    connection = ConnectionFactory.createConnection(conf);
+    // ReplicationQueuesType (set above) selects ReplicationQueuesHBaseImpl for the queues;
+    // peers (rp, see setUp) stay in ZooKeeper, and tearDown() wipes replicationZNode.
+    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
+  }
+
+  @Before
+  public void setUp() {
+    try {
+      ds1 = new DummyServer(server1);
+      rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
+      rq1.init(server1);
+      ds2 = new DummyServer(server2);
+      rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
+      rq2.init(server2);
+      ds3 = new DummyServer(server3);
+      rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
+      rq3.init(server3);
+      rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
+      rp.init();
+      rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
+      rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
+      rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
+    } catch (Exception e) {
+      fail("setUp received an exception: " + e);
+    }
   }
 
   @Test
   public void checkNamingSchema() throws Exception {
-    rqH.init(server1);
-    assertTrue(rqH.isThisOurRegionServer(server1));
-    assertTrue(!rqH.isThisOurRegionServer(server1 + "a"));
-    assertTrue(!rqH.isThisOurRegionServer(null));
+    assertTrue(rq1.isThisOurRegionServer(server1));
+    assertTrue(!rq1.isThisOurRegionServer(server1 + "a"));
+    assertTrue(!rq1.isThisOurRegionServer(null));
   }
 
   @Test
-  public void testReplicationStateHBase() {
-    DummyServer ds = new DummyServer(server1);
-    try {
-      rqH = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds, null));
-      rqH.init(server1);
-      // Check that the proper System Tables have been generated
-      Table replicationTable = connection.getTable(
-          ReplicationQueuesHBaseImpl.REPLICATION_TABLE_NAME);
-      assertTrue(replicationTable.getName().isSystemTable());
-
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail("testReplicationStateHBaseConstruction received an Exception");
-    }
+  public void testSingleReplicationQueuesHBaseImpl() {
     try {
       // Test adding in WAL files
-      assertEquals(0, rqH.getAllQueues().size());
-      rqH.addLog("Queue1", "WALLogFile1.1");
-      assertEquals(1, rqH.getAllQueues().size());
-      rqH.addLog("Queue1", "WALLogFile1.2");
-      rqH.addLog("Queue1", "WALLogFile1.3");
-      rqH.addLog("Queue1", "WALLogFile1.4");
-      rqH.addLog("Queue2", "WALLogFile2.1");
-      rqH.addLog("Queue3", "WALLogFile3.1");
-      assertEquals(3, rqH.getAllQueues().size());
-      assertEquals(4, rqH.getLogsInQueue("Queue1").size());
-      assertEquals(1, rqH.getLogsInQueue("Queue2").size());
-      assertEquals(1, rqH.getLogsInQueue("Queue3").size());
+      assertEquals(0, rq1.getAllQueues().size());
+      rq1.addLog("Queue1", "WALLogFile1.1");
+      assertEquals(1, rq1.getAllQueues().size());
+      rq1.addLog("Queue1", "WALLogFile1.2");
+      rq1.addLog("Queue1", "WALLogFile1.3");
+      rq1.addLog("Queue1", "WALLogFile1.4");
+      rq1.addLog("Queue2", "WALLogFile2.1");
+      rq1.addLog("Queue3", "WALLogFile3.1");
+      assertEquals(3, rq1.getAllQueues().size());
+      assertEquals(4, rq1.getLogsInQueue("Queue1").size());
+      assertEquals(1, rq1.getLogsInQueue("Queue2").size());
+      assertEquals(1, rq1.getLogsInQueue("Queue3").size());
       // Make sure that abortCount is still 0
-      assertEquals(0, ds.getAbortCount());
+      assertEquals(0, ds1.getAbortCount());
       // Make sure that getting a log from a non-existent queue triggers an abort
-      assertNull(rqH.getLogsInQueue("Queue4"));
-      assertEquals(1, ds.getAbortCount());
+      assertNull(rq1.getLogsInQueue("Queue4"));
+      assertEquals(1, ds1.getAbortCount());
     } catch (ReplicationException e) {
       e.printStackTrace();
       fail("testAddLog received a ReplicationException");
@@ -112,59 +142,186 @@ public class TestReplicationStateHBaseImpl {
     try {
 
       // Test updating the log positions
-      assertEquals(0L, rqH.getLogPosition("Queue1", "WALLogFile1.1"));
-      rqH.setLogPosition("Queue1", "WALLogFile1.1", 123L);
-      assertEquals(123L, rqH.getLogPosition("Queue1", "WALLogFile1.1"));
-      rqH.setLogPosition("Queue1", "WALLogFile1.1", 123456789L);
-      assertEquals(123456789L, rqH.getLogPosition("Queue1", "WALLogFile1.1"));
-      rqH.setLogPosition("Queue2", "WALLogFile2.1", 242L);
-      assertEquals(242L, rqH.getLogPosition("Queue2", "WALLogFile2.1"));
-      rqH.setLogPosition("Queue3", "WALLogFile3.1", 243L);
-      assertEquals(243L, rqH.getLogPosition("Queue3", "WALLogFile3.1"));
+      assertEquals(0L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
+      rq1.setLogPosition("Queue1", "WALLogFile1.1", 123L);
+      assertEquals(123L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
+      rq1.setLogPosition("Queue1", "WALLogFile1.1", 123456789L);
+      assertEquals(123456789L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
+      rq1.setLogPosition("Queue2", "WALLogFile2.1", 242L);
+      assertEquals(242L, rq1.getLogPosition("Queue2", "WALLogFile2.1"));
+      rq1.setLogPosition("Queue3", "WALLogFile3.1", 243L);
+      assertEquals(243L, rq1.getLogPosition("Queue3", "WALLogFile3.1"));
 
       // Test that setting log positions in non-existing logs will cause an abort
-      assertEquals(1, ds.getAbortCount());
-      rqH.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L);
-      assertEquals(2, ds.getAbortCount());
-      rqH.setLogPosition("NotHereQueue", "NotHereFile", 243L);
-      assertEquals(3, ds.getAbortCount());
-      rqH.setLogPosition("Queue1", "NotHereFile", 243l);
-      assertEquals(4, ds.getAbortCount());
+      assertEquals(1, ds1.getAbortCount());
+      rq1.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L);
+      assertEquals(2, ds1.getAbortCount());
+      rq1.setLogPosition("NotHereQueue", "NotHereFile", 243L);
+      assertEquals(3, ds1.getAbortCount());
+      rq1.setLogPosition("Queue1", "NotHereFile", 243L);
+      assertEquals(4, ds1.getAbortCount());
 
       // Test reading log positions for non-existent queues and WAL's
       try {
-        rqH.getLogPosition("Queue1", "NotHereWAL");
+        rq1.getLogPosition("Queue1", "NotHereWAL");
         fail("Replication queue should have thrown a ReplicationException for reading from a " +
             "non-existent WAL");
       } catch (ReplicationException e) {
       }
       try {
-        rqH.getLogPosition("NotHereQueue", "NotHereWAL");
+        rq1.getLogPosition("NotHereQueue", "NotHereWAL");
         fail("Replication queue should have thrown a ReplicationException for reading from a " +
             "non-existent queue");
       } catch (ReplicationException e) {
       }
       // Test removing logs
-      rqH.removeLog("Queue1", "WALLogFile1.1");
-      assertEquals(3, rqH.getLogsInQueue("Queue1").size());
+      rq1.removeLog("Queue1", "WALLogFile1.1");
+      assertEquals(3, rq1.getLogsInQueue("Queue1").size());
       // Test removing queues
-      rqH.removeQueue("Queue2");
-      assertEquals(2, rqH.getAllQueues().size());
-      assertNull(rqH.getLogsInQueue("Queue2"));
+      rq1.removeQueue("Queue2");
+      assertEquals(2, rq1.getAllQueues().size());
+      assertNull(rq1.getLogsInQueue("Queue2"));
       // Test that getting logs from a non-existent queue aborts
-      assertEquals(5, ds.getAbortCount());
+      assertEquals(5, ds1.getAbortCount());
       // Test removing all queues for a Region Server
-      rqH.removeAllQueues();
-      assertEquals(0, rqH.getAllQueues().size());
-      assertNull(rqH.getLogsInQueue("Queue1"));
+      rq1.removeAllQueues();
+      assertEquals(0, rq1.getAllQueues().size());
+      assertNull(rq1.getLogsInQueue("Queue1"));
       // Test that getting logs from a non-existent queue aborts
-      assertEquals(6, ds.getAbortCount());
+      assertEquals(6, ds1.getAbortCount());
     } catch (ReplicationException e) {
       e.printStackTrace();
       fail("testAddLog received a ReplicationException");
     }
   }
 
+  @Test
+  public void testMultipleReplicationQueuesHBaseImpl() {
+    try {
+      // Test adding in WAL files
+      rq1.addLog("Queue1", "WALLogFile1.1");
+      rq1.addLog("Queue1", "WALLogFile1.2");
+      rq1.addLog("Queue1", "WALLogFile1.3");
+      rq1.addLog("Queue1", "WALLogFile1.4");
+      rq1.addLog("Queue2", "WALLogFile2.1");
+      rq1.addLog("Queue3", "WALLogFile3.1");
+      rq2.addLog("Queue1", "WALLogFile1.1");
+      rq2.addLog("Queue1", "WALLogFile1.2");
+      rq2.addLog("Queue2", "WALLogFile2.1");
+      rq3.addLog("Queue1", "WALLogFile1.1");
+      // Test adding logs to replication queues
+      assertEquals(3, rq1.getAllQueues().size());
+      assertEquals(2, rq2.getAllQueues().size());
+      assertEquals(1, rq3.getAllQueues().size());
+      assertEquals(4, rq1.getLogsInQueue("Queue1").size());
+      assertEquals(1, rq1.getLogsInQueue("Queue2").size());
+      assertEquals(1, rq1.getLogsInQueue("Queue3").size());
+      assertEquals(2, rq2.getLogsInQueue("Queue1").size());
+      assertEquals(1, rq2.getLogsInQueue("Queue2").size());
+      assertEquals(1, rq3.getLogsInQueue("Queue1").size());
+    } catch (ReplicationException e) {
+      e.printStackTrace();
+      fail("testAddLogs received a ReplicationException");
+    }
+    try {
+      // Test setting and reading offset in queues
+      rq1.setLogPosition("Queue1", "WALLogFile1.1", 1L);
+      rq1.setLogPosition("Queue1", "WALLogFile1.2", 2L);
+      rq1.setLogPosition("Queue1", "WALLogFile1.3", 3L);
+      rq1.setLogPosition("Queue2", "WALLogFile2.1", 4L);
+      rq1.setLogPosition("Queue2", "WALLogFile2.2", 5L); // never added: ds1 aborts here
+      rq1.setLogPosition("Queue3", "WALLogFile3.1", 6L);
+      rq2.setLogPosition("Queue1", "WALLogFile1.1", 7L);
+      rq2.setLogPosition("Queue2", "WALLogFile2.1", 8L);
+      rq3.setLogPosition("Queue1", "WALLogFile1.1", 9L);
+      assertEquals(1L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
+      assertEquals(2L, rq1.getLogPosition("Queue1", "WALLogFile1.2"));
+      assertEquals(4L, rq1.getLogPosition("Queue2", "WALLogFile2.1"));
+      assertEquals(6L, rq1.getLogPosition("Queue3", "WALLogFile3.1"));
+      assertEquals(7L, rq2.getLogPosition("Queue1", "WALLogFile1.1"));
+      assertEquals(8L, rq2.getLogPosition("Queue2", "WALLogFile2.1"));
+      assertEquals(9L, rq3.getLogPosition("Queue1", "WALLogFile1.1"));
+      assertEquals(3, rq1.getListOfReplicators().size());
+      assertEquals(3, rq2.getListOfReplicators().size());
+      assertEquals(3, rq3.getListOfReplicators().size());
+    } catch (ReplicationException e) {
+      fail("testLogPosition threw a ReplicationException: " + e);
+    }
+    try {
+      // Test claiming queues
+      Map<String, Set<String>> claimedQueuesFromRq2 = rq1.claimQueues(server2);
+      // Check that the list of replicators (servers with outstanding queues) shrinks by one
+      // after claimQueues
+      assertEquals(2, rq1.getListOfReplicators().size());
+      assertEquals(2, rq2.getListOfReplicators().size());
+      assertEquals(2, rq3.getListOfReplicators().size());
+      // Check to make sure that we claimed the proper number of queues
+      assertEquals(2, claimedQueuesFromRq2.size());
+      assertTrue(claimedQueuesFromRq2.containsKey("Queue1-" + server2));
+      assertTrue(claimedQueuesFromRq2.containsKey("Queue2-" + server2));
+      assertEquals(2, claimedQueuesFromRq2.get("Queue1-" + server2).size());
+      assertEquals(1, claimedQueuesFromRq2.get("Queue2-" + server2).size());
+      assertEquals(5, rq1.getAllQueues().size());
+      // Check that all the logs in the other queue were claimed
+      assertEquals(2, rq1.getLogsInQueue("Queue1-" + server2).size());
+      assertEquals(1, rq1.getLogsInQueue("Queue2-" + server2).size());
+      // Check that the offsets of the claimed queues are the same
+      assertEquals(7L, rq1.getLogPosition("Queue1-" + server2, "WALLogFile1.1"));
+      assertEquals(8L, rq1.getLogPosition("Queue2-" + server2, "WALLogFile2.1"));
+      // Check that the queues were properly removed from rq2
+      assertEquals(0, rq2.getAllQueues().size());
+      assertNull(rq2.getLogsInQueue("Queue1"));
+      assertNull(rq2.getLogsInQueue("Queue2"));
+      // Check that non-existent peer queues are not claimed
+      rq1.addLog("UnclaimableQueue", "WALLogFile1.1");
+      rq1.addLog("UnclaimableQueue", "WALLogFile1.2");
+      assertEquals(6, rq1.getAllQueues().size());
+      Map<String, Set<String>> claimedQueuesFromRq1 = rq3.claimQueues(server1);
+      assertEquals(1, rq1.getListOfReplicators().size());
+      assertEquals(1, rq2.getListOfReplicators().size());
+      assertEquals(1, rq3.getListOfReplicators().size());
+      // Note that we do not pick up the queue: UnclaimableQueue which was not registered in
+      // Replication Peers
+      assertEquals(6, rq3.getAllQueues().size());
+      // Test claiming non-existing queues
+      Map<String, Set<String>> noQueues = rq3.claimQueues("NotARealServer");
+      assertEquals(0, noQueues.size());
+      assertEquals(6, rq3.getAllQueues().size());
+      // Test claiming own queues
+      noQueues = rq3.claimQueues(server3);
+      assertEquals(0, noQueues.size());
+      assertEquals(6, rq3.getAllQueues().size());
+      // Check that rq3 still remain on list of replicators
+      assertEquals(1, rq3.getListOfReplicators().size());
+    } catch (ReplicationException e) {
+      fail("testClaimQueue threw a ReplicationException: " + e);
+    }
+  }
+
+  @After
+  public void clearQueues() throws Exception {
+    rq1.removeAllQueues();
+    rq2.removeAllQueues();
+    rq3.removeAllQueues();
+    assertEquals(0, rq1.getAllQueues().size());
+    assertEquals(0, rq2.getAllQueues().size());
+    assertEquals(0, rq3.getAllQueues().size());
+    ds1.resetAbortCount();
+    ds2.resetAbortCount();
+    ds3.resetAbortCount();
+  }
+
+  @After
+  public void tearDown() throws KeeperException, IOException {
+    ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    utility.shutdownMiniCluster();
+    utility.shutdownMiniZKCluster();
+  }
+
   static class DummyServer implements Server {
     private String serverName;
     private boolean isAborted = false;
@@ -239,5 +396,10 @@ public class TestReplicationStateHBaseImpl {
     public int getAbortCount() {
       return abortCount;
     }
+
+    public void resetAbortCount() {
+      abortCount = 0;
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/babdedc1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index e731135..972a400 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;

http://git-wip-us.apache.org/repos/asf/hbase/blob/babdedc1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index d1db068..e14fd3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -31,7 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.SortedMap;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -389,7 +389,7 @@ public class TestReplicationSourceManager {
         ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
           s1.getZooKeeper()));
     rq1.init(s1.getServerName().toString());
-    SortedMap<String, SortedSet<String>> testMap =
+    Map<String, Set<String>> testMap = // unordered: with >1 queue the key taken below is arbitrary
         rq1.claimQueues(server.getServerName().getServerName());
     ReplicationQueues rq2 =
         ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
@@ -402,7 +402,7 @@ public class TestReplicationSourceManager {
     rq3.init(s3.getServerName().toString());
     testMap = rq3.claimQueues(s2.getServerName().getServerName());
 
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.keySet().iterator().next());
     List<String> result = replicationQueueInfo.getDeadRegionServers();
 
     // verify
@@ -523,7 +523,7 @@ public class TestReplicationSourceManager {
   }
 
   static class DummyNodeFailoverWorker extends Thread {
-    private SortedMap<String, SortedSet<String>> logZnodesMap;
+    private Map<String, Set<String>> logZnodesMap;
     Server server;
     private String deadRsZnode;
     ReplicationQueues rq;
@@ -553,12 +553,12 @@ public class TestReplicationSourceManager {
      * @return 1 when the map is not empty.
      */
     private int isLogZnodesMapPopulated() {
-      Collection<SortedSet<String>> sets = logZnodesMap.values();
+      Collection<Set<String>> sets = logZnodesMap.values();
       if (sets.size() > 1) {
         throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
       }
       if (sets.size() == 1) {
-        SortedSet<String> s = sets.iterator().next();
+        Set<String> s = sets.iterator().next();
         for (String file : files) {
           // at least one file was missing
           if (!s.contains(file)) {