You are viewing a plain-text version of this content. The link to the canonical version ("here") is not preserved in this plain-text rendering.
Posted to commits@hbase.apache.org by ec...@apache.org on 2016/06/21 20:08:46 UTC

hbase git commit: HBASE-16036 Made Replication Table creation non-blocking.

Repository: hbase
Updated Branches:
  refs/heads/master b006e41a3 -> 152594560


HBASE-16036 Made Replication Table creation non-blocking.

However, all ReplicationTableBase methods that need to access the Replication Table will still block until it is created.
Also refactored ReplicationSourceManager so that the adoption of abandoned queues likewise runs in the background and does not block HRegionServer initialization.

Signed-off-by: Elliott Clark <ec...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15259456
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15259456
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15259456

Branch: refs/heads/master
Commit: 152594560e29549642587b850320f5d66339b747
Parents: b006e41
Author: Joseph Hwang <jz...@fb.com>
Authored: Wed Jun 15 14:35:56 2016 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Tue Jun 21 13:05:50 2016 -0700

----------------------------------------------------------------------
 .../replication/ReplicationQueuesArguments.java |   4 +
 .../ReplicationQueuesClientArguments.java       |   5 +
 .../hbase/replication/ReplicationTableBase.java | 186 ++++++++++++++-----
 .../TableBasedReplicationQueuesClientImpl.java  |   3 +-
 .../TableBasedReplicationQueuesImpl.java        | 106 +++++------
 .../regionserver/ReplicationSourceManager.java  |  43 +++--
 .../TestReplicationStateHBaseImpl.java          |   2 +-
 .../replication/TestReplicationTableBase.java   | 109 +++++++++++
 .../TestReplicationSourceManager.java           | 160 ++++------------
 .../TestReplicationSourceManagerZkImpl.java     | 152 +++++++++++++++
 ...tTableBasedReplicationSourceManagerImpl.java |  60 ++++++
 11 files changed, 579 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
