You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2015/10/28 20:10:28 UTC

[2/2] hbase git commit: HBASE-6617 ReplicationSourceManager should be able to track multiple WAL paths (Yu Li)

HBASE-6617 ReplicationSourceManager should be able to track multiple WAL paths (Yu Li)

Signed-off-by: Elliott Clark <ec...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/242adad5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/242adad5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/242adad5

Branch: refs/heads/branch-1.2
Commit: 242adad59f2d260e5fb264341d5ad4e2eb73c345
Parents: af1d176
Author: tedyu <yu...@gmail.com>
Authored: Fri Sep 11 09:30:58 2015 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Wed Oct 28 11:57:40 2015 -0700

----------------------------------------------------------------------
 .../hbase/regionserver/HRegionServer.java       |    9 +
 .../hadoop/hbase/regionserver/LogRoller.java    |   16 +-
 .../hbase/replication/ReplicationEndpoint.java  |    8 +
 .../HBaseInterClusterReplicationEndpoint.java   |    6 +-
 .../replication/regionserver/MetricsSource.java |   33 +-
 .../regionserver/ReplicationSource.java         | 1204 ++++++++++--------
 .../regionserver/ReplicationSourceManager.java  |  189 ++-
 .../hadoop/hbase/wal/DefaultWALProvider.java    |   11 +
 .../org/apache/hadoop/hbase/wal/WALFactory.java |    2 +-
 .../replication/TestReplicationEndpoint.java    |   29 +-
 .../TestReplicationEndpointWithMultipleWAL.java |   33 +
 ...onKillMasterRSCompressedWithMultipleWAL.java |   34 +
 ...estReplicationSyncUpToolWithMultipleWAL.java |   34 +
 ...egionReplicaReplicationEndpointNoMaster.java |    4 +-
 .../TestReplicationSourceManager.java           |   20 +-
 .../TestReplicationWALReaderManager.java        |    2 +-
 16 files changed, 1022 insertions(+), 612 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5dddd9d..d1147d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3325,4 +3325,13 @@ public class HRegionServer extends HasThread implements
     }
     return max;
   }
+
+  /**
+   * For testing
+   * @return whether all wal roll request finished for this regionserver
+   */
+  @VisibleForTesting
+  public boolean walRollRequestFinished() {
+    return this.walRoller.walRollFinished();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index f35fdb7..136e03e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -197,4 +197,18 @@ public class LogRoller extends HasThread {
         requester);
     }
   }
