Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/02 07:34:49 UTC

[39/50] [abbrv] hbase git commit: HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly

http://git-wip-us.apache.org/repos/asf/hbase/blob/dbc81ff5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b8d80d2..5b7bab8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -33,17 +33,20 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -56,7 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -91,7 +94,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
   private final List<ReplicationSourceInterface> oldsources;
-  private final ReplicationQueues replicationQueues;
+  private final ReplicationQueueStorage queueStorage;
   private final ReplicationTracker replicationTracker;
   private final ReplicationPeers replicationPeers;
   // UUID for this cluster
@@ -124,7 +127,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
-   * @param replicationQueues the interface for manipulating replication queues
+   * @param queueStorage the interface for manipulating replication queues
    * @param replicationPeers
    * @param replicationTracker
    * @param conf the configuration to use
@@ -134,14 +137,14 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param oldLogDir the directory where old logs are archived
    * @param clusterId
    */
-  public ReplicationSourceManager(ReplicationQueues replicationQueues,
+  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
       ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
       WALFileLengthProvider walFileLengthProvider) throws IOException {
    // CopyOnWriteArrayList is thread-safe.
    // Generally, reads are more frequent than writes.
     this.sources = new CopyOnWriteArrayList<>();
-    this.replicationQueues = replicationQueues;
+    this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
     this.server = server;
@@ -174,6 +177,19 @@ public class ReplicationSourceManager implements ReplicationListener {
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
   }
 
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+    void exec() throws ReplicationException;
+  }
+
+  private void abortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      server.abort("Failed to operate on replication queue", e);
+    }
+  }
+
   /**
   * Provide the id of the peer and a log key and this method will figure out which
   * wal it belongs to and will log, for this region server, the current
@@ -185,12 +201,13 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param queueRecovered indicates if this queue comes from another region server
    * @param holdLogInZK if true then the log is retained in ZK
    */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position,
-      boolean queueRecovered, boolean holdLogInZK) {
+  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
+      boolean holdLogInZK) {
     String fileName = log.getName();
-    this.replicationQueues.setLogPosition(id, fileName, position);
+    abortWhenFail(
+      () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position));
     if (holdLogInZK) {
-     return;
+      return;
     }
     cleanOldLogs(fileName, id, queueRecovered);
   }
@@ -217,36 +234,59 @@ public class ReplicationSourceManager implements ReplicationListener {
         }
       }
     }
- }
+  }
 
   private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
     SortedSet<String> walSet = wals.headSet(key);
-    LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+    }
     for (String wal : walSet) {
-      this.replicationQueues.removeLog(id, wal);
+      abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
     }
     walSet.clear();
   }
 
+  private void adoptAbandonedQueues() {
+    List<ServerName> currentReplicators = null;
+    try {
+      currentReplicators = queueStorage.getListOfReplicators();
+    } catch (ReplicationException e) {
+      server.abort("Failed to get all replicators", e);
+      return;
+    }
+    if (currentReplicators == null || currentReplicators.isEmpty()) {
+      return;
+    }
+    List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
+        .map(ServerName::valueOf).collect(Collectors.toList());
+    LOG.info(
+      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
+
+    // Look if there's anything to process after a restart
+    for (ServerName rs : currentReplicators) {
+      if (!otherRegionServers.contains(rs)) {
+        transferQueues(rs);
+      }
+    }
+  }
+
   /**
-   * Adds a normal source per registered peer cluster and tries to process all
-   * old region server wal queues
+   * Adds a normal source per registered peer cluster and tries to process all old region server wal
+   * queues
+   * <p>
+   * The returned future is for the adoptAbandonedQueues task.
    */
-  void init() throws IOException, ReplicationException {
+  Future<?> init() throws IOException, ReplicationException {
     for (String id : this.replicationPeers.getConnectedPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
         // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
         // when a peer was added before replication for bulk loaded data was enabled.
-        this.replicationQueues.addPeerToHFileRefs(id);
+        this.queueStorage.addPeerToHFileRefs(id);
       }
     }
-    AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
-    try {
-      this.executor.execute(adoptionWorker);
-    } catch (RejectedExecutionException ex) {
-      LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
-    }
+    return this.executor.submit(this::adoptAbandonedQueues);
   }
 
   /**
@@ -254,15 +294,12 @@ public class ReplicationSourceManager implements ReplicationListener {
    * need to enqueue the latest log of each wal group and do replication
    * @param id the id of the peer cluster
    * @return the source that was created
-   * @throws IOException
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
     ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
-    ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
-      this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
-      walFileLengthProvider);
+    ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
       Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -277,11 +314,10 @@ public class ReplicationSourceManager implements ReplicationListener {
             logs.add(name);
             walsByGroup.put(walPrefix, logs);
             try {
-              this.replicationQueues.addLog(id, name);
+              this.queueStorage.addWAL(server.getServerName(), id, name);
             } catch (ReplicationException e) {
-              String message =
-                  "Cannot add log to queue when creating a new source, queueId=" + id
-                      + ", filename=" + name;
+              String message = "Cannot add log to queue when creating a new source, queueId=" + id +
+                ", filename=" + name;
               server.stop(message);
               throw e;
             }
@@ -306,7 +342,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param peerId Id of the peer cluster queue of wals to delete
    */
   public void deleteSource(String peerId, boolean closeConnection) {
-    this.replicationQueues.removeQueue(peerId);
+    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
     if (closeConnection) {
       this.replicationPeers.peerDisconnected(peerId);
     }
@@ -366,8 +402,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   @VisibleForTesting
-  List<String> getAllQueues() {
-    return replicationQueues.getAllQueues();
+  List<String> getAllQueues() throws ReplicationException {
+    return queueStorage.getAllQueues(server.getServerName());
   }
 
  // public because we call it in TestReplicationEmptyWALRecovery
@@ -403,10 +439,10 @@ public class ReplicationSourceManager implements ReplicationListener {
     synchronized (replicationPeers) {
       for (String id : replicationPeers.getConnectedPeerIds()) {
         try {
-          this.replicationQueues.addLog(id, logName);
+          this.queueStorage.addWAL(server.getServerName(), id, logName);
         } catch (ReplicationException e) {
-          throw new IOException("Cannot add log to replication queue"
-              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
+          throw new IOException("Cannot add log to replication queue" +
+            " when creating a new source, queueId=" + id + ", filename=" + logName, e);
         }
       }
     }
@@ -455,19 +491,11 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   /**
    * Factory method to create a replication source
-   * @param conf the configuration to use
-   * @param fs the file system to use
-   * @param manager the manager to use
-   * @param server the server object for this region server
    * @param peerId the id of the peer cluster
    * @return the created source
-   * @throws IOException
    */
-  private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
-      ReplicationSourceManager manager, ReplicationQueues replicationQueues,
-      ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
-      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
-      WALFileLengthProvider walFileLengthProvider) throws IOException {
+  private ReplicationSourceInterface getReplicationSource(String peerId,
+      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException {
     RegionServerCoprocessorHost rsServerHost = null;
     TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
@@ -484,9 +512,8 @@ public class ReplicationSourceManager implements ReplicationListener {
         // Default to HBase inter-cluster replication endpoint
         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
       }
-      @SuppressWarnings("rawtypes")
-      Class c = Class.forName(replicationEndpointImpl);
-      replicationEndpoint = (ReplicationEndpoint) c.newInstance();
+      replicationEndpoint = Class.forName(replicationEndpointImpl)
+          .asSubclass(ReplicationEndpoint.class).newInstance();
       if(rsServerHost != null) {
         ReplicationEndpoint newReplicationEndPoint = rsServerHost
             .postCreateReplicationEndPoint(replicationEndpoint);
@@ -503,7 +530,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     MetricsSource metrics = new MetricsSource(peerId);
     // init replication source
-    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
+    src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId,
       replicationEndpoint, walFileLengthProvider, metrics);
 
     // init replication endpoint
@@ -514,21 +541,21 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Transfer all the queues of the specified to this region server.
-   * First it tries to grab a lock and if it works it will move the
-   * znodes and finally will delete the old znodes.
-   *
+   * Transfer all the queues of the specified region server to this region server. First it tries
+   * to grab a lock and if it works it will move the znodes and finally will delete the old znodes.
+   * <p>
    * It creates one old source for any type of source of the old rs.
-   * @param rsZnode
    */
-  private void transferQueues(String rsZnode) {
-    NodeFailoverWorker transfer =
-        new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
-            this.clusterId);
+  private void transferQueues(ServerName deadRS) {
+    if (server.getServerName().equals(deadRS)) {
+      // it's just us, give up
+      return;
+    }
+    NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
     try {
       this.executor.execute(transfer);
     } catch (RejectedExecutionException ex) {
-      LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
+      LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
     }
   }
 
@@ -565,7 +592,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
-        this.replicationQueues.addPeerToHFileRefs(id);
+        this.queueStorage.addPeerToHFileRefs(id);
       }
     }
   }
@@ -618,12 +645,12 @@ public class ReplicationSourceManager implements ReplicationListener {
       deleteSource(id, true);
     }
     // Remove HFile Refs znode from zookeeper
-    this.replicationQueues.removePeerFromHFileRefs(id);
+    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id));
   }
 
   @Override
   public void regionServerRemoved(String regionserver) {
-    transferQueues(regionserver);
+    transferQueues(ServerName.valueOf(regionserver));
   }
 
   /**
@@ -632,37 +659,21 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   class NodeFailoverWorker extends Thread {
 
-    private String rsZnode;
-    private final ReplicationQueues rq;
-    private final ReplicationPeers rp;
-    private final UUID clusterId;
+    private final ServerName deadRS;
 
-    /**
-     * @param rsZnode
-     */
-    public NodeFailoverWorker(String rsZnode) {
-      this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
-    }
-
-    public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
-        final ReplicationPeers replicationPeers, final UUID clusterId) {
-      super("Failover-for-"+rsZnode);
-      this.rsZnode = rsZnode;
-      this.rq = replicationQueues;
-      this.rp = replicationPeers;
-      this.clusterId = clusterId;
+    @VisibleForTesting
+    public NodeFailoverWorker(ServerName deadRS) {
+      super("Failover-for-" + deadRS);
+      this.deadRS = deadRS;
     }
 
     @Override
     public void run() {
-      if (this.rq.isThisOurRegionServer(rsZnode)) {
-        return;
-      }
       // Wait a bit before transferring the queues, we may be shutting down.
       // This sleep may not be enough in some cases.
       try {
         Thread.sleep(sleepBeforeFailover +
-            (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
+          (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
       } catch (InterruptedException e) {
         LOG.warn("Interrupted while waiting before transferring a queue.");
         Thread.currentThread().interrupt();
@@ -673,25 +684,30 @@ public class ReplicationSourceManager implements ReplicationListener {
         return;
       }
       Map<String, Set<String>> newQueues = new HashMap<>();
-      List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
-      while (peers != null && !peers.isEmpty()) {
-        Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
-          peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
-        long sleep = sleepBeforeFailover/2;
-        if (peer != null) {
-          newQueues.put(peer.getFirst(), peer.getSecond());
-          sleep = sleepBeforeFailover;
+      try {
+        List<String> peers = queueStorage.getAllQueues(deadRS);
+        while (!peers.isEmpty()) {
+          Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
+            peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName());
+          long sleep = sleepBeforeFailover / 2;
+          if (!peer.getSecond().isEmpty()) {
+            newQueues.put(peer.getFirst(), peer.getSecond());
+            sleep = sleepBeforeFailover;
+          }
+          try {
+            Thread.sleep(sleep);
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting before transferring a queue.");
+            Thread.currentThread().interrupt();
+          }
+          peers = queueStorage.getAllQueues(deadRS);
         }
-        try {
-          Thread.sleep(sleep);
-        } catch (InterruptedException e) {
-          LOG.warn("Interrupted while waiting before transferring a queue.");
-          Thread.currentThread().interrupt();
+        if (!peers.isEmpty()) {
+          queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
         }
-        peers = rq.getUnClaimedQueueIds(rsZnode);
-      }
-      if (peers != null) {
-        rq.removeReplicatorIfQueueIsEmpty(rsZnode);
+      } catch (ReplicationException e) {
+        server.abort("Failed to claim queue from dead regionserver", e);
+        return;
       }
       // Copying over the failed queue is completed.
       if (newQueues.isEmpty()) {
@@ -716,8 +732,8 @@ public class ReplicationSourceManager implements ReplicationListener {
                 + ex);
           }
           if (peer == null || peerConfig == null) {
-            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
-            replicationQueues.removeQueue(peerId);
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
+            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
             continue;
           }
           if (server instanceof ReplicationSyncUp.DummyServer
@@ -741,13 +757,11 @@ public class ReplicationSourceManager implements ReplicationListener {
           }
 
           // enqueue sources
-          ReplicationSourceInterface src =
-              getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
-                server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
+          ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer);
           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
           // see removePeer
           synchronized (oldsources) {
-            if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
+            if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
               src.terminate("Recovered queue doesn't belong to any current peer");
               closeRecoveredQueue(src);
               continue;
@@ -766,35 +780,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  class AdoptAbandonedQueuesWorker extends Thread{
-
-    public AdoptAbandonedQueuesWorker() {}
-
-    @Override
-    public void run() {
-      List<String> currentReplicators = null;
-      try {
-        currentReplicators = replicationQueues.getListOfReplicators();
-      } catch (ReplicationException e) {
-        server.abort("Failed to get all replicators", e);
-        return;
-      }
-      if (currentReplicators == null || currentReplicators.isEmpty()) {
-        return;
-      }
-      List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
-      LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
-        + otherRegionServers);
-
-      // Look if there's anything to process after a restart
-      for (String rs : currentReplicators) {
-        if (!otherRegionServers.contains(rs)) {
-          transferQueues(rs);
-        }
-      }
-    }
-  }
-
   /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
@@ -849,6 +834,10 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   public void cleanUpHFileRefs(String peerId, List<String> files) {
-    this.replicationQueues.removeHFileRefs(peerId, files);
+    abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
+  }
+
+  int activeFailoverTaskCount() {
+    return executor.getActiveCount();
   }
 }
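
One pattern in the hunks above deserves a note: every ReplicationQueueStorage
call now throws the checked ReplicationException, and rather than scattering
try/catch blocks across the class, the change funnels failures through the
one-method ReplicationQueueOperation interface into server.abort(). Below is a
minimal, compilable sketch of the same pattern; the Consumer wiring and class
name are illustrative, only the interface/abortWhenFail shape mirrors the diff.

import java.util.function.Consumer;

public class AbortWhenFailSketch {

  // As in the diff: a one-method interface so call sites can pass lambdas
  // that throw a checked exception.
  @FunctionalInterface
  interface ReplicationQueueOperation {
    void exec() throws Exception; // stands in for ReplicationException
  }

  private final Consumer<Throwable> aborter; // stands in for server.abort(...)

  AbortWhenFailSketch(Consumer<Throwable> aborter) {
    this.aborter = aborter;
  }

  // Single choke point for "a queue-storage update failed, abort the server".
  void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (Exception e) {
      aborter.accept(e);
    }
  }

  public static void main(String[] args) {
    AbortWhenFailSketch s =
        new AbortWhenFailSketch(t -> System.err.println("aborting: " + t));
    s.abortWhenFail(() -> { throw new Exception("znode update failed"); });
  }
}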

http://git-wip-us.apache.org/repos/asf/hbase/blob/dbc81ff5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index f7ea78f..c2862de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -42,7 +42,7 @@ import org.apache.hadoop.util.ToolRunner;
  * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool
 * will run on the Master cluster, and assumes ZK, the filesystem and the network are still
 * available after hbase crashes
- * 
+ *
  * <pre>
  * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
  * </pre>
@@ -83,11 +83,12 @@ public class ReplicationSyncUp extends Configured implements Tool {
       Replication replication = new Replication();
       replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
       ReplicationSourceManager manager = replication.getReplicationManager();
-      manager.init();
-      int numberOfOldSource = 1; // default wait once
-      while (numberOfOldSource > 0) {
+      manager.init().get();
+      while (manager.activeFailoverTaskCount() > 0) {
+        Thread.sleep(SLEEP_TIME);
+      }
+      while (manager.getOldSources().size() > 0) {
         Thread.sleep(SLEEP_TIME);
-        numberOfOldSource = manager.getOldSources().size();
       }
       manager.join();
     } catch (InterruptedException e) {
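
The ReplicationSyncUp change above leans on the Future<?> that init() now
returns: instead of a fixed "wait once" guess, the tool blocks until the
adoption task has actually run, then polls the failover executor until it
drains. Here is a self-contained sketch of that wait protocol, with a bare
ThreadPoolExecutor standing in for the manager (the names are stand-ins, not
HBase API):

import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FailoverWaitSketch {
  public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 4, 60,
        TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    // Stand-in for manager.init(): submit the adoption work, keep the Future.
    Future<?> adoption = executor.submit(() -> {
      // adoptAbandonedQueues() would list replicators here and hand each dead
      // server's queues to NodeFailoverWorker threads on this same executor.
    });
    adoption.get(); // ReplicationSyncUp now blocks here, deterministically.

    // Stand-in for manager.activeFailoverTaskCount(); getActiveCount() is an
    // approximation, which is why the tool keeps polling in a loop.
    while (executor.getActiveCount() > 0) {
      Thread.sleep(100);
    }
    executor.shutdown();
  }
}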

http://git-wip-us.apache.org/repos/asf/hbase/blob/dbc81ff5/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index dd7aa6a..5981b23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -46,9 +46,8 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -116,9 +115,8 @@ public class TestLogsCleaner {
 
     HMaster.decorateMasterConfiguration(conf);
     Server server = new DummyServer();
-    ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(
-        new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
-    repQueues.init(server.getServerName().toString());
+    ReplicationQueueStorage queueStorage =
+        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
     final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
     final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs");
     String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8");
@@ -149,7 +147,7 @@ public class TestLogsCleaner {
       // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
       // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
       if (i % (30 / 3) == 1) {
-        repQueues.addLog(fakeMachineName, fileName.getName());
+        queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
         LOG.info("Replication log file: " + fileName);
       }
     }
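
The test migrations in the rest of this commit follow the same mechanical
recipe as TestLogsCleaner above: drop the rq.init(serverName) step and pass
the owning ServerName explicitly on every call. Condensing the call sites that
appear in this commit into one interface makes the new shape easy to see; the
signatures below are inferred from usage in this diff, not copied from the
real class.

import java.util.List;
import java.util.SortedSet;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.util.Pair;

interface QueueStorageCallsUsedHere {
  void addWAL(ServerName owner, String queueId, String wal) throws ReplicationException;
  void removeWAL(ServerName owner, String queueId, String wal) throws ReplicationException;
  void setWALPosition(ServerName owner, String queueId, String wal, long pos)
      throws ReplicationException;
  void removeQueue(ServerName owner, String queueId) throws ReplicationException;
  List<String> getAllQueues(ServerName owner) throws ReplicationException;
  Pair<String, SortedSet<String>> claimQueue(ServerName deadOwner, String queueId,
      ServerName claimer) throws ReplicationException;
  void removeReplicatorIfQueueIsEmpty(ServerName owner) throws ReplicationException;
}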

http://git-wip-us.apache.org/repos/asf/hbase/blob/dbc81ff5/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 102c8d9..e1eb822 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -48,8 +48,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -81,16 +81,13 @@ public class TestReplicationHFileCleaner {
   private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Server server;
-  private static ReplicationQueues rq;
+  private static ReplicationQueueStorage rq;
   private static ReplicationPeers rp;
   private static final String peerId = "TestReplicationHFileCleaner";
   private static Configuration conf = TEST_UTIL.getConfiguration();
   static FileSystem fs = null;
   Path root;
 
-  /**
-   * @throws java.lang.Exception
-   */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
@@ -99,20 +96,10 @@ public class TestReplicationHFileCleaner {
     HMaster.decorateMasterConfiguration(conf);
     rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
     rp.init();
-    rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
-    rq.init(server.getServerName().toString());
-    try {
-      fs = FileSystem.get(conf);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-    }
+    rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
+    fs = FileSystem.get(conf);
   }
 
-  /**
-   * @throws java.lang.Exception
-   */
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniZKCluster();
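
One fix in this file is easy to miss: the removed setup code obtained
FileSystem.get(conf) and then closed it in the very same finally block. That
is worse than it looks, because FileSystem.get() hands out a process-wide
cached instance, so the close poisons every other holder of the same object.
A small demonstration of the hazard, assuming Hadoop's default caching
(fs.<scheme>.impl.disable.cache unset):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsCacheCloseHazard {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem a = FileSystem.get(conf);
    FileSystem b = FileSystem.get(conf); // cache hit: same object as 'a'
    a.close();                           // mirrors the removed finally block
    b.exists(new Path("/"));             // throws IOException: Filesystem closed
  }
}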

http://git-wip-us.apache.org/repos/asf/hbase/blob/dbc81ff5/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
index 22ed1a3..4bcde0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
@@ -26,10 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -55,14 +53,12 @@ public class TestReplicationZKNodeCleaner {
 
   private final Configuration conf;
   private final ZKWatcher zkw;
-  private final ReplicationQueues repQueues;
+  private final ReplicationQueueStorage repQueues;
 
   public TestReplicationZKNodeCleaner() throws Exception {
     conf = TEST_UTIL.getConfiguration();
     zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null);
-    repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null,
-        zkw));
-    assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
+    repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
   }
 
   @BeforeClass
@@ -78,9 +74,8 @@ public class TestReplicationZKNodeCleaner {
 
   @Test
   public void testReplicationZKNodeCleaner() throws Exception {
-    repQueues.init(SERVER_ONE.getServerName());
     // add queue for ID_ONE which doesn't exist
-    repQueues.addLog(ID_ONE, "file1");
+    repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
 
     ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
     Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
@@ -90,7 +85,7 @@ public class TestReplicationZKNodeCleaner {
     assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
 
     // add a recovery queue for ID_TWO which doesn't exist
-    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+    repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
 
     undeletedQueues = cleaner.getUnDeletedQueues();
     assertEquals(1, undeletedQueues.size());
@@ -106,11 +101,10 @@ public class TestReplicationZKNodeCleaner {
 
   @Test
   public void testReplicationZKNodeCleanerChore() throws Exception {
-    repQueues.init(SERVER_ONE.getServerName());
     // add queue for ID_ONE which doesn't exist
-    repQueues.addLog(ID_ONE, "file1");
+    repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
     // add a recovery queue for ID_TWO which doesn't exist
-    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+    repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
 
     // Wait the cleaner chore to run
     Thread.sleep(20000);
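
The second addWAL call above encodes the naming convention for recovered
queues that the cleaner has to understand: a claimed queue's id is the peer id
with the previous owner's ServerName appended after a '-'. Spelled out with
illustrative values (the test's ID_TWO and SERVER_TWO constants are
analogous):

import org.apache.hadoop.hbase.ServerName;

public class RecoveredQueueIdSketch {
  public static void main(String[] args) {
    String peerId = "2";
    ServerName deadServer =
        ServerName.valueOf("server2.example.org,16020,1300000000000");
    String recoveredQueueId = peerId + "-" + deadServer;
    // Prints "2-server2.example.org,16020,1300000000000": the peer id
    // followed by the dead region server the queue was claimed from.
    System.out.println(recoveredQueueId);
  }
}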

http://git-wip-us.apache.org/repos/asf/hbase/blob/dbc81ff5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 7ea79f9..14c5e56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -46,9 +45,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   MetricsSource metrics;
   WALFileLengthProvider walFileLengthProvider;
   AtomicBoolean startup = new AtomicBoolean(false);
+
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
+      ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
       UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.manager = manager;

http://git-wip-us.apache.org/repos/asf/hbase/blob/dbc81ff5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index b95871f..6c487ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -70,7 +70,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
 
   @Before
   public void setUp() throws Exception {
-
     HColumnDescriptor fam;
 
     t1_syncupSource = new HTableDescriptor(t1_su);
@@ -182,7 +181,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
      * verify correctly replicated to Slave
      */
     mimicSyncUpAfterPut();
-
   }
 
   protected void setupReplication() throws Exception {

http://git-wip-us.apache.org/repos/asf/hbase/blob/dbc81ff5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index d1a9f83..1d3b6ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -68,10 +68,10 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -338,18 +338,14 @@ public abstract class TestReplicationSourceManager {
 
   @Test
   public void testClaimQueues() throws Exception {
-    final Server server = new DummyServer("hostname0.example.org");
-
-
-    ReplicationQueues rq =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
-          server.getZooKeeper()));
-    rq.init(server.getServerName().toString());
+    Server server = new DummyServer("hostname0.example.org");
+    ReplicationQueueStorage rq = ReplicationStorageFactory
+        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
     // populate some znodes in the peer znode
     files.add("log1");
     files.add("log2");
     for (String file : files) {
-      rq.addLog("1", file);
+      rq.addWAL(server.getServerName(), "1", file);
     }
     // create 3 DummyServers
     Server s1 = new DummyServer("dummyserver1.example.org");
@@ -357,12 +353,9 @@ public abstract class TestReplicationSourceManager {
     Server s3 = new DummyServer("dummyserver3.example.org");
 
     // create 3 DummyNodeFailoverWorkers
-    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
-        server.getServerName().getServerName(), s1);
-    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
-        server.getServerName().getServerName(), s2);
-    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
-        server.getServerName().getServerName(), s3);
+    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
+    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
+    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);
 
     latch = new CountDownLatch(3);
     // start the threads
@@ -381,11 +374,9 @@ public abstract class TestReplicationSourceManager {
 
   @Test
   public void testCleanupFailoverQueues() throws Exception {
-    final Server server = new DummyServer("hostname1.example.org");
-    ReplicationQueues rq =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
-          server.getZooKeeper()));
-    rq.init(server.getServerName().toString());
+    Server server = new DummyServer("hostname1.example.org");
+    ReplicationQueueStorage rq = ReplicationStorageFactory
+        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
     // populate some znodes in the peer znode
     SortedSet<String> files = new TreeSet<>();
     String group = "testgroup";
@@ -394,19 +385,14 @@ public abstract class TestReplicationSourceManager {
     files.add(file1);
     files.add(file2);
     for (String file : files) {
-      rq.addLog("1", file);
+      rq.addWAL(server.getServerName(), "1", file);
     }
     Server s1 = new DummyServer("dummyserver1.example.org");
-    ReplicationQueues rq1 =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
-            s1.getZooKeeper()));
-    rq1.init(s1.getServerName().toString());
     ReplicationPeers rp1 =
         ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
     rp1.init();
     NodeFailoverWorker w1 =
-        manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
-            new Long(1), new Long(2)));
+        manager.new NodeFailoverWorker(server.getServerName());
     w1.run();
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
@@ -418,17 +404,16 @@ public abstract class TestReplicationSourceManager {
 
   @Test
   public void testCleanupUnknownPeerZNode() throws Exception {
-    final Server server = new DummyServer("hostname2.example.org");
-    ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
-      new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper()));
-    rq.init(server.getServerName().toString());
+    Server server = new DummyServer("hostname2.example.org");
+    ReplicationQueueStorage rq = ReplicationStorageFactory
+        .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
     // populate some znodes in the peer znode
     // add log to an unknown peer
     String group = "testgroup";
-    rq.addLog("2", group + ".log1");
-    rq.addLog("2", group + ".log2");
+    rq.addWAL(server.getServerName(), "2", group + ".log1");
+    rq.addWAL(server.getServerName(), "2", group + ".log2");
 
-    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
+    NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
     w1.run();
 
     // The log of the unknown peer should be removed from zk
@@ -506,10 +491,8 @@ public abstract class TestReplicationSourceManager {
         .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
     try {
       DummyServer server = new DummyServer();
-      final ReplicationQueues rq =
-          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
-              server.getConfiguration(), server, server.getZooKeeper()));
-      rq.init(server.getServerName().toString());
+      ReplicationQueueStorage rq = ReplicationStorageFactory
+          .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
       // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
       // initialization to throw an exception.
       conf.set("replication.replicationsource.implementation",
@@ -523,11 +506,11 @@ public abstract class TestReplicationSourceManager {
       assertNull(manager.getSource(peerId));
 
       // Create a replication queue for the fake peer
-      rq.addLog(peerId, "FakeFile");
+      rq.addWAL(server.getServerName(), peerId, "FakeFile");
       // Unregister peer, this should remove the peer and clear all queues associated with it
       // Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
       removePeerAndWait(peerId);
-      assertFalse(rq.getAllQueues().contains(peerId));
+      assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
     } finally {
       conf.set("replication.replicationsource.implementation", replicationSourceImplName);
       removePeerAndWait(peerId);
@@ -650,11 +633,12 @@ public abstract class TestReplicationSourceManager {
       }
     }
     Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-      @Override public boolean evaluate() throws Exception {
+      @Override
+      public boolean evaluate() throws Exception {
         List<String> peers = rp.getAllPeerIds();
-        return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
-            && (!peers.contains(peerId))
-            && manager.getSource(peerId) == null;
+        return (!manager.getAllQueues().contains(peerId)) &&
+          (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
+          manager.getSource(peerId) == null;
       }
     });
   }
@@ -697,25 +681,24 @@ public abstract class TestReplicationSourceManager {
   static class DummyNodeFailoverWorker extends Thread {
     private Map<String, Set<String>> logZnodesMap;
     Server server;
-    private String deadRsZnode;
-    ReplicationQueues rq;
+    private ServerName deadRS;
+    ReplicationQueueStorage rq;
 
-    public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
-      this.deadRsZnode = znode;
+    public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
+      this.deadRS = deadRS;
       this.server = s;
-      this.rq =
-          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
-            server.getZooKeeper()));
-      this.rq.init(this.server.getServerName().toString());
+      this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
+        server.getConfiguration());
     }
 
     @Override
     public void run() {
       try {
         logZnodesMap = new HashMap<>();
-        List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
-        for(String queue:queues){
-          Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
+        List<String> queues = rq.getAllQueues(deadRS);
+        for (String queue : queues) {
+          Pair<String, SortedSet<String>> pair =
+              rq.claimQueue(deadRS, queue, server.getServerName());
           if (pair != null) {
             logZnodesMap.put(pair.getFirst(), pair.getSecond());
           }
@@ -754,7 +737,7 @@ public abstract class TestReplicationSourceManager {
 
     @Override
     public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-        ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
+        ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
         UUID clusterId, ReplicationEndpoint replicationEndpoint,
         WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");
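
The DummyNodeFailoverWorker above condenses the claim protocol the real
NodeFailoverWorker now speaks: list the dead server's queues, claim them one
at a time under your own ServerName, and re-list until nothing is left. Below
is a standalone model of that loop, with a plain Map standing in for the
ZK-backed ReplicationQueueStorage and the randomized sleeps and abort handling
left out:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;

public class ClaimLoopSketch {
  public static void main(String[] args) {
    Map<String, SortedSet<String>> deadServerQueues = new HashMap<>();
    deadServerQueues.put("1", new TreeSet<>(Arrays.asList("wal.1", "wal.2")));
    deadServerQueues.put("2-otherRS", new TreeSet<>(Arrays.asList("wal.3")));

    Map<String, SortedSet<String>> newQueues = new HashMap<>();
    List<String> queueIds = new ArrayList<>(deadServerQueues.keySet());
    while (!queueIds.isEmpty()) {
      // Random pick, as in the diff: spreads contention when several live
      // region servers race to adopt the same dead server's queues.
      String queueId =
          queueIds.get(ThreadLocalRandom.current().nextInt(queueIds.size()));
      SortedSet<String> claimed = deadServerQueues.remove(queueId); // "claimQueue"
      if (claimed != null && !claimed.isEmpty()) {
        newQueues.put(queueId, claimed);
      }
      queueIds = new ArrayList<>(deadServerQueues.keySet()); // re-list, retry
    }
    System.out.println("adopted: " + newQueues);
  }
}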

http://git-wip-us.apache.org/repos/asf/hbase/blob/dbc81ff5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index 4d81d1d..8e0ab0f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -25,11 +25,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.junit.BeforeClass;
@@ -42,7 +41,7 @@ import org.junit.experimental.categories.Category;
  * ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
  * TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors.
  */
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ ReplicationTests.class, MediumTests.class })
 public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
 
   @ClassRule
@@ -64,16 +63,14 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
   // Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
   @Test
   public void testNodeFailoverDeadServerParsing() throws Exception {
-    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
-    ReplicationQueues repQueues =
-      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
-        server.getZooKeeper()));
-    repQueues.init(server.getServerName().toString());
+    Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
+    ReplicationQueueStorage queueStorage =
+        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
     // populate some znodes in the peer znode
     files.add("log1");
     files.add("log2");
     for (String file : files) {
-      repQueues.addLog("1", file);
+      queueStorage.addWAL(server.getServerName(), "1", file);
     }
 
     // create 3 DummyServers
@@ -82,30 +79,22 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
     Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
 
    // simulate three servers failing sequentially
-    ReplicationQueues rq1 =
-      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
-        s1.getZooKeeper()));
-    rq1.init(s1.getServerName().toString());
-    String serverName = server.getServerName().getServerName();
-    List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
-    rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
-    rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
-    ReplicationQueues rq2 =
-      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
-        s2.getZooKeeper()));
-    rq2.init(s2.getServerName().toString());
-    serverName = s1.getServerName().getServerName();
-    unclaimed = rq2.getUnClaimedQueueIds(serverName);
-    rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
-    rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
-    ReplicationQueues rq3 =
-      ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
-        s3.getZooKeeper()));
-    rq3.init(s3.getServerName().toString());
-    serverName = s2.getServerName().getServerName();
-    unclaimed = rq3.getUnClaimedQueueIds(serverName);
-    String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
-    rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
+    ServerName serverName = server.getServerName();
+    List<String> unclaimed = queueStorage.getAllQueues(serverName);
+    queueStorage.claimQueue(serverName, unclaimed.get(0), s1.getServerName());
+    queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
+
+    serverName = s1.getServerName();
+    unclaimed = queueStorage.getAllQueues(serverName);
+    queueStorage.claimQueue(serverName, unclaimed.get(0), s2.getServerName());
+    queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
+
+    serverName = s2.getServerName();
+    unclaimed = queueStorage.getAllQueues(serverName);
+    String queue3 =
+        queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst();
+    queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
+
     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
     List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
     // verify
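
For reference, this is what the truncated assertions above are checking: each
claimQueue appends the previous owner to the queue id, so after three
sequential failovers queue3 carries the whole chain of dead servers, and
ReplicationQueueInfo parses it back apart. A sketch with an illustrative
literal (real ids embed actual ServerName strings, including EC2-style
hostnames with hyphens, which is exactly what this test exercises):

import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;

public class DeadServerParsingSketch {
  public static void main(String[] args) {
    String queue3 = "1-hostname0.example.org,16020,1300000000000"
        + "-ip-10-8-1-2.ec2.internal,16020,1300000000001";
    ReplicationQueueInfo info = new ReplicationQueueInfo(queue3);
    List<ServerName> dead = info.getDeadRegionServers();
    System.out.println(info.getPeerId() + " previously died on: " + dead);
  }
}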