index 4fdc4e7..12fc6a1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
@@ -23,6 +23,10 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
+/**
+ * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various
+ * ReplicationQueues Implementations with different constructor arguments by reflection.
+ */
 @InterfaceAudience.Private
 public class ReplicationQueuesArguments {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
index 8a61993..834f831 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
@@ -23,6 +23,11 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
+/**
+ * Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct
+ * various ReplicationQueuesClient Implementations with different constructor arguments by
+ * reflection.
+ */
 @InterfaceAudience.Private
 public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments {
   public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
index c1506cd..61bb041 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java
@@ -18,12 +18,14 @@
 */
 package org.apache.hadoop.hbase.replication;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
@@ -42,12 +44,18 @@ import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /*
  * Abstract class that provides an interface to the Replication Table. Which is currently
@@ -59,8 +67,10 @@ import java.util.Set;
  *  COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's that have owned this
  *    queue. The most recent previous owner is the leftmost entry.
  * They will also have columns mapping [WAL filename : offset]
+ * The most flexible method of interacting with the Replication Table is by calling
+ * getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up
+ * to the caller to close the returned table.
  */
-
 @InterfaceAudience.Private
 abstract class ReplicationTableBase {
 
@@ -99,20 +109,23 @@ abstract class ReplicationTableBase {
   private static final int RPC_TIMEOUT = 2000;
   private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
 
-  protected final Table replicationTable;
+  // We only need a single thread to initialize the Replication Table
+  private static final int NUM_INITIALIZE_WORKERS = 1;
+
   protected final Configuration conf;
   protected final Abortable abortable;
-  private final Admin admin;
   private final Connection connection;
+  private final Executor executor;
+  private volatile CountDownLatch replicationTableInitialized;
 
   public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
     this.conf = new Configuration(conf);
     this.abortable = abort;
     decorateConf();
     this.connection = ConnectionFactory.createConnection(this.conf);
-    this.admin = connection.getAdmin();
-    this.replicationTable = createAndGetReplicationTable();
-    setTableTimeOuts();
+    this.executor = setUpExecutor();
+    this.replicationTableInitialized = new CountDownLatch(1);
+    createReplicationTableInBackground();
   }
 
   /**
@@ -124,11 +137,34 @@ abstract class ReplicationTableBase {
   }
 
   /**
+   * Sets up the thread pool executor used to build the Replication Table in the background
+   * @return the configured executor
+   */
+  private Executor setUpExecutor() {
+    ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS,
+        NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setNameFormat("ReplicationTableExecutor-%d");
+    tfb.setDaemon(true);
+    tempExecutor.setThreadFactory(tfb.build());
+    return tempExecutor;
+  }
+
+  /**
+   * Get whether the Replication Table has been successfully initialized yet
+   * @return whether the Replication Table is initialized
+   */
+  public boolean getInitializationStatus() {
+    return replicationTableInitialized.getCount() == 0;
+  }
+
+  /**
    * Increases the RPC and operations timeouts for the Replication Table
    */
-  private void setTableTimeOuts() {
+  private Table setReplicationTableTimeOuts(Table replicationTable) {
     replicationTable.setRpcTimeout(RPC_TIMEOUT);
     replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
+    return replicationTable;
   }
 
   /**
@@ -189,7 +225,7 @@ abstract class ReplicationTableBase {
     // scan all of the queues and return a list of all unique OWNER values
     Set<String> peerServers = new HashSet<String>();
     ResultScanner allQueuesInCluster = null;
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()){
       Scan scan = new Scan();
       scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
       allQueuesInCluster = replicationTable.getScanner(scan);
@@ -244,7 +280,7 @@ abstract class ReplicationTableBase {
 
   protected List<String> getLogsInQueue(byte[] rowKey) {
     String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       Get getQueue = new Get(rowKey);
       Result queue = replicationTable.get(getQueue);
       if (queue == null || queue.isEmpty()) {
@@ -286,66 +322,120 @@ abstract class ReplicationTableBase {
    * @return a ResultScanner over the QueueIds belonging to the server
    * @throws IOException
    */
-  private ResultScanner getQueuesBelongingToServer(String server) throws IOException {
+  protected ResultScanner getQueuesBelongingToServer(String server) throws IOException {
     Scan scan = new Scan();
     SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
       CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
     scan.setFilter(filterMyQueues);
     scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
     scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
-    ResultScanner results = replicationTable.getScanner(scan);
-    return results;
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      ResultScanner results = replicationTable.getScanner(scan);
+      return results;
+    }
   }
 
   /**
-   * Gets the Replication Table. Builds and blocks until the table is available if the Replication
-   * Table does not exist.
-   *
-   * @return the Replication Table
-   * @throws IOException if the Replication Table takes too long to build
+   * Attempts to acquire the Replication Table. This operation will block until it is assigned by
+   * the CreateReplicationWorker thread. It is up to the caller of this method to close the
+   * returned Table
+   * @return the Replication Table when it is created
+   * @throws IOException
    */
-  private Table createAndGetReplicationTable() throws IOException {
-    if (!replicationTableExists()) {
-      createReplicationTable();
-    }
-    int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100);
-    RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100);
-    RetryCounter retryCounter = counterFactory.create();
-    while (!replicationTableExists()) {
-      try {
-        retryCounter.sleepUntilNextRetry();
-        if (!retryCounter.shouldRetry()) {
-          throw new IOException("Unable to acquire the Replication Table");
-        }
-      } catch (InterruptedException e) {
-        return null;
-      }
+  protected Table getOrBlockOnReplicationTable() throws IOException {
+    // Sleep until the Replication Table becomes available
+    try {
+      replicationTableInitialized.await();
+    } catch (InterruptedException e) {
+      String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " +
+          e.getMessage();
+      throw new InterruptedIOException(errMsg);
     }
-    return connection.getTable(REPLICATION_TABLE_NAME);
+    return getAndSetUpReplicationTable();
   }
 
   /**
-   * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
-   * in TableBasedReplicationQueuesImpl
+   * Creates a new copy of the Replication Table and sets up the proper Table time outs for it
+   *
+   * @return the Replication Table
    * @throws IOException
    */
-  private void createReplicationTable() throws IOException {
-    HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
-    replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
-    admin.createTable(replicationTableDescriptor);
+  private Table getAndSetUpReplicationTable() throws IOException {
+    Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME);
+    setReplicationTableTimeOuts(replicationTable);
+    return replicationTable;
   }
 
   /**
-   * Checks whether the Replication Table exists yet
+   * Builds the Replication Table in a background thread. Any method accessing the Replication Table
+   * should do so through getOrBlockOnReplicationTable()
    *
-   * @return whether the Replication Table exists
-   * @throws IOException
+   * @return the Replication Table
+   * @throws IOException if the Replication Table takes too long to build
    */
-  private boolean replicationTableExists() {
-    try {
-      return admin.tableExists(REPLICATION_TABLE_NAME);
-    } catch (IOException e) {
-      return false;
+  private void createReplicationTableInBackground() throws IOException {
+    executor.execute(new CreateReplicationTableWorker());
+  }
+
+  /**
+   * Attempts to build the Replication Table. Will continue blocking until we have a valid
+   * Table for the Replication Table.
+   */
+  private class CreateReplicationTableWorker implements Runnable {
+
+    private Admin admin;
+
+    @Override
+    public void run() {
+      try {
+        admin = connection.getAdmin();
+        if (!replicationTableExists()) {
+          createReplicationTable();
+        }
+        int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number",
+            CLIENT_RETRIES);
+        RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT);
+        RetryCounter retryCounter = counterFactory.create();
+        while (!replicationTableExists()) {
+          retryCounter.sleepUntilNextRetry();
+          if (!retryCounter.shouldRetry()) {
+            throw new IOException("Unable to acquire the Replication Table");
+          }
+        }
+        replicationTableInitialized.countDown();
+      } catch (IOException | InterruptedException e) {
+        abortable.abort("Failed building Replication Table", e);
+      }
+    }
+
+    /**
+     * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
+     * in TableBasedReplicationQueuesImpl
+     *
+     * @throws IOException
+     */
+    private void createReplicationTable() throws IOException {
+      HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
+      replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
+      try {
+        admin.createTable(replicationTableDescriptor);
+      } catch (TableExistsException e) {
+        // In this case we can just continue as normal
+      }
+    }
+
+    /**
+     * Checks whether the Replication Table exists yet
+     *
+     * @return whether the Replication Table exists
+     * @throws IOException
+     */
+    private boolean replicationTableExists() {
+      try {
+        return admin.tableExists(REPLICATION_TABLE_NAME);
+      } catch (IOException e) {
+        return false;
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
index 55dfdd8..dcbed7a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.zookeeper.KeeperException;
 
 import java.io.IOException;
@@ -73,7 +74,7 @@ public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase
   public Set<String> getAllWALs() {
     Set<String> allWals = new HashSet<String>();
     ResultScanner allQueues = null;
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       allQueues = replicationTable.getScanner(new Scan());
       for (Result queue : allQueues) {
         for (String wal : readWALsFromResult(queue)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
index 6ea7801..28fa967 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -105,7 +106,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
 
   @Override
   public void addLog(String queueId, String filename) throws ReplicationException {
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       if (!checkQueueExists(queueId)) {
         // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values
         Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
@@ -140,7 +141,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
 
   @Override
   public void setLogPosition(String queueId, String filename, long position) {
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       byte[] rowKey = queueIdToRowKey(queueId);
       // Check that the log exists. addLog() must have been called before setLogPosition().
       Get checkLogExists = new Get(rowKey);
@@ -190,8 +191,31 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
 
   @Override
   public List<String> getLogsInQueue(String queueId) {
+    String errMsg = "Failed getting logs in queue queueId=" + queueId;
     byte[] rowKey = queueIdToRowKey(queueId);
-    return getLogsInQueueAndCheckOwnership(rowKey);
+    List<String> logs = new ArrayList<String>();
+    try {
+      Get getQueue = new Get(rowKey);
+      Result queue = getResultIfOwner(getQueue);
+      if (queue == null || queue.isEmpty()) {
+        String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
+            Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
+        abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
+        return null;
+      }
+      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
+      for(byte[] cQualifier : familyMap.keySet()) {
+        if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
+            COL_QUEUE_OWNER_HISTORY)) {
+          continue;
+        }
+        logs.add(Bytes.toString(cQualifier));
+      }
+    } catch (IOException e) {
+      abortable.abort(errMsg, e);
+      return null;
+    }
+    return logs;
   }
 
   @Override
@@ -207,7 +231,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
     }
     ResultScanner queuesToClaim = null;
     try {
-      queuesToClaim = getAllQueuesScanner(regionserver);
+      queuesToClaim = getQueuesBelongingToServer(regionserver);
       for (Result queue : queuesToClaim) {
         if (attemptToClaimQueue(queue, regionserver)) {
           String rowKey = Bytes.toString(queue.getRow());
@@ -240,24 +264,6 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
     return queues;
   }
 
-  /**
-   * Get the QueueIds belonging to the named server from the ReplicationTableBase
-   *
-   * @param server name of the server
-   * @return a ResultScanner over the QueueIds belonging to the server
-   * @throws IOException
-   */
-  private ResultScanner getAllQueuesScanner(String server) throws IOException {
-    Scan scan = new Scan();
-    SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
-      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
-    scan.setFilter(filterMyQueues);
-    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
-    scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
-    ResultScanner results = replicationTable.getScanner(scan);
-    return results;
-  }
-
   @Override
   public boolean isThisOurRegionServer(String regionserver) {
     return this.serverName.equals(regionserver);
@@ -287,33 +293,6 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
     throw new NotImplementedException();
   }
 
-  private List<String> getLogsInQueueAndCheckOwnership(byte[] rowKey) {
-    String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
-    List<String> logs = new ArrayList<String>();
-    try {
-      Get getQueue = new Get(rowKey);
-      Result queue = getResultIfOwner(getQueue);
-      if (queue == null || queue.isEmpty()) {
-        String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
-          Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
-        abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
-        return null;
-      }
-      Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
-      for(byte[] cQualifier : familyMap.keySet()) {
-        if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
-            COL_QUEUE_OWNER_HISTORY)) {
-          continue;
-        }
-        logs.add(Bytes.toString(cQualifier));
-      }
-    } catch (IOException e) {
-      abortable.abort(errMsg, e);
-      return null;
-    }
-    return logs;
-  }
-
   private String buildQueueRowKey(String queueId) {
     return buildQueueRowKey(serverName, queueId);
   }
@@ -358,11 +337,13 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
    * @param mutate Mutation to perform on a given queue
    */
   private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{
-    boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF_QUEUE,
-        COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
-    if (!updateSuccess) {
-      throw new ReplicationException("Failed to update Replication Table because we lost queue " +
-        " ownership");
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(),
+          CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
+      if (!updateSuccess) {
+        throw new ReplicationException("Failed to update Replication Table because we lost queue " +
+            " ownership");
+      }
     }
   }
 
@@ -374,8 +355,10 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
    * @throws IOException
    */
   private boolean checkQueueExists(String queueId) throws IOException {
-    byte[] rowKey = queueIdToRowKey(queueId);
-    return replicationTable.exists(new Get(rowKey));
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      byte[] rowKey = queueIdToRowKey(queueId);
+      return replicationTable.exists(new Get(rowKey));
+    }
   }
 
   /**
@@ -399,9 +382,12 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
     // Attempt to claim ownership for this queue by checking if the current OWNER is the original
     // server. If it is not then another RS has already claimed it. If it is we set ourselves as the
     // new owner and update the queue's history
-    boolean success = replicationTable.checkAndMutate(queue.getRow(), CF_QUEUE, COL_QUEUE_OWNER,
-      CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), claimAndRenameQueue);
-    return success;
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
+      boolean success = replicationTable.checkAndMutate(queue.getRow(),
+          CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer),
+          claimAndRenameQueue);
+      return success;
+    }
   }
 
   /**
@@ -424,7 +410,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
       CompareFilter.CompareOp.EQUAL, serverNameBytes);
     scan.setFilter(checkOwner);
     ResultScanner scanner = null;
-    try {
+    try (Table replicationTable = getOrBlockOnReplicationTable()) {
       scanner = replicationTable.getScanner(scan);
       Result result = scanner.next();
       return (result == null || result.isEmpty()) ? null : result;

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 7532c64..07ee46a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -238,19 +238,11 @@ public class ReplicationSourceManager implements ReplicationListener {
         this.replicationQueues.addPeerToHFileRefs(id);
       }
     }
-    List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
-    if (currentReplicators == null || currentReplicators.size() == 0) {
-      return;
-    }
-    List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
-    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
-        + otherRegionServers);
-
-    // Look if there's anything to process after a restart
-    for (String rs : currentReplicators) {
-      if (!otherRegionServers.contains(rs)) {
-        transferQueues(rs);
-      }
+    AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
+    try {
+      this.executor.execute(adoptionWorker);
+    } catch (RejectedExecutionException ex) {
+      LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
     }
   }
 
@@ -705,6 +697,31 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
+  class AdoptAbandonedQueuesWorker extends Thread{
+
+    public AdoptAbandonedQueuesWorker() {}
+
+    @Override
+    public void run() {
+      List<String> currentReplicators = replicationQueues.getListOfReplicators();
+      if (currentReplicators == null || currentReplicators.size() == 0) {
+        return;
+      }
+      List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
+      LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
+        + otherRegionServers);
+
+      // Look if there's anything to process after a restart
+      for (String rs : currentReplicators) {
+        if (!otherRegionServers.contains(rs)) {
+          transferQueues(rs);
+        }
+      }
+    }
+  }
+
+
+
   /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
index 3a9a5a5..25f30d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
@@ -80,12 +80,12 @@ public class TestReplicationStateHBaseImpl {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     utility = new HBaseTestingUtility();
-    utility.startMiniCluster();
     conf = utility.getConfiguration();
     conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
       TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
     conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
       TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
+    utility.startMiniCluster();
     zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
     replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
new file mode 100644
index 0000000..aa5cfed
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java
@@ -0,0 +1,145 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests ReplicationTableBase behavior when the Master startup is delayed. The table initialization
+ * should be non-blocking, but any method calls that access the table should be blocking.
+ */
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationTableBase {
+
+  /** How long the queue request is left running before checking that it is still blocked. */
+  private static final long SLEEP_MILLIS = 5000;
+  /** Upper bound for constructing the replication queue objects, which must not block. */
+  private static final long TIME_OUT_MILLIS = 3000;
+  /**
+   * Upper bound for the blocked request to finish once the mini cluster is up. Kept generous
+   * because the Replication Table only becomes available after the Master has started.
+   */
+  private static final long TABLE_READY_TIME_OUT_MILLIS = 60000;
+  /** Interval at which the waits below re-check their condition. */
+  private static final long POLL_MILLIS = 100;
+  private static Configuration conf;
+  private static HBaseTestingUtility utility;
+  private static ZooKeeperWatcher zkw;
+  private static ReplicationTableBase rb;
+  private static ReplicationQueues rq;
+  private static ReplicationQueuesClient rqc;
+
+  @After
+  public void tearDown() throws Exception {
+    // Release the clusters started by the test so they do not leak into later tests in this JVM.
+    if (utility != null) {
+      utility.shutdownMiniCluster();
+      utility = null;
+    }
+  }
+
+  @Test
+  public void testSlowStartup() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.startMiniZKCluster();
+    conf = utility.getConfiguration();
+    conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
+      TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
+    conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
+      TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
+    zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
+    ExecutorService executor = Executors.newCachedThreadPool();
+    try {
+      // Construct on a background thread: if construction blocks, the bounded wait below fails
+      // the test instead of hanging it.
+      final Future<Void> init = executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          rb = new ReplicationTableBase(conf, zkw) {};
+          rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
+            conf, zkw, zkw));
+          rqc = ReplicationFactory.getReplicationQueuesClient(
+            new ReplicationQueuesClientArguments(conf, zkw, zkw));
+          return null;
+        }
+      });
+      // waitFor(timeout, interval, predicate): give up after TIME_OUT_MILLIS.
+      utility.waitFor(TIME_OUT_MILLIS, POLL_MILLIS, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return init.isDone();
+        }
+        @Override
+        public String explainFailure() throws Exception {
+          return "Failed to initialize ReplicationTableBase, TableBasedReplicationQueuesClient "
+            + "and TableBasedReplicationQueues after a timeout=" + TIME_OUT_MILLIS
+            + " ms. Their initialization should be non-blocking";
+        }
+      });
+      // Rethrow any exception raised while constructing the objects.
+      init.get();
+
+      final Future<Integer> request = executor.submit(new Callable<Integer>() {
+        @Override
+        public Integer call() throws Exception {
+          return rq.getListOfReplicators().size();
+        }
+      });
+      Thread.sleep(SLEEP_MILLIS);
+      // Test that the Replication Table has not been assigned and the methods are blocking
+      assertFalse(rb.getInitializationStatus());
+      assertFalse(request.isDone());
+      utility.startMiniCluster();
+      // Test that the methods do return the correct results after getting the table
+      utility.waitFor(TABLE_READY_TIME_OUT_MILLIS, POLL_MILLIS,
+        new Waiter.ExplainingPredicate<Exception>() {
+          @Override
+          public boolean evaluate() throws Exception {
+            return request.isDone();
+          }
+          @Override
+          public String explainFailure() throws Exception {
+            return "ReplicationQueue failed to return list of replicators even after Replication "
+              + "Table was initialized timeout=" + TABLE_READY_TIME_OUT_MILLIS + " ms";
+          }
+        });
+      // Rethrow any exception from the request, then check that no replicators were listed.
+      assertEquals(0, request.get().intValue());
+    } finally {
+      // Interrupt any task that is still blocked.
+      executor.shutdownNow();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index bf47d4f..4b278bb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -64,13 +65,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -88,69 +84,63 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Sets;
 
+/**
+ * Abstract base for ReplicationSourceManager tests. Each subclass must, from a static
+ * {@code @BeforeClass} method, assign {@link #conf} and {@link #utility}, start the cluster its
+ * replication queue implementation needs, and then call {@link #setupZkAndReplication()}.
+ */
 @Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationSourceManager {
+public abstract class TestReplicationSourceManager {
 
-  private static final Log LOG =
+  protected static final Log LOG =
       LogFactory.getLog(TestReplicationSourceManager.class);
 
-  private static Configuration conf;
+  protected static Configuration conf;
 
-  private static HBaseTestingUtility utility;
+  protected static HBaseTestingUtility utility;
 
-  private static Replication replication;
+  protected static Replication replication;
 
-  private static ReplicationSourceManager manager;
+  protected static ReplicationSourceManager manager;
 
-  private static ZooKeeperWatcher zkw;
+  protected static ZooKeeperWatcher zkw;
 
-  private static HTableDescriptor htd;
+  protected static HTableDescriptor htd;
 
-  private static HRegionInfo hri;
+  protected static HRegionInfo hri;
 
-  private static final byte[] r1 = Bytes.toBytes("r1");
+  protected static final byte[] r1 = Bytes.toBytes("r1");
 
-  private static final byte[] r2 = Bytes.toBytes("r2");
+  protected static final byte[] r2 = Bytes.toBytes("r2");
 
-  private static final byte[] f1 = Bytes.toBytes("f1");
+  protected static final byte[] f1 = Bytes.toBytes("f1");
 
-  private static final byte[] f2 = Bytes.toBytes("f2");
+  protected static final byte[] f2 = Bytes.toBytes("f2");
 
-  private static final TableName test =
+  protected static final TableName test =
       TableName.valueOf("test");
 
-  private static final String slaveId = "1";
-
-  private static FileSystem fs;
-
-  private static Path oldLogDir;
+  protected static final String slaveId = "1";
 
-  private static Path logDir;
+  protected static FileSystem fs;
 
-  private static CountDownLatch latch;
+  protected static Path oldLogDir;
 
-  private static List<String> files = new ArrayList<String>();
-  private static NavigableMap<byte[], Integer> scopes;
+  protected static Path logDir;
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
+  protected static CountDownLatch latch;
 
-    conf = HBaseConfiguration.create();
-    conf.set("replication.replicationsource.implementation",
-        ReplicationSourceDummy.class.getCanonicalName());
-    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
-        HConstants.REPLICATION_ENABLE_DEFAULT);
-    conf.setLong("replication.sleep.before.failover", 2000);
-    conf.setInt("replication.source.maxretriesmultiplier", 10);
-    utility = new HBaseTestingUtility(conf);
-    utility.startMiniZKCluster();
+  protected static List<String> files = new ArrayList<String>();
+  protected static NavigableMap<byte[], Integer> scopes;
 
+  protected static void setupZkAndReplication() throws Exception {
+    // The implementing class should set up the conf
+    assertNotNull(conf);
     zkw = new ZooKeeperWatcher(conf, "test", null);
     ZKUtil.createWithParents(zkw, "/hbase/replication");
     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
@@ -347,7 +337,7 @@ public class TestReplicationSourceManager {
     Server s1 = new DummyServer("dummyserver1.example.org");
     ReplicationQueues rq1 =
         ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
-          s1.getZooKeeper()));
+            s1.getZooKeeper()));
     rq1.init(s1.getServerName().toString());
     ReplicationPeers rp1 =
         ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
@@ -356,7 +346,7 @@ public class TestReplicationSourceManager {
         manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
             new Long(1), new Long(2)));
     w1.start();
-    w1.join(5000);
+    w1.join(10000);
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -366,92 +356,6 @@ public class TestReplicationSourceManager {
   }
 
   @Test
-  public void testNodeFailoverDeadServerParsing() throws Exception {
-    LOG.debug("testNodeFailoverDeadServerParsing");
-    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
-    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
-    ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
-          server.getZooKeeper()));
-    repQueues.init(server.getServerName().toString());
-    // populate some znodes in the peer znode
-    files.add("log1");
-    files.add("log2");
-    for (String file : files) {
-      repQueues.addLog("1", file);
-    }
-
-    // create 3 DummyServers
-    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
-    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
-    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
-
-    // simulate three servers fail sequentially
-    ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
-          s1.getZooKeeper()));
-    rq1.init(s1.getServerName().toString());
-    Map<String, Set<String>> testMap =
-        rq1.claimQueues(server.getServerName().getServerName());
-    ReplicationQueues rq2 =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
-          s2.getZooKeeper()));
-    rq2.init(s2.getServerName().toString());
-    testMap = rq2.claimQueues(s1.getServerName().getServerName());
-    ReplicationQueues rq3 =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
-          s3.getZooKeeper()));
-    rq3.init(s3.getServerName().toString());
-    testMap = rq3.claimQueues(s2.getServerName().getServerName());
-
-    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.keySet().iterator().next());
-    List<String> result = replicationQueueInfo.getDeadRegionServers();
-
-    // verify
-    assertTrue(result.contains(server.getServerName().getServerName()));
-    assertTrue(result.contains(s1.getServerName().getServerName()));
-    assertTrue(result.contains(s2.getServerName().getServerName()));
-
-    server.abort("", null);
-  }
-
-  @Test
-  public void testFailoverDeadServerCversionChange() throws Exception {
-    LOG.debug("testFailoverDeadServerCversionChange");
-
-    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
-    final Server s0 = new DummyServer("cversion-change0.example.org");
-    ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0,
-          s0.getZooKeeper()));
-    repQueues.init(s0.getServerName().toString());
-    // populate some znodes in the peer znode
-    files.add("log1");
-    files.add("log2");
-    for (String file : files) {
-      repQueues.addLog("1", file);
-    }
-    // simulate queue transfer
-    Server s1 = new DummyServer("cversion-change1.example.org");
-    ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
-          s1.getZooKeeper()));
-    rq1.init(s1.getServerName().toString());
-
-    ReplicationQueuesClientZKImpl client =
-        (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
-        new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
-
-    int v0 = client.getQueuesZNodeCversion();
-    rq1.claimQueues(s0.getServerName().getServerName());
-    int v1 = client.getQueuesZNodeCversion();
-    // cversion should increased by 1 since a child node is deleted
-    assertEquals(v0 + 1, v1);
-
-    s0.abort("", null);
-  }
-
-  @Test
   public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
     NavigableMap<byte[], Integer> scope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
     // 1. Get the bulk load wal edit event

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
new file mode 100644
index 0000000..72042b1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -0,0 +1,154 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the ReplicationSourceManager with ReplicationQueuesZKImpl and
+ * ReplicationQueuesClientZKImpl. Also includes extra tests outside of those in
+ * TestReplicationSourceManager that test ReplicationQueuesZKImpl-specific behaviors.
+ */
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummy.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+    setupZkAndReplication();
+  }
+
+  /**
+   * Creates the ReplicationQueues for {@code s} from the server's own configuration and
+   * ZooKeeper watcher, then initializes it under the server's name.
+   */
+  private static ReplicationQueues initQueues(Server s) throws Exception {
+    ReplicationQueues queues = ReplicationFactory.getReplicationQueues(
+      new ReplicationQueuesArguments(s.getConfiguration(), s, s.getZooKeeper()));
+    queues.init(s.getServerName().toString());
+    return queues;
+  }
+
+  // Tests the naming convention of adopted queues for ReplicationQueuesZKImpl
+  @Test
+  public void testNodeFailoverDeadServerParsing() throws Exception {
+    LOG.debug("testNodeFailoverDeadServerParsing");
+    // NOTE: conf is shared and static, so this setting persists for later tests in the class.
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
+    ReplicationQueues repQueues =
+      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
+        server.getZooKeeper()));
+    repQueues.init(server.getServerName().toString());
+    // populate some znodes in the peer znode
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+
+    // create 3 DummyServers
+    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
+    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
+    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
+
+    // simulate three servers failing sequentially: each server claims the queues of the one
+    // before it, so only the last claim's result needs to be inspected
+    initQueues(s1).claimQueues(server.getServerName().getServerName());
+    initQueues(s2).claimQueues(s1.getServerName().getServerName());
+    Map<String, Set<String>> testMap =
+      initQueues(s3).claimQueues(s2.getServerName().getServerName());
+    assertFalse("Expected s3 to adopt at least one queue", testMap.isEmpty());
+
+    ReplicationQueueInfo replicationQueueInfo =
+      new ReplicationQueueInfo(testMap.keySet().iterator().next());
+    List<String> result = replicationQueueInfo.getDeadRegionServers();
+
+    // verify
+    assertTrue(result.contains(server.getServerName().getServerName()));
+    assertTrue(result.contains(s1.getServerName().getServerName()));
+    assertTrue(result.contains(s2.getServerName().getServerName()));
+
+    server.stop("");
+  }
+
+  @Test
+  public void testFailoverDeadServerCversionChange() throws Exception {
+    LOG.debug("testFailoverDeadServerCversionChange");
+
+    // NOTE: conf is shared and static, so this setting persists for later tests in the class.
+    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    final Server s0 = new DummyServer("cversion-change0.example.org");
+    ReplicationQueues repQueues =
+      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0,
+        s0.getZooKeeper()));
+    repQueues.init(s0.getServerName().toString());
+    // populate some znodes in the peer znode
+    files.add("log1");
+    files.add("log2");
+    for (String file : files) {
+      repQueues.addLog("1", file);
+    }
+    // simulate queue transfer
+    Server s1 = new DummyServer("cversion-change1.example.org");
+    ReplicationQueues rq1 = initQueues(s1);
+
+    ReplicationQueuesClientZKImpl client =
+      (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
+        new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
+
+    int v0 = client.getQueuesZNodeCversion();
+    rq1.claimQueues(s0.getServerName().getServerName());
+    int v1 = client.getQueuesZNodeCversion();
+    // cversion should increase by 1 since a child node is deleted
+    assertEquals(v0 + 1, v1);
+
+    s0.stop("");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15259456/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
new file mode 100644
index 0000000..59acfb3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
@@ -0,0 +1,64 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl;
+import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the shared TestReplicationSourceManager suite against the table-based replication queue
+ * implementations, TableBasedReplicationQueuesImpl and TableBasedReplicationQueuesClientImpl.
+ */
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationSourceManager {
+
+  /**
+   * Selects the table-based queue implementations, applies the replication settings shared with
+   * the ZooKeeper-based variant, and starts a full mini cluster (not just ZooKeeper) before
+   * running the common ZooKeeper/replication setup.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    // Queue implementations under test.
+    conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
+      TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
+    conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
+      TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
+    // Dummy replication source plus failover/retry tuning.
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummy.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniCluster();
+    setupZkAndReplication();
+  }
+}