-}
\ No newline at end of file
+
+  /**
+   * For testing only
+   * @return true if all WAL roll finished
+   */
+  @VisibleForTesting
+  public boolean walRollFinished() {
+    for (boolean needRoll : walNeedsRoll.values()) {
+      if (needRoll) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index e8a7ddc..ac1257f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -137,6 +137,7 @@ public interface ReplicationEndpoint extends Service {
   static class ReplicateContext {
     List<Entry> entries;
     int size;
+    String walGroupId;
     @InterfaceAudience.Private
     public ReplicateContext() {
     }
@@ -149,12 +150,19 @@ public interface ReplicationEndpoint extends Service {
       this.size = size;
       return this;
     }
+    public ReplicateContext setWalGroupId(String walGroupId) {
+      this.walGroupId = walGroupId;
+      return this;
+    }
     public List<Entry> getEntries() {
       return entries;
     }
     public int getSize() {
       return size;
     }
+    public String getWalGroupId(){
+      return walGroupId;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 44ec804..0dfad0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -154,6 +154,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
     List<Entry> entries = replicateContext.getEntries();
+    String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;
 
     if (!peersSelected && this.isRunning()) {
@@ -222,12 +223,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           throw iox;
         }
         // update metrics
-        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
+        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
+          walGroupId);
         return true;
 
       } catch (IOException ioe) {
         // Didn't ship anything, but must still age the last time we did
-        this.metrics.refreshAgeOfLastShippedOp();
+        this.metrics.refreshAgeOfLastShippedOp(walGroupId);
         if (ioe instanceof RemoteException) {
           ioe = ((RemoteException) ioe).unwrapRemoteException();
           LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);

http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index cf08787..f9f7001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -34,7 +37,8 @@ public class MetricsSource {
 
   private static final Log LOG = LogFactory.getLog(MetricsSource.class);
 
-  private long lastTimestamp = 0;
+  // tracks last shipped timestamp for each wal group
+  private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
   private int lastQueueSize = 0;
   private String id;
 
@@ -56,23 +60,29 @@ public class MetricsSource {
 
   /**
    * Set the age of the last edit that was shipped
-   *
    * @param timestamp write time of the edit
+   * @param walGroup which group we are setting
    */
-  public void setAgeOfLastShippedOp(long timestamp) {
+  public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     singleSourceSource.setLastShippedAge(age);
     globalSourceSource.setLastShippedAge(age);
-    this.lastTimestamp = timestamp;
+    this.lastTimeStamps.put(walGroup, timestamp);
   }
 
   /**
    * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
    * when replication fails and need to keep that metric accurate.
+   * @param walGroupId id of the group to update
    */
-  public void refreshAgeOfLastShippedOp() {
-    if (this.lastTimestamp > 0) {
-      setAgeOfLastShippedOp(this.lastTimestamp);
+  public void refreshAgeOfLastShippedOp(String walGroupId) {
+    Long lastTimestamp = this.lastTimeStamps.get(walGroupId);
+    if (lastTimestamp == null) {
+      this.lastTimeStamps.put(walGroupId, 0L);
+      lastTimestamp = 0L;
+    }
+    if (lastTimestamp > 0) {
+      setAgeOfLastShippedOp(lastTimestamp, walGroupId);
     }
   }
 
@@ -143,6 +153,7 @@ public class MetricsSource {
   public void clear() {
     singleSourceSource.clear();
     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
+    lastTimeStamps.clear();
     lastQueueSize = 0;
   }
 
@@ -163,10 +174,16 @@ public class MetricsSource {
   }
 
   /**
-   * Get the timeStampsOfLastShippedOp
+   * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
    * @return lastTimestampForAge
    */
   public long getTimeStampOfLastShippedOp() {
+    long lastTimestamp = 0L;
+    for (long ts : lastTimeStamps.values()) {
+      if (ts > lastTimestamp) {
+        lastTimestamp = ts;
+      }
+    }
     return lastTimestamp;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 697968e..62c31e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -22,11 +22,17 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -77,8 +83,12 @@ public class ReplicationSource extends Thread
     implements ReplicationSourceInterface {
 
   private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
-  // Queue of logs to process
-  private PriorityBlockingQueue<Path> queue;
+  // Queues of logs to process, entry in format of walGroupId->queue,
+  // each presents a queue for one wal group
+  private Map<String, PriorityBlockingQueue<Path>> queues =
+      new HashMap<String, PriorityBlockingQueue<Path>>();
+  // per group queue size, keep no more than this number of logs in each wal group
+  private int queueSizePerGroup;
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
 
@@ -96,35 +106,23 @@ public class ReplicationSource extends Thread
   private long replicationQueueSizeCapacity;
   // Max number of entries in entriesArray
   private int replicationQueueNbCapacity;
-  // Our reader for the current log. open/close handled by repLogReader
-  private WAL.Reader reader;
-  // Last position in the log that we sent to ZooKeeper
-  private long lastLoggedPosition = -1;
-  // Path of the current log
-  private volatile Path currentPath;
   private FileSystem fs;
   // id of this cluster
   private UUID clusterId;
   // id of the other cluster
   private UUID peerClusterId;
   // total number of edits we replicated
-  private long totalReplicatedEdits = 0;
+  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
   // total number of edits we replicated
-  private long totalReplicatedOperations = 0;
+  private AtomicLong totalReplicatedOperations = new AtomicLong(0);
   // The znode we currently play with
   private String peerClusterZnode;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
-  // Current number of operations (Put/Delete) that we need to replicate
-  private int currentNbOperations = 0;
-  // Current size of data we need to replicate
-  private int currentSize = 0;
   // Indicates if this particular source is running
-  private volatile boolean running = true;
+  private volatile boolean sourceRunning = false;
   // Metrics for this source
   private MetricsSource metrics;
-  // Handle on the log reader helper
-  private ReplicationWALReaderManager repLogReader;
   //WARN threshold for the number of queued logs, defaults to 2
   private int logQueueWarnThreshold;
   // ReplicationEndpoint which will handle the actual replication
@@ -133,6 +131,9 @@ public class ReplicationSource extends Thread
   private WALEntryFilter walEntryFilter;
   // throttler
   private ReplicationThrottler throttler;
+  private AtomicInteger logQueueSize = new AtomicInteger(0);
+  private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
+      new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
 
   /**
    * Instantiation method used by region servers
@@ -165,10 +166,7 @@ public class ReplicationSource extends Thread
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
-    this.queue =
-        new PriorityBlockingQueue<Path>(
-            this.conf.getInt("hbase.regionserver.maxlogs", 32),
-            new LogsComparator());
+    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
     long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
     this.replicationQueues = replicationQueues;
@@ -176,7 +174,6 @@ public class ReplicationSource extends Thread
     this.manager = manager;
     this.fs = fs;
     this.metrics = metrics;
-    this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
     this.clusterId = clusterId;
 
     this.peerClusterZnode = peerClusterZnode;
@@ -196,13 +193,33 @@ public class ReplicationSource extends Thread
 
   @Override
   public void enqueueLog(Path log) {
-    this.queue.put(log);
-    int queueSize = queue.size();
+    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
+    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
+    if (queue == null) {
+      queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
+      queues.put(logPrefix, queue);
+      if (this.sourceRunning) {
+        // new wal group observed after source startup, start a new worker thread to track it
+        // notice: it's possible that log enqueued when this.running is set but worker thread
+        // still not launched, so it's necessary to check workerThreads before start the worker
+        final ReplicationSourceWorkerThread worker =
+            new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
+        ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
+        if (extant != null) {
+          LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
+        } else {
+          LOG.debug("Starting up worker for wal group " + logPrefix);
+          worker.startup();
+        }
+      }
+    }
+    queue.put(log);
+    int queueSize = logQueueSize.incrementAndGet();
     this.metrics.setSizeOfLogQueue(queueSize);
     // This will log a warning for each new log that gets created above the warn threshold
-    if (queueSize > this.logQueueWarnThreshold) {
-      LOG.warn("Queue size: " + queueSize +
-        " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
+    if (queue.size() > this.logQueueWarnThreshold) {
+      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
+          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
     }
   }
 
@@ -217,12 +234,8 @@ public class ReplicationSource extends Thread
 
   @Override
   public void run() {
-    // We were stopped while looping to connect to sinks, just abort
-    if (!this.isActive()) {
-      uninitialize();
-      return;
-    }
-
+    // mark we are running now
+    this.sourceRunning = true;
     try {
       // start the endpoint, connect to the cluster
       Service.State state = replicationEndpoint.start().get();
@@ -247,22 +260,14 @@ public class ReplicationSource extends Thread
 
     int sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
-    while (this.isActive() && this.peerClusterId == null) {
+    while (this.isSourceActive() && this.peerClusterId == null) {
       this.peerClusterId = replicationEndpoint.getPeerUUID();
-      if (this.isActive() && this.peerClusterId == null) {
+      if (this.isSourceActive() && this.peerClusterId == null) {
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
         }
       }
     }
-    // We were stopped while looping to contact peer's zk ensemble, just abort
-    if (!this.isActive()) {
-      uninitialize();
-      return;
-    }
-
-    // resetting to 1 to reuse later
-    sleepMultiplier = 1;
 
     // In rare case, zookeeper setting may be messed up. That leads to the incorrect
     // peerClusterId value, which is the same as the source clusterId
@@ -271,349 +276,21 @@ public class ReplicationSource extends Thread
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
     }
-    LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
-
-    // If this is recovered, the queue is already full and the first log
-    // normally has a position (unless the RS failed between 2 logs)
-    if (this.replicationQueueInfo.isQueueRecovered()) {
-      try {
-        this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
-          this.queue.peek().getName()));
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Recovered queue started with log " + this.queue.peek() +
-              " at position " + this.repLogReader.getPosition());
-        }
-      } catch (ReplicationException e) {
-        this.terminate("Couldn't get the position of this recovered queue " +
-            this.peerClusterZnode, e);
-      }
-    }
-    // Loop until we close down
-    while (isActive()) {
-      // Sleep until replication is enabled again
-      if (!isPeerEnabled()) {
-        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-      Path oldPath = getCurrentPath(); //note that in the current scenario,
-                                       //oldPath will be null when a log roll
-                                       //happens.
-      // Get a new path
-      boolean hasCurrentPath = getNextPath();
-      if (getCurrentPath() != null && oldPath == null) {
-        sleepMultiplier = 1; //reset the sleepMultiplier on a path change
-      }
-      if (!hasCurrentPath) {
-        if (sleepForRetries("No log to process", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-      boolean currentWALisBeingWrittenTo = false;
-      //For WAL files we own (rather than recovered), take a snapshot of whether the
-      //current WAL file (this.currentPath) is in use (for writing) NOW!
-      //Since the new WAL paths are enqueued only after the prev WAL file
-      //is 'closed', presence of an element in the queue means that
-      //the previous WAL file was closed, else the file is in use (currentPath)
-      //We take the snapshot now so that we are protected against races
-      //where a new file gets enqueued while the current file is being processed
-      //(and where we just finished reading the current file).
-      if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
-        currentWALisBeingWrittenTo = true;
-      }
-      // Open a reader on it
-      if (!openReader(sleepMultiplier)) {
-        // Reset the sleep multiplier, else it'd be reused for the next file
-        sleepMultiplier = 1;
-        continue;
-      }
-
-      // If we got a null reader but didn't continue, then sleep and continue
-      if (this.reader == null) {
-        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-
-      boolean gotIOE = false;
-      currentNbOperations = 0;
-      List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
-      currentSize = 0;
-      try {
-        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
-          continue;
-        }
-      } catch (IOException ioe) {
-        LOG.warn(this.peerClusterZnode + " Got: ", ioe);
-        gotIOE = true;
-        if (ioe.getCause() instanceof EOFException) {
-
-          boolean considerDumping = false;
-          if (this.replicationQueueInfo.isQueueRecovered()) {
-            try {
-              FileStatus stat = this.fs.getFileStatus(this.currentPath);
-              if (stat.getLen() == 0) {
-                LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
-              }
-              considerDumping = true;
-            } catch (IOException e) {
-              LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
-            }
-          }
-
-          if (considerDumping &&
-              sleepMultiplier == this.maxRetriesMultiplier &&
-              processEndOfFile()) {
-            continue;
-          }
-        }
-      } finally {
-        try {
-          this.reader = null;
-          this.repLogReader.closeReader();
-        } catch (IOException e) {
-          gotIOE = true;
-          LOG.warn("Unable to finalize the tailing of a file", e);
-        }
-      }
-
-      // If we didn't get anything to replicate, or if we hit a IOE,
-      // wait a bit and retry.
-      // But if we need to stop, don't bother sleeping
-      if (this.isActive() && (gotIOE || entries.isEmpty())) {
-        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
-          this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.repLogReader.getPosition(),
-              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
-          this.lastLoggedPosition = this.repLogReader.getPosition();
-        }
-        // Reset the sleep multiplier if nothing has actually gone wrong
-        if (!gotIOE) {
-          sleepMultiplier = 1;
-          // if there was nothing to ship and it's not an error
-          // set "ageOfLastShippedOp" to <now> to indicate that we're current
-          this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
-        }
-        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-      sleepMultiplier = 1;
-      shipEdits(currentWALisBeingWrittenTo, entries);
-    }
-    uninitialize();
-  }
-
-  /**
-   * Read all the entries from the current log files and retain those
-   * that need to be replicated. Else, process the end of the current file.
-   * @param currentWALisBeingWrittenTo is the current WAL being written to
-   * @param entries resulting entries to be replicated
-   * @return true if we got nothing and went to the next file, false if we got
-   * entries
-   * @throws IOException
-   */
-  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
-      List<WAL.Entry> entries) throws IOException {
-    long seenEntries = 0;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Seeking in " + this.currentPath + " at position "
-          + this.repLogReader.getPosition());
-    }
-    this.repLogReader.seek();
-    long positionBeforeRead = this.repLogReader.getPosition();
-    WAL.Entry entry =
-        this.repLogReader.readNextAndSetPosition();
-    while (entry != null) {
-      this.metrics.incrLogEditsRead();
-      seenEntries++;
-
-      // don't replicate if the log entries have already been consumed by the cluster
-      if (replicationEndpoint.canReplicateToSameCluster()
-          || !entry.getKey().getClusterIds().contains(peerClusterId)) {
-        // Remove all KVs that should not be replicated
-        entry = walEntryFilter.filter(entry);
-        WALEdit edit = null;
-        WALKey logKey = null;
-        if (entry != null) {
-          edit = entry.getEdit();
-          logKey = entry.getKey();
-        }
-
-        if (edit != null && edit.size() != 0) {
-          //Mark that the current cluster has the change
-          logKey.addClusterId(clusterId);
-          currentNbOperations += countDistinctRowKeys(edit);
-          entries.add(entry);
-          currentSize += entry.getEdit().heapSize();
-        } else {
-          this.metrics.incrLogEditsFiltered();
-        }
-      }
-      // Stop if too many entries or too big
-      if (currentSize >= this.replicationQueueSizeCapacity ||
-          entries.size() >= this.replicationQueueNbCapacity) {
-        break;
-      }
-      try {
-        entry = this.repLogReader.readNextAndSetPosition();
-      } catch (IOException ie) {
-        LOG.debug("Break on IOE: " + ie.getMessage());
-        break;
-      }
-    }
-    metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
-    if (currentWALisBeingWrittenTo) {
-      return false;
-    }
-    // If we didn't get anything and the queue has an object, it means we
-    // hit the end of the file for sure
-    return seenEntries == 0 && processEndOfFile();
-  }
-
-  /**
-   * Poll for the next path
-   * @return true if a path was obtained, false if not
-   */
-  protected boolean getNextPath() {
-    try {
-      if (this.currentPath == null) {
-        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
-        this.metrics.setSizeOfLogQueue(queue.size());
-        if (this.currentPath != null) {
-          this.manager.cleanOldLogs(this.currentPath.getName(),
-              this.peerId,
-              this.replicationQueueInfo.isQueueRecovered());
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("New log: " + this.currentPath);
-          }
-        }
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while reading edits", e);
-    }
-    return this.currentPath != null;
-  }
-
-  /**
-   * Open a reader on the current path
-   *
-   * @param sleepMultiplier by how many times the default sleeping time is augmented
-   * @return true if we should continue with that file, false if we are over with it
-   */
-  protected boolean openReader(int sleepMultiplier) {
-    try {
-      try {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Opening log " + this.currentPath);
-        }
-        this.reader = repLogReader.openReader(this.currentPath);
-      } catch (FileNotFoundException fnfe) {
-        if (this.replicationQueueInfo.isQueueRecovered()) {
-          // We didn't find the log in the archive directory, look if it still
-          // exists in the dead RS folder (there could be a chain of failures
-          // to look at)
-          List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
-          LOG.info("NB dead servers : " + deadRegionServers.size());
-          final Path rootDir = FSUtils.getRootDir(this.conf);
-          for (String curDeadServerName : deadRegionServers) {
-            final Path deadRsDirectory = new Path(rootDir,
-                DefaultWALProvider.getWALDirectoryName(curDeadServerName));
-            Path[] locs = new Path[] {
-                new Path(deadRsDirectory, currentPath.getName()),
-                new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
-                                          currentPath.getName()),
-            };
-            for (Path possibleLogLocation : locs) {
-              LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-              if (this.manager.getFs().exists(possibleLogLocation)) {
-                // We found the right new location
-                LOG.info("Log " + this.currentPath + " still exists at " +
-                    possibleLogLocation);
-                // Breaking here will make us sleep since reader is null
-                // TODO why don't we need to set currentPath and call openReader here?
-                return true;
-              }
-            }
-          }
-          // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
-          // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
-          if (stopper instanceof ReplicationSyncUp.DummyServer) {
-            // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
-            //      area rather than to the wal area for a particular region server.
-            FileStatus[] rss = fs.listStatus(manager.getLogDir());
-            for (FileStatus rs : rss) {
-              Path p = rs.getPath();
-              FileStatus[] logs = fs.listStatus(p);
-              for (FileStatus log : logs) {
-                p = new Path(p, log.getPath().getName());
-                if (p.getName().equals(currentPath.getName())) {
-                  currentPath = p;
-                  LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
-                  // Open the log at the new location
-                  this.openReader(sleepMultiplier);
-                  return true;
-                }
-              }
-            }
-          }
-
-          // TODO What happens if the log was missing from every single location?
-          // Although we need to check a couple of times as the log could have
-          // been moved by the master between the checks
-          // It can also happen if a recovered queue wasn't properly cleaned,
-          // such that the znode pointing to a log exists but the log was
-          // deleted a long time ago.
-          // For the moment, we'll throw the IO and processEndOfFile
-          throw new IOException("File from recovered queue is " +
-              "nowhere to be found", fnfe);
-        } else {
-          // If the log was archived, continue reading from there
-          Path archivedLogLocation =
-              new Path(manager.getOldLogDir(), currentPath.getName());
-          if (this.manager.getFs().exists(archivedLogLocation)) {
-            currentPath = archivedLogLocation;
-            LOG.info("Log " + this.currentPath + " was moved to " +
-                archivedLogLocation);
-            // Open the log at the new location
-            this.openReader(sleepMultiplier);
-
-          }
-          // TODO What happens the log is missing in both places?
-        }
-      }
-    } catch (IOException ioe) {
-      if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
-      LOG.warn(this.peerClusterZnode + " Got: ", ioe);
-      this.reader = null;
-      if (ioe.getCause() instanceof NullPointerException) {
-        // Workaround for race condition in HDFS-4380
-        // which throws a NPE if we open a file before any data node has the most recent block
-        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
-        LOG.warn("Got NPE opening reader, will retry.");
-      } else if (sleepMultiplier == this.maxRetriesMultiplier) {
-        // TODO Need a better way to determine if a file is really gone but
-        // TODO without scanning all logs dir
-        LOG.warn("Waited too long for this file, considering dumping");
-        return !processEndOfFile();
+    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+    // start workers
+    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
+      String walGroupId = entry.getKey();
+      PriorityBlockingQueue<Path> queue = entry.getValue();
+      final ReplicationSourceWorkerThread worker =
+          new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
+      ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+      if (extant != null) {
+        LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+      } else {
+        LOG.debug("Starting up worker for wal group " + walGroupId);
+        worker.startup();
       }
     }
-    return true;
-  }
-
-  /*
-   * Checks whether the current log file is empty, and it is not a recovered queue. This is to
-   * handle scenario when in an idle cluster, there is no entry in the current log and we keep on
-   * trying to read the log file and get EOFException. In case of a recovered queue the last log
-   * file may be empty, and we don't want to retry that.
-   */
-  private boolean isCurrentLogEmpty() {
-    return (this.repLogReader.getPosition() == 0 &&
-        !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
   }
 
   /**
@@ -636,101 +313,6 @@ public class ReplicationSource extends Thread
   }
 
   /**
-   * Count the number of different row keys in the given edit because of
-   * mini-batching. We assume that there's at least one Cell in the WALEdit.
-   * @param edit edit to count row keys from
-   * @return number of different row keys
-   */
-  private int countDistinctRowKeys(WALEdit edit) {
-    List<Cell> cells = edit.getCells();
-    int distinctRowKeys = 1;
-    Cell lastCell = cells.get(0);
-    for (int i = 0; i < edit.size(); i++) {
-      if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
-        distinctRowKeys++;
-      }
-      lastCell = cells.get(i);
-    }
-    return distinctRowKeys;
-  }
-
-  /**
-   * Do the shipping logic
-   * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
-   * written to when this method was called
-   */
-  protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
-    int sleepMultiplier = 0;
-    if (entries.isEmpty()) {
-      LOG.warn("Was given 0 edits to ship");
-      return;
-    }
-    while (this.isActive()) {
-      try {
-        if (this.throttler.isEnabled()) {
-          long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
-          if (sleepTicks > 0) {
-            try {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
-              }
-              Thread.sleep(sleepTicks);
-            } catch (InterruptedException e) {
-              LOG.debug("Interrupted while sleeping for throttling control");
-              Thread.currentThread().interrupt();
-              // current thread might be interrupted to terminate
-              // directly go back to while() for confirm this
-              continue;
-            }
-            // reset throttler's cycle start tick when sleep for throttling occurs
-            this.throttler.resetStartTick();
-          }
-        }
-        // create replicateContext here, so the entries can be GC'd upon return from this call stack
-        ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
-        replicateContext.setEntries(entries).setSize(currentSize);
-
-        long startTimeNs = System.nanoTime();
-        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
-        boolean replicated = replicationEndpoint.replicate(replicateContext);
-        long endTimeNs = System.nanoTime();
-
-        if (!replicated) {
-          continue;
-        } else {
-          sleepMultiplier = Math.max(sleepMultiplier-1, 0);
-        }
-
-        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
-          this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.repLogReader.getPosition(),
-              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
-          this.lastLoggedPosition = this.repLogReader.getPosition();
-        }
-        if (this.throttler.isEnabled()) {
-          this.throttler.addPushSize(currentSize);
-        }
-        this.totalReplicatedEdits += entries.size();
-        this.totalReplicatedOperations += currentNbOperations;
-        this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
-        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
-              + this.totalReplicatedOperations + " operations in " +
-              ((endTimeNs - startTimeNs)/1000000) + " ms");
-        }
-        break;
-      } catch (Exception ex) {
-        LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
-            org.apache.hadoop.util.StringUtils.stringifyException(ex));
-        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-    }
-  }
-
-  /**
    * check whether the peer is enabled or not
    *
    * @return true if the peer is enabled, otherwise false
@@ -739,53 +321,17 @@ public class ReplicationSource extends Thread
     return this.replicationPeers.getStatusOfPeer(this.peerId);
   }
 
-  /**
-   * If the queue isn't empty, switch to the next one
-   * Else if this is a recovered queue, it means we're done!
-   * Else we'll just continue to try reading the log file
-   * @return true if we're done with the current file, false if we should
-   * continue trying to read from it
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
-      justification="Yeah, this is how it works")
-  protected boolean processEndOfFile() {
-    if (this.queue.size() != 0) {
-      if (LOG.isTraceEnabled()) {
-        String filesize = "N/A";
-        try {
-          FileStatus stat = this.fs.getFileStatus(this.currentPath);
-          filesize = stat.getLen()+"";
-        } catch (IOException ex) {}
-        LOG.trace("Reached the end of a log, stats: " + getStats() +
-            ", and the length of the file is " + filesize);
-      }
-      this.currentPath = null;
-      this.repLogReader.finishCurrentFile();
-      this.reader = null;
-      return true;
-    } else if (this.replicationQueueInfo.isQueueRecovered()) {
-      this.manager.closeRecoveredQueue(this);
-      LOG.info("Finished recovering the queue with the following stats " + getStats());
-      this.running = false;
-      return true;
-    }
-    return false;
-  }
-
   @Override
   public void startup() {
     String n = Thread.currentThread().getName();
-    Thread.UncaughtExceptionHandler handler =
-        new Thread.UncaughtExceptionHandler() {
-          @Override
-          public void uncaughtException(final Thread t, final Throwable e) {
-            LOG.error("Unexpected exception in ReplicationSource," +
-              " currentPath=" + currentPath, e);
-          }
-        };
-    Threads.setDaemonThreadRunning(
-        this, n + ".replicationSource," +
-        this.peerClusterZnode, handler);
+    Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(final Thread t, final Throwable e) {
+        LOG.error("Unexpected exception in ReplicationSource", e);
+      }
+    };
+    Threads
+        .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
   }
 
   @Override
@@ -807,14 +353,21 @@ public class ReplicationSource extends Thread
       LOG.error("Closing source " + this.peerClusterZnode
           + " because an error occurred: " + reason, cause);
     }
-    this.running = false;
-    this.interrupt();
+    this.sourceRunning = false;
+    Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
+    for (ReplicationSourceWorkerThread worker : workers) {
+      worker.setWorkerRunning(false);
+      worker.interrupt();
+    }
     ListenableFuture<Service.State> future = null;
     if (this.replicationEndpoint != null) {
       future = this.replicationEndpoint.stop();
     }
     if (join) {
-      Threads.shutdown(this, this.sleepForRetries);
+      for (ReplicationSourceWorkerThread worker : workers) {
+        Threads.shutdown(worker, this.sleepForRetries);
+        LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
+      }
       if (future != null) {
         try {
           future.get();
@@ -837,11 +390,15 @@ public class ReplicationSource extends Thread
 
   @Override
   public Path getCurrentPath() {
-    return this.currentPath;
+    // only for testing
+    for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+      if (worker.getCurrentPath() != null) return worker.getCurrentPath();
+    }
+    return null;
   }
 
-  private boolean isActive() {
-    return !this.stopper.isStopped() && this.running && !isInterrupted();
+  private boolean isSourceActive() {
+    return !this.stopper.isStopped() && this.sourceRunning;
   }
 
   /**
@@ -868,10 +425,23 @@ public class ReplicationSource extends Thread
 
   @Override
   public String getStats() {
-    long position = this.repLogReader.getPosition();
-    return "Total replicated edits: " + totalReplicatedEdits +
-      ", currently replicating from: " + this.currentPath +
-      " at position: " + position;
+    StringBuilder sb = new StringBuilder();
+    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
+        .append(", current progress: \n");
+    for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
+      String walGroupId = entry.getKey();
+      ReplicationSourceWorkerThread worker = entry.getValue();
+      long position = worker.getCurrentPosition();
+      Path currentPath = worker.getCurrentPath();
+      sb.append("walGroup [").append(walGroupId).append("]: ");
+      if (currentPath != null) {
+        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+            .append(position).append("\n");
+      } else {
+        sb.append("no replication ongoing, waiting for new log");
+      }
+    }
+    return sb.toString();
   }
 
   /**
@@ -881,4 +451,572 @@ public class ReplicationSource extends Thread
   public MetricsSource getSourceMetrics() {
     return this.metrics;
   }
+
+  public class ReplicationSourceWorkerThread extends Thread {
+    private ReplicationSource source;
+    private String walGroupId;
+    private PriorityBlockingQueue<Path> queue;
+    private ReplicationQueueInfo replicationQueueInfo;
+    // Our reader for the current log. open/close handled by repLogReader
+    private WAL.Reader reader;
+    // Last position in the log that we sent to ZooKeeper
+    private long lastLoggedPosition = -1;
+    // Path of the current log
+    private volatile Path currentPath;
+    // Handle on the log reader helper
+    private ReplicationWALReaderManager repLogReader;
+    // Current number of operations (Put/Delete) that we need to replicate
+    private int currentNbOperations = 0;
+    // Current size of data we need to replicate
+    private int currentSize = 0;
+    // Indicates whether this particular worker is running
+    private boolean workerRunning = true;
+
+    public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
+        ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
+      this.walGroupId = walGroupId;
+      this.queue = queue;
+      this.replicationQueueInfo = replicationQueueInfo;
+      this.repLogReader = new ReplicationWALReaderManager(fs, conf);
+      this.source = source;
+    }
+
+    @Override
+    public void run() {
+      // If this is recovered, the queue is already full and the first log
+      // normally has a position (unless the RS failed between 2 logs)
+      if (this.replicationQueueInfo.isQueueRecovered()) {
+        try {
+          this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
+            this.queue.peek().getName()));
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+                + this.repLogReader.getPosition());
+          }
+        } catch (ReplicationException e) {
+          terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+        }
+      }
+      // Loop until we close down
+      while (isWorkerActive()) {
+        int sleepMultiplier = 1;
+        // Sleep until replication is enabled again
+        if (!isPeerEnabled()) {
+          if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+            sleepMultiplier++;
+          }
+          continue;
+        }
+        Path oldPath = getCurrentPath(); //note that in the current scenario,
+                                         //oldPath will be null when a log roll
+                                         //happens.
+        // Get a new path
+        boolean hasCurrentPath = getNextPath();
+        if (getCurrentPath() != null && oldPath == null) {
+          sleepMultiplier = 1; //reset the sleepMultiplier on a path change
+        }
+        if (!hasCurrentPath) {
+          if (sleepForRetries("No log to process", sleepMultiplier)) {
+            sleepMultiplier++;
+          }
+          continue;
+        }
+        boolean currentWALisBeingWrittenTo = false;
+        //For WAL files we own (rather than recovered), take a snapshot of whether the
+        //current WAL file (this.currentPath) is in use (for writing) NOW!
+        //Since the new WAL paths are enqueued only after the prev WAL file
+        //is 'closed', presence of an element in the queue means that
+        //the previous WAL file was closed, else the file is in use (currentPath)
+        //We take the snapshot now so that we are protected against races
+        //where a new file gets enqueued while the current file is being processed
+        //(and where we just finished reading the current file).
+        if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
+          currentWALisBeingWrittenTo = true;
+        }
+        // Open a reader on it
+        if (!openReader(sleepMultiplier)) {
+          // Reset the sleep multiplier, else it'd be reused for the next file
+          sleepMultiplier = 1;
+          continue;
+        }
+
+        // If we got a null reader but didn't continue, then sleep and continue
+        if (this.reader == null) {
+          if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
+            sleepMultiplier++;
+          }
+          continue;
+        }
+
+        boolean gotIOE = false;
+        currentNbOperations = 0;
+        List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
+        currentSize = 0;
+        try {
+          if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
+            continue;
+          }
+        } catch (IOException ioe) {
+          LOG.warn(peerClusterZnode + " Got: ", ioe);
+          gotIOE = true;
+          if (ioe.getCause() instanceof EOFException) {
+
+            boolean considerDumping = false;
+            if (this.replicationQueueInfo.isQueueRecovered()) {
+              try {
+                FileStatus stat = fs.getFileStatus(this.currentPath);
+                if (stat.getLen() == 0) {
+                  LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
+                }
+                considerDumping = true;
+              } catch (IOException e) {
+                LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
+              }
+            }
+
+            if (considerDumping &&
+                sleepMultiplier == maxRetriesMultiplier &&
+                processEndOfFile()) {
+              continue;
+            }
+          }
+        } finally {
+          try {
+            this.reader = null;
+            this.repLogReader.closeReader();
+          } catch (IOException e) {
+            gotIOE = true;
+            LOG.warn("Unable to finalize the tailing of a file", e);
+          }
+        }
+
+        // If we didn't get anything to replicate, or if we hit a IOE,
+        // wait a bit and retry.
+        // But if we need to stop, don't bother sleeping
+        if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
+          if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
+            manager.logPositionAndCleanOldLogs(this.currentPath,
+                peerClusterZnode, this.repLogReader.getPosition(),
+                this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
+            this.lastLoggedPosition = this.repLogReader.getPosition();
+          }
+          // Reset the sleep multiplier if nothing has actually gone wrong
+          if (!gotIOE) {
+            sleepMultiplier = 1;
+            // if there was nothing to ship and it's not an error
+            // set "ageOfLastShippedOp" to <now> to indicate that we're current
+            metrics.setAgeOfLastShippedOp(System.currentTimeMillis(), walGroupId);
+          }
+          if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
+            sleepMultiplier++;
+          }
+          continue;
+        }
+        sleepMultiplier = 1;
+        shipEdits(currentWALisBeingWrittenTo, entries);
+      }
+      if (replicationQueueInfo.isQueueRecovered()) {
+        // use synchronize to make sure one last thread will clean the queue
+        synchronized (workerThreads) {
+          Threads.sleep(100);// wait a short while for other worker thread to fully exit
+          boolean allOtherTaskDone = true;
+          for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+            if (!worker.equals(this) && worker.isAlive()) {
+              allOtherTaskDone = false;
+              break;
+            }
+          }
+          if (allOtherTaskDone) {
+            manager.closeRecoveredQueue(this.source);
+            LOG.info("Finished recovering queue " + peerClusterZnode
+                + " with the following stats: " + getStats());
+          }
+        }
+      }
+    }
+
+    /**
+     * Read all the entries from the current log files and retain those that need to be replicated.
+     * Else, process the end of the current file.
+     * @param currentWALisBeingWrittenTo is the current WAL being written to
+     * @param entries resulting entries to be replicated
+     * @return true if we got nothing and went to the next file, false if we got entries
+     * @throws IOException
+     */
+    protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
+        List<WAL.Entry> entries) throws IOException {
+      long seenEntries = 0;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Seeking in " + this.currentPath + " at position "
+            + this.repLogReader.getPosition());
+      }
+      this.repLogReader.seek();
+      long positionBeforeRead = this.repLogReader.getPosition();
+      WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
+      while (entry != null) {
+        metrics.incrLogEditsRead();
+        seenEntries++;
+
+        // don't replicate if the log entries have already been consumed by the cluster
+        if (replicationEndpoint.canReplicateToSameCluster()
+            || !entry.getKey().getClusterIds().contains(peerClusterId)) {
+          // Remove all KVs that should not be replicated
+          entry = walEntryFilter.filter(entry);
+          WALEdit edit = null;
+          WALKey logKey = null;
+          if (entry != null) {
+            edit = entry.getEdit();
+            logKey = entry.getKey();
+          }
+
+          if (edit != null && edit.size() != 0) {
+            // Mark that the current cluster has the change
+            logKey.addClusterId(clusterId);
+            currentNbOperations += countDistinctRowKeys(edit);
+            entries.add(entry);
+            currentSize += entry.getEdit().heapSize();
+          } else {
+            metrics.incrLogEditsFiltered();
+          }
+        }
+        // Stop if too many entries or too big
+        // FIXME check the relationship between single wal group and overall
+        if (currentSize >= replicationQueueSizeCapacity
+            || entries.size() >= replicationQueueNbCapacity) {
+          break;
+        }
+        try {
+          entry = this.repLogReader.readNextAndSetPosition();
+        } catch (IOException ie) {
+          LOG.debug("Break on IOE: " + ie.getMessage());
+          break;
+        }
+      }
+      metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
+      if (currentWALisBeingWrittenTo) {
+        return false;
+      }
+      // If we didn't get anything and the queue has an object, it means we
+      // hit the end of the file for sure
+      return seenEntries == 0 && processEndOfFile();
+    }
+
+    /**
+     * Poll for the next path
+     * @return true if a path was obtained, false if not
+     */
+    protected boolean getNextPath() {
+      try {
+        if (this.currentPath == null) {
+          this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
+          int queueSize = logQueueSize.decrementAndGet();
+          metrics.setSizeOfLogQueue(queueSize);
+          if (this.currentPath != null) {
+            // For recovered queue: must use peerClusterZnode since peerId is a parsed value
+            manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
+              this.replicationQueueInfo.isQueueRecovered());
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("New log: " + this.currentPath);
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while reading edits", e);
+      }
+      return this.currentPath != null;
+    }
+
+    /**
+     * Open a reader on the current path
+     *
+     * @param sleepMultiplier by how many times the default sleeping time is augmented
+     * @return true if we should continue with that file, false if we are over with it
+     */
+    protected boolean openReader(int sleepMultiplier) {
+
+      try {
+        try {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Opening log " + this.currentPath);
+          }
+          this.reader = repLogReader.openReader(this.currentPath);
+        } catch (FileNotFoundException fnfe) {
+          if (this.replicationQueueInfo.isQueueRecovered()) {
+            // We didn't find the log in the archive directory, look if it still
+            // exists in the dead RS folder (there could be a chain of failures
+            // to look at)
+            List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+            LOG.info("NB dead servers : " + deadRegionServers.size());
+            final Path rootDir = FSUtils.getRootDir(conf);
+            for (String curDeadServerName : deadRegionServers) {
+              final Path deadRsDirectory = new Path(rootDir,
+                  DefaultWALProvider.getWALDirectoryName(curDeadServerName));
+              Path[] locs = new Path[] {
+                  new Path(deadRsDirectory, currentPath.getName()),
+                  new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
+                                            currentPath.getName()),
+              };
+              for (Path possibleLogLocation : locs) {
+                LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+                if (manager.getFs().exists(possibleLogLocation)) {
+                  // We found the right new location
+                  LOG.info("Log " + this.currentPath + " still exists at " +
+                      possibleLogLocation);
+                  // Breaking here will make us sleep since reader is null
+                  // TODO why don't we need to set currentPath and call openReader here?
+                  return true;
+                }
+              }
+            }
+            // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
+            // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
+            if (stopper instanceof ReplicationSyncUp.DummyServer) {
+              // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
+              //      area rather than to the wal area for a particular region server.
+              FileStatus[] rss = fs.listStatus(manager.getLogDir());
+              for (FileStatus rs : rss) {
+                Path p = rs.getPath();
+                FileStatus[] logs = fs.listStatus(p);
+                for (FileStatus log : logs) {
+                  p = new Path(p, log.getPath().getName());
+                  if (p.getName().equals(currentPath.getName())) {
+                    currentPath = p;
+                    LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
+                    // Open the log at the new location
+                    this.openReader(sleepMultiplier);
+                    return true;
+                  }
+                }
+              }
+            }
+
+            // TODO What happens if the log was missing from every single location?
+            // Although we need to check a couple of times as the log could have
+            // been moved by the master between the checks
+            // It can also happen if a recovered queue wasn't properly cleaned,
+            // such that the znode pointing to a log exists but the log was
+            // deleted a long time ago.
+            // For the moment, we'll throw the IO and processEndOfFile
+            throw new IOException("File from recovered queue is " +
+                "nowhere to be found", fnfe);
+          } else {
+            // If the log was archived, continue reading from there
+            Path archivedLogLocation =
+                new Path(manager.getOldLogDir(), currentPath.getName());
+            if (manager.getFs().exists(archivedLogLocation)) {
+              currentPath = archivedLogLocation;
+              LOG.info("Log " + this.currentPath + " was moved to " +
+                  archivedLogLocation);
+              // Open the log at the new location
+              this.openReader(sleepMultiplier);
+
+            }
+            // TODO What happens the log is missing in both places?
+          }
+        }
+      } catch (IOException ioe) {
+        if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
+        LOG.warn(peerClusterZnode + " Got: ", ioe);
+        this.reader = null;
+        if (ioe.getCause() instanceof NullPointerException) {
+          // Workaround for race condition in HDFS-4380
+          // which throws a NPE if we open a file before any data node has the most recent block
+          // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+          LOG.warn("Got NPE opening reader, will retry.");
+        } else if (sleepMultiplier == maxRetriesMultiplier) {
+          // TODO Need a better way to determine if a file is really gone but
+          // TODO without scanning all logs dir
+          LOG.warn("Waited too long for this file, considering dumping");
+          return !processEndOfFile();
+        }
+      }
+      return true;
+    }
+
+    /*
+     * Checks whether the current log file is empty, and it is not a recovered queue. This is to
+     * handle scenario when in an idle cluster, there is no entry in the current log and we keep on
+     * trying to read the log file and get EOFException. In case of a recovered queue the last log
+     * file may be empty, and we don't want to retry that.
+     */
+    private boolean isCurrentLogEmpty() {
+      return (this.repLogReader.getPosition() == 0 &&
+          !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
+    }
+
+    /**
+     * Count the number of different row keys in the given edit because of mini-batching. We assume
+     * that there's at least one Cell in the WALEdit.
+     * @param edit edit to count row keys from
+     * @return number of different row keys
+     */
+    private int countDistinctRowKeys(WALEdit edit) {
+      List<Cell> cells = edit.getCells();
+      int distinctRowKeys = 1;
+      Cell lastCell = cells.get(0);
+      for (int i = 0; i < edit.size(); i++) {
+        if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
+          distinctRowKeys++;
+        }
+      }
+      return distinctRowKeys;
+    }
+
+    /**
+     * Do the shipping logic
+     * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
+     * written to when this method was called
+     */
+    protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
+      int sleepMultiplier = 0;
+      if (entries.isEmpty()) {
+        LOG.warn("Was given 0 edits to ship");
+        return;
+      }
+      while (isWorkerActive()) {
+        try {
+          if (throttler.isEnabled()) {
+            long sleepTicks = throttler.getNextSleepInterval(currentSize);
+            if (sleepTicks > 0) {
+              try {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
+                }
+                Thread.sleep(sleepTicks);
+              } catch (InterruptedException e) {
+                LOG.debug("Interrupted while sleeping for throttling control");
+                Thread.currentThread().interrupt();
+                // current thread might be interrupted to terminate
+                // directly go back to while() for confirm this
+                continue;
+              }
+              // reset throttler's cycle start tick when sleep for throttling occurs
+              throttler.resetStartTick();
+            }
+          }
+          // create replicateContext here, so the entries can be GC'd upon return from this call
+          // stack
+          ReplicationEndpoint.ReplicateContext replicateContext =
+              new ReplicationEndpoint.ReplicateContext();
+          replicateContext.setEntries(entries).setSize(currentSize);
+          replicateContext.setWalGroupId(walGroupId);
+
+          long startTimeNs = System.nanoTime();
+          // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
+          boolean replicated = replicationEndpoint.replicate(replicateContext);
+          long endTimeNs = System.nanoTime();
+
+          if (!replicated) {
+            continue;
+          } else {
+            sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
+          }
+
+          if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
+            manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
+              this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
+              currentWALisBeingWrittenTo);
+            this.lastLoggedPosition = this.repLogReader.getPosition();
+          }
+          if (throttler.isEnabled()) {
+            throttler.addPushSize(currentSize);
+          }
+          totalReplicatedEdits.addAndGet(entries.size());
+          totalReplicatedOperations.addAndGet(currentNbOperations);
+          // FIXME check relationship between wal group and overall
+          metrics.shipBatch(currentNbOperations, currentSize / 1024);
+          metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
+            walGroupId);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
+                + totalReplicatedOperations + " operations in "
+                + ((endTimeNs - startTimeNs) / 1000000) + " ms");
+          }
+          break;
+        } catch (Exception ex) {
+          LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
+              + org.apache.hadoop.util.StringUtils.stringifyException(ex));
+          if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
+            sleepMultiplier++;
+          }
+        }
+      }
+    }
+
+    /**
+     * If the queue isn't empty, switch to the next one Else if this is a recovered queue, it means
+     * we're done! Else we'll just continue to try reading the log file
+     * @return true if we're done with the current file, false if we should continue trying to read
+     *         from it
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
+        justification = "Yeah, this is how it works")
+    protected boolean processEndOfFile() {
+      if (this.queue.size() != 0) {
+        if (LOG.isTraceEnabled()) {
+          String filesize = "N/A";
+          try {
+            FileStatus stat = fs.getFileStatus(this.currentPath);
+            filesize = stat.getLen() + "";
+          } catch (IOException ex) {
+          }
+          LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
+              + ", and the length of the file is " + filesize);
+        }
+        this.currentPath = null;
+        this.repLogReader.finishCurrentFile();
+        this.reader = null;
+        return true;
+      } else if (this.replicationQueueInfo.isQueueRecovered()) {
+        LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+            + peerClusterZnode);
+        workerRunning = false;
+        return true;
+      }
+      return false;
+    }
+
+    public void startup() {
+      String n = Thread.currentThread().getName();
+      Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(final Thread t, final Throwable e) {
+          LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
+              + getCurrentPath(), e);
+        }
+      };
+      Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
+          + peerClusterZnode, handler);
+      workerThreads.put(walGroupId, this);
+    }
+
+    public Path getCurrentPath() {
+      return this.currentPath;
+    }
+
+    public long getCurrentPosition() {
+      return this.repLogReader.getPosition();
+    }
+
+    private boolean isWorkerActive() {
+      return !stopper.isStopped() && workerRunning && !isInterrupted();
+    }
+
+    private void terminate(String reason, Exception cause) {
+      if (cause == null) {
+        LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
+
+      } else {
+        LOG.error("Closing worker for wal group " + this.walGroupId
+            + " because an error occurred: " + reason, cause);
+      }
+      this.interrupt();
+      Threads.shutdown(this, sleepForRetries);
+      LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
+    }
+
+    public void setWorkerRunning(boolean workerRunning) {
+      this.workerRunning = workerRunning;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0c8f6f9..a8cffba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -23,9 +23,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -56,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -91,13 +95,14 @@ public class ReplicationSourceManager implements ReplicationListener {
   // All about stopping
   private final Server server;
   // All logs we are currently tracking
-  private final Map<String, SortedSet<String>> walsById;
+  // Index structure of the map is: peer_id->logPrefix/logGroup->logs
+  private final Map<String, Map<String, SortedSet<String>>> walsById;
   // Logs for recovered sources we are currently tracking
-  private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
+  private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
   private final Configuration conf;
   private final FileSystem fs;
-  // The path to the latest log we saw, for new coming sources
-  private Path latestPath;
+  // The paths to the latest log of each wal group, for new coming peers
+  private Set<Path> latestPaths;
   // Path to the wals directories
   private final Path logDir;
   // Path to the wal archive
@@ -133,8 +138,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
     this.server = server;
-    this.walsById = new HashMap<String, SortedSet<String>>();
-    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
+    this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
+    this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
     this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -158,6 +163,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
     this.rand = new Random();
+    this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
   }
 
   /**
@@ -189,15 +195,16 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param queueRecovered Whether this is a recovered queue
    */
   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
     if (queueRecovered) {
-      SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
+      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
       if (wals != null && !wals.first().equals(key)) {
         cleanOldLogs(wals, key, id);
       }
     } else {
       synchronized (this.walsById) {
-        SortedSet<String> wals = walsById.get(id);
-        if (!wals.first().equals(key)) {
+        SortedSet<String> wals = walsById.get(id).get(logPrefix);
+        if (wals != null && !wals.first().equals(key)) {
           cleanOldLogs(wals, key, id);
         }
       }
@@ -238,36 +245,44 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Add a new normal source to this region server
+   * Add sources for the given peer cluster on this region server. For the newly added peer, we only
+   * need to enqueue the latest log of each wal group and do replication
    * @param id the id of the peer cluster
    * @return the source that was created
    * @throws IOException
    */
   protected ReplicationSourceInterface addSource(String id) throws IOException,
       ReplicationException {
-    ReplicationPeerConfig peerConfig
-      = replicationPeers.getReplicationPeerConfig(id);
+    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
           this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
-      this.walsById.put(id, new TreeSet<String>());
+      Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
+      this.walsById.put(id, walsByGroup);
       // Add the latest wal to that source's queue
-      if (this.latestPath != null) {
-        String name = this.latestPath.getName();
-        this.walsById.get(id).add(name);
-        try {
-          this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
-        } catch (ReplicationException e) {
-          String message =
-              "Cannot add log to queue when creating a new source, queueId="
-                  + src.getPeerClusterZnode() + ", filename=" + name;
-          server.stop(message);
-          throw e;
+      synchronized (latestPaths) {
+        if (this.latestPaths.size() > 0) {
+          for (Path logPath : latestPaths) {
+            String name = logPath.getName();
+            String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
+            SortedSet<String> logs = new TreeSet<String>();
+            logs.add(name);
+            walsByGroup.put(walPrefix, logs);
+            try {
+              this.replicationQueues.addLog(id, name);
+            } catch (ReplicationException e) {
+              String message =
+                  "Cannot add log to queue when creating a new source, queueId=" + id
+                      + ", filename=" + name;
+              server.stop(message);
+              throw e;
+            }
+            src.enqueueLog(logPath);
+          }
         }
-        src.enqueueLog(this.latestPath);
       }
     }
     src.startup();
@@ -302,7 +317,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get a copy of the wals of the first source on this rs
    * @return a sorted set of wal names
    */
-  protected Map<String, SortedSet<String>> getWALs() {
+  protected Map<String, Map<String, SortedSet<String>>> getWALs() {
     return Collections.unmodifiableMap(walsById);
   }
 
@@ -310,7 +325,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get a copy of the wals of the recovered sources on this rs
    * @return a sorted set of wal names
    */
-  protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
+  protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 
@@ -331,27 +346,70 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   void preLogRoll(Path newLog) throws IOException {
-    synchronized (this.walsById) {
-      String name = newLog.getName();
-      for (ReplicationSourceInterface source : this.sources) {
+    recordLog(newLog);
+    String logName = newLog.getName();
+    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+    synchronized (latestPaths) {
+      Iterator<Path> iterator = latestPaths.iterator();
+      while (iterator.hasNext()) {
+        Path path = iterator.next();
+        if (path.getName().contains(logPrefix)) {
+          iterator.remove();
+          break;
+        }
+      }
+      this.latestPaths.add(newLog);
+    }
+  }
+
+  /**
+   * Check and enqueue the given log to the correct source. If there's still no source for the
+   * group to which the given log belongs, create one
+   * @param logPath the log path to check and enqueue
+   * @throws IOException
+   */
+  private void recordLog(Path logPath) throws IOException {
+    String logName = logPath.getName();
+    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+    // update replication queues on ZK
+    synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
+                                     // the to-be-removed peer
+      for (String id : replicationPeers.getPeerIds()) {
         try {
-          this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
+          this.replicationQueues.addLog(id, logName);
         } catch (ReplicationException e) {
-          throw new IOException("Cannot add log to replication queue with id="
-              + source.getPeerClusterZnode() + ", filename=" + name, e);
+          throw new IOException("Cannot add log to replication queue"
+              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
         }
       }
-      for (SortedSet<String> wals : this.walsById.values()) {
-        if (this.sources.isEmpty()) {
-          // If there's no slaves, don't need to keep the old wals since
-          // we only consider the last one when a new slave comes in
-          wals.clear();
+    }
+    // update walsById map
+    synchronized (walsById) {
+      for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+        String peerId = entry.getKey();
+        Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+        boolean existingPrefix = false;
+        for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+          SortedSet<String> wals = walsEntry.getValue();
+          if (this.sources.isEmpty()) {
+            // If there's no slaves, don't need to keep the old wals since
+            // we only consider the last one when a new slave comes in
+            wals.clear();
+          }
+          if (logPrefix.equals(walsEntry.getKey())) {
+            wals.add(logName);
+            existingPrefix = true;
+          }
+        }
+        if (!existingPrefix) {
+          // The new log belongs to a new group, add it into this peer
+          LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
+          SortedSet<String> wals = new TreeSet<String>();
+          wals.add(logName);
+          walsByPrefix.put(logPrefix, wals);
         }
-        wals.add(name);
       }
     }
-
-    this.latestPath = newLog;
   }
 
   void postLogRoll(Path newLog) throws IOException {
@@ -376,7 +434,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
       final Server server, final String peerId, final UUID clusterId,
       final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
-          throws IOException {
+      throws IOException {
     RegionServerCoprocessorHost rsServerHost = null;
     TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
@@ -414,7 +472,8 @@ public class ReplicationSourceManager implements ReplicationListener {
         }
       }
     } catch (Exception e) {
-      LOG.warn("Passed replication endpoint implementation throws errors", e);
+      LOG.warn("Passed replication endpoint implementation throws errors"
+          + " while initializing ReplicationSource for peer: " + peerId, e);
       throw new IOException(e);
     }
 
@@ -470,7 +529,7 @@ public class ReplicationSourceManager implements ReplicationListener {
         + sources.size() + " and another "
         + oldsources.size() + " that were recovered");
     String terminateMessage = "Replication stream was removed by a user";
-    ReplicationSourceInterface srcToRemove = null;
+    List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
     List<ReplicationSourceInterface> oldSourcesToDelete =
         new ArrayList<ReplicationSourceInterface>();
     // First close all the recovered sources for this peer
@@ -486,19 +545,23 @@ public class ReplicationSourceManager implements ReplicationListener {
     LOG.info("Number of deleted recovered sources for " + id + ": "
         + oldSourcesToDelete.size());
     // Now look for the one on this cluster
-    for (ReplicationSourceInterface src : this.sources) {
-      if (id.equals(src.getPeerClusterId())) {
-        srcToRemove = src;
-        break;
+    synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source
+                                          // for the to-be-removed peer
+      for (ReplicationSourceInterface src : this.sources) {
+        if (id.equals(src.getPeerClusterId())) {
+          srcToRemove.add(src);
+        }
       }
+      if (srcToRemove.size() == 0) {
+        LOG.error("The queue we wanted to close is missing " + id);
+        return;
+      }
+      for (ReplicationSourceInterface toRemove : srcToRemove) {
+        toRemove.terminate(terminateMessage);
+        this.sources.remove(toRemove);
+      }
+      deleteSource(id, true);
     }
-    if (srcToRemove == null) {
-      LOG.error("The queue we wanted to close is missing " + id);
-      return;
-    }
-    srcToRemove.terminate(terminateMessage);
-    this.sources.remove(srcToRemove);
-    deleteSource(id, true);
   }
 
   @Override
@@ -580,6 +643,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
         String peerId = entry.getKey();
+        SortedSet<String> walsSet = entry.getValue();
         try {
           // there is not an actual peer defined corresponding to peerId for the failover.
           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
@@ -596,7 +660,20 @@ public class ReplicationSourceManager implements ReplicationListener {
             LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
             continue;
           }
+          // track sources in walsByIdRecoveredQueues
+          Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
+          walsByIdRecoveredQueues.put(peerId, walsByGroup);
+          for (String wal : walsSet) {
+            String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
+            SortedSet<String> wals = walsByGroup.get(walPrefix);
+            if (wals == null) {
+              wals = new TreeSet<String>();
+              walsByGroup.put(walPrefix, wals);
+            }
+            wals.add(wal);
+          }
 
+          // enqueue sources
           ReplicationSourceInterface src =
               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
                 server, peerId, this.clusterId, peerConfig, peer);
@@ -605,12 +682,10 @@ public class ReplicationSourceManager implements ReplicationListener {
             break;
           }
           oldsources.add(src);
-          SortedSet<String> walsSet = entry.getValue();
           for (String wal : walsSet) {
             src.enqueueLog(new Path(oldLogDir, wal));
           }
           src.startup();
-          walsByIdRecoveredQueues.put(peerId, walsSet);
         } catch (IOException e) {
           // TODO manage it
           LOG.error("Failed creating a source", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index 44a033b..661016d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -366,4 +366,15 @@ public class DefaultWALProvider implements WALProvider {
     }
   }
 
+  /**
+   * Get prefix of the log from its name, assuming WAL name in format of
+   * log_prefix.filenumber.log_suffix @see {@link FSHLog#getCurrentFileName()}
+   * @param name Name of the WAL to parse
+   * @return prefix of the log
+   */
+  public static String getWALPrefixFromWALName(String name) {
+    int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
+    return name.substring(0, endIndex);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 4ef320a..e44a4d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -84,7 +84,7 @@ public class WALFactory {
     }
   }
 
-  static final String WAL_PROVIDER = "hbase.wal.provider";
+  public static final String WAL_PROVIDER = "hbase.wal.provider";
   static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
 
   static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";