You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2021/02/22 23:01:23 UTC

[hbase] branch branch-1 updated: [HBASE-25539] Add age of oldest wal metric (#2963)

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 9e9301a  [HBASE-25539] Add age of oldest wal metric (#2963)
9e9301a is described below

commit 9e9301a242e492a344c702da183bf419f03a7083
Author: shahrs87 <sh...@gmail.com>
AuthorDate: Mon Feb 22 15:00:41 2021 -0800

    [HBASE-25539] Add age of oldest wal metric (#2963)
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../MetricsReplicationSourceSource.java            |   5 +-
 .../MetricsReplicationGlobalSourceSource.java      |  12 ++
 .../MetricsReplicationSourceSourceImpl.java        |  14 ++
 .../replication/regionserver/MetricsSource.java    |  11 ++
 .../regionserver/ReplicationSource.java            |  58 +++----
 .../regionserver/ReplicationSourceLogQueue.java    | 184 +++++++++++++++++++++
 .../ReplicationSourceWALReaderThread.java          |  26 +--
 .../replication/regionserver/WALEntryStream.java   |  25 +--
 .../hbase/replication/TestReplicationSource.java   |  55 +++++-
 .../TestReplicationSourceLogQueue.java             |  73 ++++++++
 .../regionserver/TestWALEntryStream.java           | 121 ++++++++------
 11 files changed, 477 insertions(+), 107 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 25d72af..a7cea25 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -50,6 +50,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
   public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
   public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
+  /* Used to track the age of oldest wal in ms since its creation time */
+  String OLDEST_WAL_AGE = "source.oldestWalAge";
 
   void setLastShippedAge(long age);
   void incrSizeOfLogQueue(int size);
@@ -74,5 +76,6 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   void incrCompletedWAL();
   void incrCompletedRecoveryQueue();
   void incrFailedRecoveryQueue();
-
+  void setOldestWalAge(long age);
+  long getOldestWalAge();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 64585fa..df774d3 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -196,6 +196,18 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public void incrFailedRecoveryQueue() {
     failedRecoveryQueue.incr(1L);
   }
+
+  @Override
+  public void setOldestWalAge(long age) {
+    // Not implemented
+  }
+
+  @Override
+  public long getOldestWalAge() {
+    // Not implemented
+    return 0;
+  }
+
   @Override
   public void init() {
     rms.init();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 0078a97..c593950 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -40,6 +40,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String logReadInBytesKey;
   private final String shippedHFilesKey;
   private final String sizeOfHFileRefsQueueKey;
+  private final String oldestWalAgeKey;
 
   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -67,6 +68,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableFastCounter repeatedFileBytes;
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
+  private final MutableGaugeLong oldestWalAge;
 
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
@@ -126,6 +128,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
     completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);
+
+    oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
+    oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -191,6 +196,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(repeatedBytesKey);
     rms.removeMetric(completedLogsKey);
     rms.removeMetric(completedRecoveryKey);
+    rms.removeMetric(oldestWalAgeKey);
   }
 
   @Override
@@ -256,6 +262,14 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   @Override
   public void incrFailedRecoveryQueue() {/*no op*/}
 
+  @Override public void setOldestWalAge(long age) {
+    oldestWalAge.set(age);
+  }
+
+  @Override public long getOldestWalAge() {
+    return oldestWalAge.value();
+  }
+
   @Override
   public void init() {
     rms.init();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 5dd17e7..83bc653 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -395,6 +395,17 @@ public class MetricsSource implements BaseSource {
     globalSourceSource.updateHistogram(name, value);
   }
 
+  /*
+   Sets the age of oldest log file just for source.
+  */
+  public void setOldestWalAge(long age) {
+    singleSourceSource.setOldestWalAge(age);
+  }
+
+  public long getOldestWalAge() {
+    return singleSourceSource.getOldestWalAge();
+  }
+
   @Override
   public String getMetricsContext() {
     return globalSourceSource.getMetricsContext();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 7be880d..a58289e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -90,10 +89,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 public class ReplicationSource extends Thread implements ReplicationSourceInterface {
 
   private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
-  // Queues of logs to process, entry in format of walGroupId->queue,
-  // each presents a queue for one wal group
-  private Map<String, PriorityBlockingQueue<Path>> queues =
-      new HashMap<String, PriorityBlockingQueue<Path>>();
+  protected ReplicationSourceLogQueue logQueue;
   // per group queue size, keep no more than this number of logs in each wal group
   private int queueSizePerGroup;
   private ReplicationQueues replicationQueues;
@@ -126,8 +122,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private volatile boolean sourceRunning = false;
   // Metrics for this source
   private MetricsSource metrics;
-  //WARN threshold for the number of queued logs, defaults to 2
-  private int logQueueWarnThreshold;
   // ReplicationEndpoint which will handle the actual replication
   private ReplicationEndpoint replicationEndpoint;
   // A filter (or a chain of filters) for the WAL entries.
@@ -176,6 +170,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
+    this.logQueue = new ReplicationSourceLogQueue(conf, metrics);
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
     this.manager = manager;
@@ -187,7 +182,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
-    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
     this.replicationEndpoint = replicationEndpoint;
 
     defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -208,16 +202,14 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   @Override
   public void enqueueLog(Path log) {
     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
-    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
-    if (queue == null) {
-      queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
-      queues.put(logPrefix, queue);
+    boolean queueExists = logQueue.enqueueLog(log, logPrefix);
+    if (!queueExists) {
       if (this.sourceRunning) {
         // new wal group observed after source startup, start a new worker thread to track it
         // notice: it's possible that log enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
         final ReplicationSourceShipperThread worker =
-            new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this);
+            new ReplicationSourceShipperThread(logPrefix, logQueue, replicationQueueInfo, this);
         ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker);
         if (extant != null) {
           LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
@@ -227,14 +219,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         }
       }
     }
-    queue.put(log);
-    this.metrics.incrSizeOfLogQueue();
-    // This will log a warning for each new log that gets created above the warn threshold
-    int queueSize = queue.size();
-    if (queueSize > this.logQueueWarnThreshold) {
-      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
-          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
-    }
   }
 
   @Override
@@ -326,11 +310,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
     // start workers
-    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
+    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : logQueue.getQueues().entrySet()) {
       String walGroupId = entry.getKey();
       PriorityBlockingQueue<Path> queue = entry.getValue();
       final ReplicationSourceShipperThread worker =
-          new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this);
+          new ReplicationSourceShipperThread(walGroupId, logQueue, replicationQueueInfo, this);
       ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
       if (extant != null) {
         LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
@@ -483,7 +467,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
      * @param p path to split
      * @return start time
      */
-    private static long getTS(Path p) {
+    public static long getTS(Path p) {
       int tsIndex = p.getName().lastIndexOf('.') + 1;
       return Long.parseLong(p.getName().substring(tsIndex));
     }
@@ -530,7 +514,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       String walGroupId = worker.getWalGroupId();
       lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
       ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
-      int queueSize = queues.get(walGroupId).size();
+      int queueSize = logQueue.getQueueSize(walGroupId);
       replicationDelay =
           ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
       Path currentPath = worker.getLastLoggedPath();
@@ -566,7 +550,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   public class ReplicationSourceShipperThread extends Thread {
     ReplicationSourceInterface source;
     String walGroupId;
-    PriorityBlockingQueue<Path> queue;
+    ReplicationSourceLogQueue logQueue;
     ReplicationQueueInfo replicationQueueInfo;
     // Last position in the log that we sent to ZooKeeper
     private long lastLoggedPosition = -1;
@@ -577,10 +561,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     ReplicationSourceWALReaderThread entryReader;
 
     public ReplicationSourceShipperThread(String walGroupId,
-        PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
+        ReplicationSourceLogQueue logQueue, ReplicationQueueInfo replicationQueueInfo,
         ReplicationSourceInterface source) {
       this.walGroupId = walGroupId;
-      this.queue = queue;
+      this.logQueue = logQueue;
       this.replicationQueueInfo = replicationQueueInfo;
       this.source = source;
     }
@@ -842,11 +826,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     // normally has a position (unless the RS failed between 2 logs)
     private long getRecoveredQueueStartPos(long startPosition) {
       try {
-        startPosition =
-            (replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName()));
+        startPosition = (replicationQueues.getLogPosition(peerClusterZnode,
+              this.logQueue.getQueue(walGroupId).peek().getName()));
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
-              + startPosition);
+          LOG.trace("Recovered queue started with log " +
+              this.logQueue.getQueue(walGroupId).peek() + " at position " + startPosition);
         }
       } catch (ReplicationException e) {
         terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
@@ -860,8 +844,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
         new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
       ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
-      entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
-          startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this);
+      entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, logQueue,
+          startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this,
+          this.walGroupId);
       Threads.setDaemonThreadRunning(entryReader, threadName
           + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
         handler);
@@ -873,6 +858,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       boolean hasPathChanged = false;
       PriorityBlockingQueue<Path> newPaths =
           new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
+      PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
       pathsLoop: for (Path path : queue) {
         if (fs.exists(path)) { // still in same location, don't need to do anything
           newPaths.add(path);
@@ -922,9 +908,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         // put the correct locations in the queue
         // since this is a recovered queue with no new incoming logs,
         // there shouldn't be any concurrency issues
-        queue.clear();
+        logQueue.clear(walGroupId);
         for (Path path : newPaths) {
-          queue.add(path);
+          logQueue.enqueueLog(path, walGroupId);
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java
new file mode 100644
index 0000000..1cc8cb9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+  Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics
+  just at one place.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplicationSourceLogQueue {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
+  // Queues of logs to process, entry in format of walGroupId->queue,
+  // each presents a queue for one wal group
+  private Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap<>();
+  private MetricsSource metrics;
+  private Configuration conf;
+  // per group queue size, keep no more than this number of logs in each wal group
+  private int queueSizePerGroup;
+  // WARN threshold for the number of queued logs, defaults to 2
+  private int logQueueWarnThreshold;
+
+  public ReplicationSourceLogQueue(Configuration conf, MetricsSource metrics) {
+    this.conf = conf;
+    this.metrics = metrics;
+    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
+    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
+  }
+
+  /**
+   * Enqueue the wal
+   * @param wal wal to be enqueued
+   * @param walGroupId Key for the wal in @queues map
+   * @return boolean whether this is the first time we are seeing this walGroupId.
+   */
+  public boolean enqueueLog(Path wal, String walGroupId) {
+    boolean exists = false;
+    PriorityBlockingQueue<Path> queue = queues.get(walGroupId);
+    if (queue == null) {
+      queue = new PriorityBlockingQueue<>(queueSizePerGroup,
+        new ReplicationSource.LogsComparator());
+      // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
+      // the shipper may quit immediately
+      queue.put(wal);
+      queues.put(walGroupId, queue);
+    } else {
+      exists = true;
+      queue.put(wal);
+    }
+    // Increment size of logQueue
+    this.metrics.incrSizeOfLogQueue();
+    // Compute oldest wal age
+    this.metrics.setOldestWalAge(getOldestWalAge());
+    // This will wal a warning for each new wal that gets created above the warn threshold
+    int queueSize = queue.size();
+    if (queueSize > this.logQueueWarnThreshold) {
+      LOG.warn("WAL group " + walGroupId + " queue size: " + queueSize
+        + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
+    }
+    return exists;
+  }
+
+  /**
+   * Get the queue size for the given walGroupId.
+   * @param walGroupId walGroupId
+   */
+  public int getQueueSize(String walGroupId) {
+    Queue queue = queues.get(walGroupId);
+    if (queue == null) {
+      return 0;
+    }
+    return queue.size();
+  }
+
+  /**
+   * Returns number of queues.
+   */
+  public int getNumQueues() {
+    return queues.size();
+  }
+
+  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+    return queues;
+  }
+
+  /**
+   * Return queue for the given walGroupId
+   * Please don't add or remove elements from the returned queue.
+   * Use @enqueueLog and @remove methods respectively.
+   * @param walGroupId walGroupId
+   */
+  public PriorityBlockingQueue<Path> getQueue(String walGroupId) {
+    return queues.get(walGroupId);
+  }
+
+  /**
+   * Remove head from the queue corresponding to given walGroupId.
+   * @param walGroupId walGroupId
+   */
+  public void remove(String walGroupId) {
+    PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+    queue.remove();
+    // Decrease size logQueue.
+    this.metrics.decrSizeOfLogQueue();
+    // Re-compute age of oldest wal metric.
+    this.metrics.setOldestWalAge(getOldestWalAge());
+  }
+
+  /**
+   * Remove all the elements from the queue corresponding to walGroupId
+   * @param walGroupId walGroupId
+   */
+  public void clear(String walGroupId) {
+    PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
+    while (!queue.isEmpty()) {
+      // Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1.
+      queue.remove();
+      metrics.decrSizeOfLogQueue();
+    }
+    this.metrics.setOldestWalAge(getOldestWalAge());
+  }
+
+  /*
+    Returns the age of oldest wal.
+   */
+  long getOldestWalAge() {
+    long now = EnvironmentEdgeManager.currentTime();
+    long timestamp = getOldestWalTimestamp();
+    if (timestamp == Long.MAX_VALUE) {
+      // If there are no wals in the queue then set the oldest wal timestamp to current time
+      // so that the oldest wal age will be 0.
+      timestamp = now;
+    }
+    long age = now - timestamp;
+    return age;
+  }
+
+  /*
+  Get the oldest wal timestamp from all the queues.
+  */
+  private long getOldestWalTimestamp() {
+    long oldestWalTimestamp = Long.MAX_VALUE;
+    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
+      PriorityBlockingQueue<Path> queue = entry.getValue();
+      Path path = queue.peek();
+      // Can path ever be null ?
+      if (path != null) {
+        oldestWalTimestamp = Math.min(oldestWalTimestamp,
+          ReplicationSource.LogsComparator.getTS(path));
+      }
+    }
+    return oldestWalTimestamp;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index b67ff53..bd155d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.wal.WALKey;
 public class ReplicationSourceWALReaderThread extends Thread {
   private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class);
 
-  private PriorityBlockingQueue<Path> logQueue;
+  private ReplicationSourceLogQueue logQueue;
   private FileSystem fs;
   private Configuration conf;
   private BlockingQueue<WALEntryBatch> entryBatchQueue;
@@ -79,6 +79,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
 
   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;
+  private final String walGroupId;
 
   private ReplicationSource source;
   private ReplicationSourceManager manager;
@@ -96,12 +97,13 @@ public class ReplicationSourceWALReaderThread extends Thread {
    * @param metrics replication metrics
    */
   public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
-      ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
+      ReplicationQueueInfo replicationQueueInfo, ReplicationSourceLogQueue logQueue,
       long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter,
-      MetricsSource metrics, ReplicationSource source) {
+      MetricsSource metrics, ReplicationSource source, String walGroupId) {
     this.replicationQueueInfo = replicationQueueInfo;
     this.logQueue = logQueue;
-    this.lastReadPath = logQueue.peek();
+    this.walGroupId = walGroupId;
+    this.lastReadPath = logQueue.getQueue(walGroupId).peek();
     this.lastReadPosition = startPosition;
     this.fs = fs;
     this.conf = conf;
@@ -135,7 +137,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
       try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
+          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           if (!source.isPeerEnabled()) {
             Threads.sleep(sleepForRetries);
@@ -232,24 +234,26 @@ public class ReplicationSourceWALReaderThread extends Thread {
   // enabled, then dump the log
   private void handleEofException(Exception e) {
     boolean isRecoveredSource = manager.getOldSources().contains(source);
+    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't
     // add current log to recovered source queue so it is safe to remove.
-    if (e.getCause() instanceof EOFException && (isRecoveredSource || logQueue.size() > 1)
+    if (e.getCause() instanceof EOFException && (isRecoveredSource || queue.size() > 1)
         && conf.getBoolean("replication.source.eof.autorecovery", false)) {
       try {
-        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
-          lastReadPath = logQueue.remove();
+        if (fs.getFileStatus(queue.peek()).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
+          lastReadPath = queue.peek();
+          logQueue.remove(walGroupId);
           lastReadPosition = 0;
         }
       } catch (IOException ioe) {
-        LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+        LOG.warn("Couldn't get file length information about log " + queue.peek());
       }
     }
   }
 
   public Path getCurrentPath() {
-    return logQueue.peek();
+    return logQueue.getQueue(walGroupId).peek();
   }
 
   //returns false if we've already exceeded the global quota
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 7a44075..a0b09dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -59,7 +59,8 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
   private Entry currentEntry;
   // position after reading current entry
   private long currentPosition = 0;
-  private PriorityBlockingQueue<Path> logQueue;
+  private final ReplicationSourceLogQueue logQueue;
+  private final String walGroupId;
   private FileSystem fs;
   private Configuration conf;
   private MetricsSource metrics;
@@ -70,12 +71,13 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
    * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
    * @param conf {@link Configuration} to use to create {@link Reader} for this stream
    * @param metrics replication metrics
+   * @param walGroupId wal prefix
    * @throws IOException
    */
-  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
-      MetricsSource metrics)
+  public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
+      MetricsSource metrics, String walGroupId)
       throws IOException {
-    this(logQueue, fs, conf, 0, metrics);
+    this(logQueue, fs, conf, 0, metrics, walGroupId);
   }
 
   /**
@@ -83,18 +85,18 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
    * @param logQueue the queue of WAL paths
    * @param conf the {@link Configuration} to use to create {@link Reader} for this stream
    * @param startPosition the position in the first WAL to start reading at
-   * @param walFileLengthProvider provides the length of the WAL file
-   * @param serverName the server name which all WALs belong to
    * @param metrics the replication metrics
+   * @param walGroupId wal prefix
    * @throws IOException
    */
-  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
-      long startPosition, MetricsSource metrics) throws IOException {
+  public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
+      long startPosition, MetricsSource metrics, String walGroupId) throws IOException {
     this.logQueue = logQueue;
     this.fs = fs;
     this.conf = conf;
     this.currentPosition = startPosition;
     this.metrics = metrics;
+    this.walGroupId = walGroupId;
   }
 
   /**
@@ -198,7 +200,7 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
     if (checkReader()) {
       readNextEntryAndSetPosition();
       if (currentEntry == null) { // no more entries in this log file - see if log was rolled
-        if (logQueue.size() > 1) { // log was rolled
+        if (logQueue.getQueue(walGroupId).size() > 1) { // log was rolled
           // Before dequeueing, we should always get one more attempt at reading.
           // This is in case more entries came in after we opened the reader,
           // and a new log was enqueued while we were reading. See HBASE-6758
@@ -266,7 +268,7 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
       LOG.debug("Reached the end of log " + currentPath);
     }
     closeReader();
-    logQueue.remove();
+    logQueue.remove(walGroupId);
     setCurrentPath(null);
     setPosition(0);
     metrics.decrSizeOfLogQueue();
@@ -300,7 +302,8 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
 
   // open a reader on the next log in queue
   private boolean openNextLog() throws IOException {
-    Path nextPath = logQueue.peek();
+    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
+    Path nextPath = queue.peek();
     if (nextPath != null) {
       openReader(nextPath);
       if (reader != null) return true;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index f85a52b..ce185f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -52,6 +52,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -69,6 +70,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
@@ -76,7 +79,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -299,6 +304,15 @@ public class TestReplicationSource {
       return source;
     }
 
+    ReplicationSource createReplicationSourceWithMocks(MetricsSource metrics,
+            ReplicationEndpoint endpoint) throws IOException {
+      final ReplicationSource source = new ReplicationSource();
+      endpoint.init(context);
+      source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
+        "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+      return source;
+    }
+
     public AtomicLong getTotalBufferUsed() {
       return totalBufferUsed;
     }
@@ -648,5 +662,44 @@ public class TestReplicationSource {
     }
   }
 
-}
+  /*
+  Test age of oldest wal metric.
+  */
+  @Test
+  public void testAgeOfOldestWal() throws Exception {
+    try {
+      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
+      EnvironmentEdgeManager.injectEdge(manualEdge);
+
+      String id = "1";
+      MetricsSource metrics = new MetricsSource(id);
+      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+      conf.setInt("replication.source.maxretriesmultiplier", 1);
+      Mocks mocks = new Mocks();
+      ReplicationEndpoint endpoint = mock(ReplicationEndpoint.class);
+      ReplicationSource source = mocks.createReplicationSourceWithMocks(metrics, endpoint);
+
+      final Path log1 = new Path(logDir, "log-walgroup-a.8");
+      manualEdge.setValue(10);
+      // Diff of current time (10) and  log-walgroup-a.8 timestamp will be 2.
+      source.enqueueLog(log1);
+      MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
+      assertEquals(2, metricsSource1.getOldestWalAge());
+
+      final Path log2 = new Path(logDir, "log-walgroup-b.4");
+      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
+      source.enqueueLog(log2);
+      assertEquals(6, metricsSource1.getOldestWalAge());
+      // Clear all metrics.
+      metrics.clear();
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
 
+  private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
+    MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory
+      .getInstance(MetricsReplicationSourceFactory.class);
+    return factory.getSource(sourceId);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java
new file mode 100644
index 0000000..57c97b8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class,ReplicationTests.class})
+public class TestReplicationSourceLogQueue {
+
+  /*
+    Testing enqueue and dequeuing of wal and check age of oldest wal.
+  */
+  @Test
+  public void testEnqueueDequeue() {
+    try {
+      String walGroupId1 = "fake-walgroup-id-1";
+      String walGroupId2 = "fake-walgroup-id-2";
+
+      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
+      EnvironmentEdgeManager.injectEdge(manualEdge);
+
+      MetricsSource metrics = new MetricsSource("1");
+      Configuration conf = HBaseConfiguration.create();
+      ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics);
+      final Path log1 = new Path("log-walgroup-a.8");
+      manualEdge.setValue(10);
+      // Diff of current time (10) and  log-walgroup-a.8 timestamp will be 2.
+      logQueue.enqueueLog(log1, walGroupId1);
+      assertEquals(2, logQueue.getOldestWalAge());
+
+      final Path log2 = new Path("log-walgroup-b.4");
+      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
+      logQueue.enqueueLog(log2, walGroupId2);
+      assertEquals(6, logQueue.getOldestWalAge());
+
+      // Remove an element from walGroupId2.
+      // After this op, there will be only one element in the queue log-walgroup-a.8
+      logQueue.remove(walGroupId2);
+      assertEquals(2, logQueue.getOldestWalAge());
+
+      // Remove last element from the queue.
+      logQueue.remove(walGroupId1);
+      // This will test the case where there are no elements in the queue.
+      assertEquals(0, logQueue.getOldestWalAge());
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index eaf7e0a..adf427b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALRead
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -109,9 +111,10 @@ public class TestWALEntryStream {
       new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
   private static final HTableDescriptor htd = new HTableDescriptor(tableName);
   private static NavigableMap<byte[], Integer> scopes;
+  private final String fakeWalGroupId = "fake-wal-group-id";
 
   private WAL log;
-  PriorityBlockingQueue<Path> walQueue;
+  ReplicationSourceLogQueue logQueue;
   private PathWatcher pathWatcher;
 
   @Rule
@@ -139,7 +142,7 @@ public class TestWALEntryStream {
 
   @Before
   public void setUp() throws Exception {
-    walQueue = new PriorityBlockingQueue<>();
+    logQueue = new ReplicationSourceLogQueue(conf, new MetricsSource("2"));
     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
     pathWatcher = new PathWatcher();
     listeners.add(pathWatcher);
@@ -174,7 +177,7 @@ public class TestWALEntryStream {
           log.rollWriter();
 
           try (WALEntryStream entryStream =
-              new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+              new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
             int i = 0;
             for (WAL.Entry e : entryStream) {
               assertNotNull(e);
@@ -202,7 +205,7 @@ public class TestWALEntryStream {
 
     long oldPos;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
       // There's one edit in the log, read it. Reading past it needs to throw exception
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.next();
@@ -220,7 +223,7 @@ public class TestWALEntryStream {
     appendToLog();
 
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, oldPos, new MetricsSource("1"), fakeWalGroupId)) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -234,7 +237,7 @@ public class TestWALEntryStream {
     appendToLog();
 
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, oldPos, new MetricsSource("1"), fakeWalGroupId)) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
@@ -259,7 +262,7 @@ public class TestWALEntryStream {
     appendToLog("1");
     appendToLog("2");// 2
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
       assertEquals("1", getRow(entryStream.next()));
 
       appendToLog("3"); // 3 - comes in after reader opened
@@ -267,12 +270,12 @@ public class TestWALEntryStream {
       appendToLog("4"); // 4 - this append is in the rolled log
 
       assertEquals("2", getRow(entryStream.next()));
-      assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
+      assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
                                         // entry in first log
       assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
                                                      // and 3 would be skipped
       assertEquals("4", getRow(entryStream.next())); // 4
-      assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
+      assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly
       assertFalse(entryStream.hasNext());
     }
   }
@@ -284,7 +287,7 @@ public class TestWALEntryStream {
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next(); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
@@ -307,7 +310,7 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next(); // we've hit the end of the stream at this point
       appendToLog("2");
       appendToLog("3");
@@ -315,11 +318,12 @@ public class TestWALEntryStream {
     }
     // next stream should picks up where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, lastPosition, new MetricsSource("1"),
+        fakeWalGroupId)) {
       assertEquals("2", getRow(entryStream.next()));
       assertEquals("3", getRow(entryStream.next()));
       assertFalse(entryStream.hasNext()); // done
-      assertEquals(1, walQueue.size());
+      assertEquals(1, getQueue().size());
     }
   }
 
@@ -333,13 +337,15 @@ public class TestWALEntryStream {
     appendEntriesToLog(3);
     // read only one element
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, lastPosition, new MetricsSource("1"),
+        fakeWalGroupId)) {
       entryStream.next();
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, lastPosition, new MetricsSource("1"),
+        fakeWalGroupId)) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
@@ -350,7 +356,7 @@ public class TestWALEntryStream {
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) {
       assertFalse(entryStream.hasNext());
     }
   }
@@ -361,7 +367,7 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -374,9 +380,9 @@ public class TestWALEntryStream {
     when(source.isPeerEnabled()).thenReturn(true);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     ReplicationSourceWALReaderThread batcher =
-            new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
-                    fs, conf, getDummyFilter(), new MetricsSource("1"), source);
-    Path walPath = walQueue.peek();
+            new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),logQueue, 0,
+                    fs, conf, getDummyFilter(), new MetricsSource("1"), source, fakeWalGroupId);
+    Path walPath = getQueue().peek();
     batcher.start();
     WALEntryBatch entryBatch = batcher.take();
 
@@ -400,8 +406,13 @@ public class TestWALEntryStream {
     appendEntriesToLog(2);
 
     long position;
-    try (WALEntryStream entryStream = new WALEntryStream(new PriorityBlockingQueue<>(walQueue),
-            fs, conf, new MetricsSource("1"))) {
+    ReplicationSourceLogQueue tempQueue = new ReplicationSourceLogQueue(conf,
+        getMockMetrics());
+    for (Path path : getQueue()) {
+      tempQueue.enqueueLog(path, fakeWalGroupId);
+    }
+    try (WALEntryStream entryStream = new WALEntryStream(tempQueue,
+            fs, conf, new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -416,9 +427,9 @@ public class TestWALEntryStream {
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     ReplicationSourceWALReaderThread reader =
             new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
-              walQueue, 0, fs, conf, getDummyFilter(),
-              new MetricsSource("1"), source);
-    Path walPath = walQueue.toArray(new Path[2])[1];
+            logQueue, 0, fs, conf, getDummyFilter(),
+            new MetricsSource("1"), source, fakeWalGroupId);
+    Path walPath = getQueue().toArray(new Path[2])[1];
     reader.start();
     WALEntryBatch entryBatch = reader.take();
 
@@ -476,8 +487,8 @@ public class TestWALEntryStream {
     when(source.isPeerEnabled()).thenReturn(true);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     final ReplicationSourceWALReaderThread reader =
-            new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
-                    0, fs, conf, filter, new MetricsSource("1"), source);
+            new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue,
+                    0, fs, conf, filter, new MetricsSource("1"), source, fakeWalGroupId);
     reader.start();
 
     WALEntryBatch entryBatch = reader.take();
@@ -504,7 +515,7 @@ public class TestWALEntryStream {
 
     appendToLogPlus(3, notReplicatedCf);
 
-    Path firstWAL = walQueue.peek();
+    Path firstWAL = getQueue().peek();
     final long eof = getPosition(firstWAL);
 
     ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
@@ -512,8 +523,8 @@ public class TestWALEntryStream {
     when(source.isPeerEnabled()).thenReturn(true);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     final ReplicationSourceWALReaderThread reader =
-            new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
-                    0, fs, conf, filter, new MetricsSource("1"), source);
+            new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue,
+                    0, fs, conf, filter, new MetricsSource("1"), source, fakeWalGroupId);
     reader.start();
 
     // reader won't put any batch, even if EOF reached.
@@ -529,21 +540,24 @@ public class TestWALEntryStream {
     // should get empty batch with current wal position, after wal rolled
     WALEntryBatch entryBatch = reader.take();
 
-    Path lastWAL= walQueue.peek();
+    Path lastWAL= getQueue().peek();
     long positionToBeLogged = getPosition(lastWAL);
 
     assertNotNull(entryBatch);
     assertTrue(entryBatch.isEmpty());
-    assertEquals(1, walQueue.size());
+    assertEquals(1, getQueue().size());
     assertNotEquals(firstWAL, entryBatch.getLastWalPath());
     assertEquals(lastWAL, entryBatch.getLastWalPath());
     assertEquals(positionToBeLogged, entryBatch.getLastWalPosition());
   }
 
   private long getPosition(Path walPath) throws IOException {
+    ReplicationSourceLogQueue tempQueue =
+        new ReplicationSourceLogQueue(conf, getMockMetrics());
+    String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(walPath.getName());
+    tempQueue.enqueueLog(walPath, walPrefix);
     WALEntryStream entryStream =
-            new WALEntryStream(new PriorityBlockingQueue<>(Collections.singletonList(walPath)),
-                    fs, conf, new MetricsSource("1"));
+            new WALEntryStream(tempQueue, fs, conf, getMockMetrics(), walPrefix);
     entryStream.hasNext();
     return entryStream.getPosition();
   }
@@ -628,8 +642,8 @@ public class TestWALEntryStream {
     Path currentPath;
 
     @Override
-    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-      walQueue.add(newPath);
+    public void preLogRoll(Path oldPath, Path newPath) {
+      logQueue.enqueueLog(newPath, fakeWalGroupId);
       currentPath = newPath;
     }
   }
@@ -644,7 +658,7 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-      new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      new WALEntryStream(logQueue, fs, conf, 0, new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -652,7 +666,7 @@ public class TestWALEntryStream {
     }
 
     // start up a reader
-    Path walPath = walQueue.peek();
+    Path walPath = getQueue().peek();
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
 
@@ -667,8 +681,8 @@ public class TestWALEntryStream {
     ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     final ReplicationSourceWALReaderThread reader =
-      new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
-        0, fs, conf, getDummyFilter(), new MetricsSource("1"), source);
+      new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), logQueue,
+        0, fs, conf, getDummyFilter(), new MetricsSource("1"), source, fakeWalGroupId);
 
     reader.start();
     Future<WALEntryBatch> future =
@@ -701,13 +715,11 @@ public class TestWALEntryStream {
     */
   @Test
   public void testEOFExceptionForRecoveredQueue() throws Exception {
-    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
     // Create a 0 length log.
-    Path emptyLog = new Path("emptyLog");
+    Path emptyLog = new Path("emptyLog.1");
     FSDataOutputStream fsdos = fs.create(emptyLog);
     fsdos.close();
     assertEquals(0, fs.getFileStatus(emptyLog).getLen());
-    queue.add(emptyLog);
 
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
 
@@ -720,14 +732,29 @@ public class TestWALEntryStream {
     // Override the max retries multiplier to fail fast.
     conf.setInt("replication.source.maxretriesmultiplier", 1);
     conf.setBoolean("replication.source.eof.autorecovery", true);
+
+    ReplicationSourceLogQueue localLogQueue =
+        new ReplicationSourceLogQueue(conf, getMockMetrics());
+    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
     // Create a reader thread.
     ReplicationSourceWALReaderThread reader =
-      new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
-        queue, 0, fs, conf, getDummyFilter(),
-        new MetricsSource("1"), (ReplicationSource) source);
+        new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
+        localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
     reader.run();
     // ReplicationSourceWALReaderThread#handleEofException method will
     // remove empty log from logQueue.
-    assertEquals(0, queue.size());
+    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+  }
+
+  private PriorityBlockingQueue<Path> getQueue() {
+    return logQueue.getQueue(fakeWalGroupId);
+  }
+
+  private MetricsSource getMockMetrics() {
+    MetricsSource source = mock(MetricsSource.class);
+    doNothing().when(source).incrSizeOfLogQueue();
+    doNothing().when(source).decrSizeOfLogQueue();
+    doNothing().when(source).setOldestWalAge(Mockito.anyInt());
+    return source;
   }
 }