You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2021/02/19 05:31:52 UTC

[hbase] branch branch-2.4 updated: HBASE-25539: Add age of oldest wal metric (#2962)

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 627ac01  HBASE-25539: Add age of oldest wal metric (#2962)
627ac01 is described below

commit 627ac016cc7ef21c0d3b89158ab7b20096e26646
Author: shahrs87 <sh...@gmail.com>
AuthorDate: Thu Feb 18 20:59:07 2021 -0800

    HBASE-25539: Add age of oldest wal metric (#2962)
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
    (cherry picked from commit 6a4c9be967f1b0c29cf937927e0b6aec929721f9)
---
 .../MetricsReplicationSourceSource.java            |   4 +
 .../MetricsReplicationGlobalSourceSourceImpl.java  |  12 ++
 .../MetricsReplicationSourceSourceImpl.java        |  14 ++
 .../replication/regionserver/MetricsSource.java    |  11 ++
 .../regionserver/RecoveredReplicationSource.java   |  12 +-
 .../RecoveredReplicationSourceShipper.java         |  14 +-
 .../regionserver/ReplicationSource.java            |  65 +++----
 .../regionserver/ReplicationSourceLogQueue.java    | 189 +++++++++++++++++++++
 .../regionserver/ReplicationSourceShipper.java     |   7 +-
 .../regionserver/ReplicationSourceWALReader.java   |  23 +--
 .../SerialReplicationSourceWALReader.java          |   7 +-
 .../replication/regionserver/WALEntryStream.java   |  13 +-
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    |   2 +-
 .../regionserver/TestReplicationSource.java        |  71 +++++++-
 .../TestReplicationSourceLogQueue.java             |  83 +++++++++
 .../regionserver/TestWALEntryStream.java           | 104 +++++++-----
 16 files changed, 504 insertions(+), 127 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 3fd5ac6..b5eb0aa 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -52,6 +52,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
   public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
   public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
+  /* Used to track the age of oldest wal in ms since its creation time */
+  String OLDEST_WAL_AGE = "source.oldestWalAge";
 
   void setLastShippedAge(long age);
   void incrSizeOfLogQueue(int size);
@@ -79,4 +81,6 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   long getWALEditsRead();
   long getShippedOps();
   long getEditsFiltered();
+  void setOldestWalAge(long age);
+  long getOldestWalAge();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 52aa1b0..9e69f18 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -210,6 +210,18 @@ public class MetricsReplicationGlobalSourceSourceImpl
   public void incrFailedRecoveryQueue() {
     failedRecoveryQueue.incr(1L);
   }
+
+  @Override
+  public void setOldestWalAge(long age) {
+    // Not implemented
+  }
+
+  @Override
+  public long getOldestWalAge() {
+    // Not implemented
+    return 0;
+  }
+
   @Override
   public void init() {
     rms.init();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 2af3ac9..f9a907f 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -44,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String logReadInBytesKey;
   private final String shippedHFilesKey;
   private final String sizeOfHFileRefsQueueKey;
+  private final String oldestWalAgeKey;
 
   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -71,6 +72,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableFastCounter repeatedFileBytes;
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
+  private final MutableGaugeLong oldestWalAge;
 
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
@@ -130,6 +132,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
     completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);
+
+    oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
+    oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -195,6 +200,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(repeatedBytesKey);
     rms.removeMetric(completedLogsKey);
     rms.removeMetric(completedRecoveryKey);
+    rms.removeMetric(oldestWalAgeKey);
   }
 
   @Override
@@ -260,6 +266,14 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   @Override
   public void incrFailedRecoveryQueue() {/*no op*/}
 
+  @Override public void setOldestWalAge(long age) {
+    oldestWalAge.set(age);
+  }
+
+  @Override public long getOldestWalAge() {
+    return oldestWalAge.value();
+  }
+
   @Override
   public void init() {
     rms.init();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 650af2e..f11b7de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -397,6 +397,17 @@ public class MetricsSource implements BaseSource {
     globalSourceSource.incrFailedRecoveryQueue();
   }
 
+  /*
+   Sets the age of oldest log file just for source.
+  */
+  public void setOldestWalAge(long age) {
+    singleSourceSource.setOldestWalAge(age);
+  }
+
+  public long getOldestWalAge() {
+    return singleSourceSource.getOldestWalAge();
+  }
+
   @Override
   public void init() {
     singleSourceSource.init();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 39c4beb..516ccf1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -58,13 +58,13 @@ public class RecoveredReplicationSource extends ReplicationSource {
   }
 
   @Override
-  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
-      PriorityBlockingQueue<Path> queue) {
-    return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
+  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
+    return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage);
   }
 
-  public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
+  public void locateRecoveredPaths(String walGroupId) throws IOException {
     boolean hasPathChanged = false;
+    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(queueSizePerGroup,
       new AbstractFSWALProvider.WALStartTimeComparator());
     pathsLoop: for (Path path : queue) {
@@ -117,9 +117,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
       // put the correct locations in the queue
       // since this is a recovered queue with no new incoming logs,
       // there shouldn't be any concurrency issues
-      queue.clear();
+      logQueue.clear(walGroupId);
       for (Path path : newPaths) {
-        queue.add(path);
+        logQueue.enqueueLog(path, walGroupId);
       }
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 91109cf..83256c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
-import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.Threads;
@@ -40,9 +38,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   private final ReplicationQueueStorage replicationQueues;
 
   public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
-      PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
+      ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
       ReplicationQueueStorage queueStorage) {
-    super(conf, walGroupId, queue, source);
+    super(conf, walGroupId, logQueue, source);
     this.source = source;
     this.replicationQueues = queueStorage;
   }
@@ -65,7 +63,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
     int numRetries = 0;
     while (numRetries <= maxRetriesMultiplier) {
       try {
-        source.locateRecoveredPaths(queue);
+        source.locateRecoveredPaths(walGroupId);
         break;
       } catch (IOException e) {
         LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
@@ -82,9 +80,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
     String peerClusterZNode = source.getQueueId();
     try {
       startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
-        peerClusterZNode, this.queue.peek().getName());
-      LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
-        startPosition);
+        peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName());
+      LOG.trace("Recovered queue started with log {} at position {}",
+        this.logQueue.getQueue(walGroupId).peek(), startPosition);
     } catch (ReplicationException e) {
       terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6a64dd8..063b3d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -24,14 +24,12 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,11 +84,9 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 public class ReplicationSource implements ReplicationSourceInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
-  // Queues of logs to process, entry in format of walGroupId->queue,
-  // each presents a queue for one wal group
-  private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
   // per group queue size, keep no more than this number of logs in each wal group
   protected int queueSizePerGroup;
+  protected ReplicationSourceLogQueue logQueue;
   protected ReplicationQueueStorage queueStorage;
   protected ReplicationPeer replicationPeer;
 
@@ -118,8 +114,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   volatile boolean sourceRunning = false;
   // Metrics for this source
   private MetricsSource metrics;
-  // WARN threshold for the number of queued logs, defaults to 2
-  private int logQueueWarnThreshold;
   // ReplicationEndpoint which will handle the actual replication
   private volatile ReplicationEndpoint replicationEndpoint;
 
@@ -213,6 +207,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
+    this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
     this.queueStorage = queueStorage;
     this.replicationPeer = replicationPeer;
     this.manager = manager;
@@ -224,7 +219,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
-    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
 
     // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
     defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -255,35 +249,20 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
     // Use WAL prefix as the WALGroupId for this peer.
     String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
-    PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
-    if (queue == null) {
-      queue = new PriorityBlockingQueue<>(queueSizePerGroup,
-        new AbstractFSWALProvider.WALStartTimeComparator());
-      // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
-      // the shipper may quit immediately
-      queue.put(wal);
-      queues.put(walPrefix, queue);
+    boolean queueExists = logQueue.enqueueLog(wal, walPrefix);
+
+    if (!queueExists) {
       if (this.isSourceActive() && this.walEntryFilter != null) {
         // new wal group observed after source startup, start a new worker thread to track it
         // notice: it's possible that wal enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
-        tryStartNewShipper(walPrefix, queue);
+        tryStartNewShipper(walPrefix);
       }
-    } else {
-      queue.put(wal);
     }
     if (LOG.isTraceEnabled()) {
       LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
         this.replicationQueueInfo.getQueueId());
     }
-    this.metrics.incrSizeOfLogQueue();
-    // This will wal a warning for each new wal that gets created above the warn threshold
-    int queueSize = queue.size();
-    if (queueSize > this.logQueueWarnThreshold) {
-      LOG.warn("{} WAL group {} queue size: {} exceeds value of "
-          + "replication.source.log.queue.warn: {}", logPeerId(),
-        walPrefix, queueSize, logQueueWarnThreshold);
-    }
   }
 
   @Override
@@ -375,16 +354,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.walEntryFilter = new ChainWALEntryFilter(filters);
   }
 
-  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+  private void tryStartNewShipper(String walGroupId) {
     workerThreads.compute(walGroupId, (key, value) -> {
       if (value != null) {
         LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
         return value;
       } else {
         LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
-        ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
+        ReplicationSourceShipper worker = createNewShipper(walGroupId);
         ReplicationSourceWALReader walReader =
-            createNewWALReader(walGroupId, queue, worker.getStartPosition());
+            createNewWALReader(walGroupId, worker.getStartPosition());
         Threads.setDaemonThreadRunning(
             walReader, Thread.currentThread().getName()
             + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
@@ -404,7 +383,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       String walGroupId = walGroupShipper.getKey();
       ReplicationSourceShipper shipper = walGroupShipper.getValue();
       ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
-      int queueSize = queues.get(walGroupId).size();
+      int queueSize = logQueue.getQueueSize(walGroupId);
       replicationDelay = metrics.getReplicationDelay();
       Path currentPath = shipper.getCurrentPath();
       fileSize = -1;
@@ -443,16 +422,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return fileSize;
   }
 
-  protected ReplicationSourceShipper createNewShipper(String walGroupId,
-      PriorityBlockingQueue<Path> queue) {
-    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
+  protected ReplicationSourceShipper createNewShipper(String walGroupId) {
+    return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
   }
 
-  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
-      PriorityBlockingQueue<Path> queue, long startPosition) {
+  private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
     return replicationPeer.getPeerConfig().isSerial()
-      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
-      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
+      this, walGroupId)
+      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
+      this, walGroupId);
   }
 
   /**
@@ -621,14 +600,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
       throw new IllegalStateException("Source should be active.");
     }
     LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
-      logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId,
+      logPeerId(), this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId,
       peerClusterId);
     initializeWALEntryFilter(peerClusterId);
     // Start workers
-    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
-      String walGroupId = entry.getKey();
-      PriorityBlockingQueue<Path> queue = entry.getValue();
-      tryStartNewShipper(walGroupId, queue);
+    for (String walGroupId: logQueue.getQueues().keySet()) {
+      tryStartNewShipper(walGroupId);
     }
     this.startupOngoing.set(false);
   }
@@ -857,7 +834,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   /**
    * @return String to use as a log prefix that contains current peerId.
    */
-  private String logPeerId(){
+  public String logPeerId(){
     return "peerId=" + this.getPeerId() + ",";
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java
new file mode 100644
index 0000000..8a774fb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+  Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics
+  just at one place.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplicationSourceLogQueue {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
+  // Queues of logs to process, entry in format of walGroupId->queue,
+  // each presents a queue for one wal group
+  private Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap<>();
+  private MetricsSource metrics;
+  private Configuration conf;
+  // per group queue size, keep no more than this number of logs in each wal group
+  private int queueSizePerGroup;
+  // WARN threshold for the number of queued logs, defaults to 2
+  private int logQueueWarnThreshold;
+  private ReplicationSource source;
+
+  public ReplicationSourceLogQueue(Configuration conf, MetricsSource metrics,
+      ReplicationSource source) {
+    this.conf = conf;
+    this.metrics = metrics;
+    this.source = source;
+    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
+    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
+  }
+
+  /**
+   * Enqueue the wal
+   * @param wal wal to be enqueued
+   * @param walGroupId Key for the wal in @queues map
+   * @return true if a queue for this walGroupId already existed; false if it was just created.
+   */
+  public boolean enqueueLog(Path wal, String walGroupId) {
+    boolean exists = false;
+    PriorityBlockingQueue<Path> queue = queues.get(walGroupId);
+    if (queue == null) {
+      queue = new PriorityBlockingQueue<>(queueSizePerGroup,
+        new AbstractFSWALProvider.WALStartTimeComparator());
+      // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
+      // the shipper may quit immediately
+      queue.put(wal);
+      queues.put(walGroupId, queue);
+    } else {
+      exists = true;
+      queue.put(wal);
+    }
+    // Increment size of logQueue
+    this.metrics.incrSizeOfLogQueue();
+    // Compute oldest wal age
+    this.metrics.setOldestWalAge(getOldestWalAge());
+    // This will log a warning for each new wal that gets created above the warn threshold
+    int queueSize = queue.size();
+    if (queueSize > this.logQueueWarnThreshold) {
+      LOG.warn("{} WAL group {} queue size: {} exceeds value of " +
+          "replication.source.log.queue.warn {}", source.logPeerId(), walGroupId, queueSize,
+        logQueueWarnThreshold);
+    }
+    return exists;
+  }
+
+  /**
+   * Get the queue size for the given walGroupId.
+   * @param walGroupId walGroupId
+   */
+  public int getQueueSize(String walGroupId) {
+    Queue queue = queues.get(walGroupId);
+    if (queue == null) {
+      return 0;
+    }
+    return queue.size();
+  }
+
+  /**
+   * Returns number of queues.
+   */
+  public int getNumQueues() {
+    return queues.size();
+  }
+
+  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+    return queues;
+  }
+
+  /**
+   * Return queue for the given walGroupId
+   * Please don't add or remove elements from the returned queue.
+   * Use {@link #enqueueLog(Path, String)} and {@link #remove(String)} respectively.
+   * @param walGroupId walGroupId
+   */
+  public PriorityBlockingQueue<Path> getQueue(String walGroupId) {
+    return queues.get(walGroupId);
+  }
+
+  /**
+   * Remove head from the queue corresponding to given walGroupId.
+   * @param walGroupId walGroupId
+   */
+  public void remove(String walGroupId) {
+    PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+    queue.remove();
+    // Decrease size logQueue.
+    this.metrics.decrSizeOfLogQueue();
+    // Re-compute age of oldest wal metric.
+    this.metrics.setOldestWalAge(getOldestWalAge());
+  }
+
+  /**
+   * Remove all the elements from the queue corresponding to walGroupId
+   * @param walGroupId walGroupId
+   */
+  public void clear(String walGroupId) {
+    PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
+    while (!queue.isEmpty()) {
+      // Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1.
+      queue.remove();
+      metrics.decrSizeOfLogQueue();
+    }
+    this.metrics.setOldestWalAge(getOldestWalAge());
+  }
+
+  /*
+    Returns the age of oldest wal.
+   */
+  long getOldestWalAge() {
+    long now = EnvironmentEdgeManager.currentTime();
+    long timestamp = getOldestWalTimestamp();
+    if (timestamp == Long.MAX_VALUE) {
+      // If there are no wals in the queue then set the oldest wal timestamp to current time
+      // so that the oldest wal age will be 0.
+      timestamp = now;
+    }
+    long age = now - timestamp;
+    return age;
+  }
+
+  /*
+  Get the oldest wal timestamp from all the queues.
+  */
+  private long getOldestWalTimestamp() {
+    long oldestWalTimestamp = Long.MAX_VALUE;
+    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
+      PriorityBlockingQueue<Path> queue = entry.getValue();
+      Path path = queue.peek();
+      // peek() returns null when this wal group's queue is empty; skip such queues.
+      if (path != null) {
+        oldestWalTimestamp = Math.min(oldestWalTimestamp,
+          AbstractFSWALProvider.WALStartTimeComparator.getTS(path));
+      }
+    }
+    return oldestWalTimestamp;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 5d4a71b..f188e7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.LongAccumulator;
 
 import org.apache.hadoop.conf.Configuration;
@@ -55,7 +54,7 @@ public class ReplicationSourceShipper extends Thread {
 
   private final Configuration conf;
   protected final String walGroupId;
-  protected final PriorityBlockingQueue<Path> queue;
+  protected final ReplicationSourceLogQueue logQueue;
   private final ReplicationSource source;
 
   // Last position in the log that we sent to ZooKeeper
@@ -76,10 +75,10 @@ public class ReplicationSourceShipper extends Thread {
   private final int shipEditsTimeout;
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId,
-      PriorityBlockingQueue<Path> queue, ReplicationSource source) {
+      ReplicationSourceLogQueue logQueue, ReplicationSource source) {
     this.conf = conf;
     this.walGroupId = walGroupId;
-    this.queue = queue;
+    this.logQueue = logQueue;
     this.source = source;
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 05b34a9..52ac144 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
 class ReplicationSourceWALReader extends Thread {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
 
-  private final PriorityBlockingQueue<Path> logQueue;
+  private final ReplicationSourceLogQueue logQueue;
   private final FileSystem fs;
   private final Configuration conf;
   private final WALEntryFilter filter;
@@ -77,6 +77,7 @@ class ReplicationSourceWALReader extends Thread {
 
   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;
+  private final String walGroupId;
 
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
@@ -89,8 +90,8 @@ class ReplicationSourceWALReader extends Thread {
    * @param source replication source
    */
   public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
-      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
-      ReplicationSource source) {
+      ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
+      ReplicationSource source, String walGroupId) {
     this.logQueue = logQueue;
     this.currentPosition = startPosition;
     this.fs = fs;
@@ -111,6 +112,7 @@ class ReplicationSourceWALReader extends Thread {
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
+    this.walGroupId = walGroupId;
     LOG.info("peerClusterZnode=" + source.getQueueId()
         + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
         + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -125,7 +127,7 @@ class ReplicationSourceWALReader extends Thread {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, conf, currentPosition,
               source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
-              source.getSourceMetrics())) {
+              source.getSourceMetrics(), walGroupId)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           if (!source.isPeerEnabled()) {
             Threads.sleep(sleepForRetries);
@@ -246,18 +248,19 @@ class ReplicationSourceWALReader extends Thread {
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
   private void handleEofException(IOException e) {
+    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source
     // since we don't add current log to recovered source queue so it is safe to remove.
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
+      (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
       try {
-        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
-          logQueue.remove();
+        if (fs.getFileStatus(queue.peek()).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
+          logQueue.remove(walGroupId);
           currentPosition = 0;
         }
       } catch (IOException ioe) {
-        LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+        LOG.warn("Couldn't get file length information about log " + queue.peek());
       }
     }
   }
@@ -269,7 +272,7 @@ class ReplicationSourceWALReader extends Thread {
       return batchQueueHead.getLastWalPath();
     }
     // otherwise, we must be currently reading from the head of the log queue
-    return logQueue.peek();
+    return logQueue.getQueue(walGroupId).peek();
   }
 
   //returns false if we've already exceeded the global quota
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 9edcc8a..d0e76fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
-import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,9 +43,9 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
   private final SerialReplicationChecker checker;
 
   public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
-      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
-      ReplicationSource source) {
-    super(fs, conf, logQueue, startPosition, filter, source);
+      ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
+      ReplicationSource source, String walGroupId) {
+    super(fs, conf, logQueue, startPosition, filter, source, walGroupId);
     checker = new SerialReplicationChecker(conf, source);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 752872a..5b8f057 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -63,7 +63,8 @@ class WALEntryStream implements Closeable {
   private long currentPositionOfEntry = 0;
   // position after reading current entry
   private long currentPositionOfReader = 0;
-  private final PriorityBlockingQueue<Path> logQueue;
+  private final ReplicationSourceLogQueue logQueue;
+  private final String walGroupId;
   private final FileSystem fs;
   private final Configuration conf;
   private final WALFileLengthProvider walFileLengthProvider;
@@ -81,9 +82,9 @@ class WALEntryStream implements Closeable {
    * @param metrics the replication metrics
    * @throws IOException
    */
-  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
+  public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
       long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
-      MetricsSource metrics) throws IOException {
+      MetricsSource metrics, String walGroupId) throws IOException {
     this.logQueue = logQueue;
     this.fs = CommonFSUtils.getWALFileSystem(conf);
     this.conf = conf;
@@ -91,6 +92,7 @@ class WALEntryStream implements Closeable {
     this.walFileLengthProvider = walFileLengthProvider;
     this.serverName = serverName;
     this.metrics = metrics;
+    this.walGroupId = walGroupId;
   }
 
   /**
@@ -251,7 +253,7 @@ class WALEntryStream implements Closeable {
   private void dequeueCurrentLog() throws IOException {
     LOG.debug("EOF, closing {}", currentPath);
     closeReader();
-    logQueue.remove();
+    logQueue.remove(walGroupId);
     setCurrentPath(null);
     setPosition(0);
     metrics.decrSizeOfLogQueue();
@@ -301,7 +303,8 @@ class WALEntryStream implements Closeable {
 
   // open a reader on the next log in queue
   private boolean openNextLog() throws IOException {
-    Path nextPath = logQueue.peek();
+    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
+    Path nextPath = queue.peek();
     if (nextPath != null) {
       openReader(nextPath);
       if (reader != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 187bb11..a7e3ef5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -436,7 +436,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
      * @param p path to split
      * @return start time
      */
-    private static long getTS(Path p) {
+    public static long getTS(Path p) {
       return WAL.getTimestamp(p.getName());
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 2b4b1ef..bb2b1da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -33,7 +34,6 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -62,6 +63,8 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -291,7 +294,7 @@ public class TestReplicationSource {
     source.init(testConf, null, mockManager, null, mockPeer, null,
       "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
     ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
-      conf, null, 0, null, source);
+      conf, null, 0, null, source, null);
     ReplicationSourceShipper shipper =
       new ReplicationSourceShipper(conf, null, null, source);
     shipper.entryReader = reader;
@@ -484,8 +487,6 @@ public class TestReplicationSource {
     String walGroupId = "fake-wal-group-id";
     ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
     ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
-    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
-    queue.put(new Path("/www/html/test"));
     RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
     Server server = mock(Server.class);
     when(server.getServerName()).thenReturn(serverName);
@@ -498,8 +499,12 @@ public class TestReplicationSource {
       .thenReturn(-1L);
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.setInt("replication.source.maxretriesmultiplier", -1);
+    MetricsSource metricsSource = mock(MetricsSource.class);
+    doNothing().when(metricsSource).incrSizeOfLogQueue();
+    ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source);
+    logQueue.enqueueLog(new Path("/www/html/test"), walGroupId);
     RecoveredReplicationSourceShipper shipper =
-      new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
+      new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage);
     assertEquals(1001L, shipper.getStartPosition());
   }
 
@@ -592,5 +597,59 @@ public class TestReplicationSource {
       rss.stop("Done");
     }
   }
-}
 
+  /*
+    Test age of oldest wal metric.
+  */
+  @Test
+  public void testAgeOfOldestWal() throws Exception {
+    try {
+      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
+      EnvironmentEdgeManager.injectEdge(manualEdge);
+
+      String id = "1";
+      MetricsSource metrics = new MetricsSource(id);
+      Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+      conf.setInt("replication.source.maxretriesmultiplier", 1);
+      ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+      Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+      Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+      ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
+      Mockito.when(peerConfig.getReplicationEndpointImpl()).
+        thenReturn(DoNothingReplicationEndpoint.class.getName());
+      Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+      ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+      Mockito.when(manager.getGlobalMetrics()).
+        thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
+      RegionServerServices rss =
+        TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+
+      ReplicationSource source = new ReplicationSource();
+      source.init(conf, null, manager, null, mockPeer, rss, id, null,
+        p -> OptionalLong.empty(), metrics);
+
+      final Path log1 = new Path(logDir, "log-walgroup-a.8");
+      manualEdge.setValue(10);
+      // Diff of current time (10) and the log-walgroup-a.8 timestamp (8) will be 2.
+      source.enqueueLog(log1);
+      MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
+      assertEquals(2, metricsSource1.getOldestWalAge());
+
+      final Path log2 = new Path(logDir, "log-walgroup-b.4");
+      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
+      source.enqueueLog(log2);
+      assertEquals(6, metricsSource1.getOldestWalAge());
+      // Clear all metrics.
+      metrics.clear();
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
+    MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory
+      .getInstance(MetricsReplicationSourceFactory.class);
+    return factory.getSource(sourceId);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java
new file mode 100644
index 0000000..c28b180
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({SmallTests.class,ReplicationTests.class})
+public class TestReplicationSourceLogQueue {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationSourceLogQueue.class);
+
+  /*
+    Tests enqueuing and dequeuing of WALs and checks the age of the oldest WAL.
+  */
+  @Test
+  public void testEnqueueDequeue() {
+    try {
+      String walGroupId1 = "fake-walgroup-id-1";
+      String walGroupId2 = "fake-walgroup-id-2";
+
+      ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
+      EnvironmentEdgeManager.injectEdge(manualEdge);
+
+      MetricsSource metrics = new MetricsSource("1");
+      Configuration conf = HBaseConfiguration.create();
+      ReplicationSource source = mock(ReplicationSource.class);
+      Mockito.doReturn("peer").when(source).logPeerId();
+      ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics, source);
+      final Path log1 = new Path("log-walgroup-a.8");
+      manualEdge.setValue(10);
+      // Diff of current time (10) and the log-walgroup-a.8 timestamp (8) will be 2.
+      logQueue.enqueueLog(log1, walGroupId1);
+      assertEquals(2, logQueue.getOldestWalAge());
+
+      final Path log2 = new Path("log-walgroup-b.4");
+      // Diff of current time (10) and log-walgroup-b.4 will be 6 so oldestWalAge should be 6
+      logQueue.enqueueLog(log2, walGroupId2);
+      assertEquals(6, logQueue.getOldestWalAge());
+
+      // Remove an element from walGroupId2.
+      // After this op, only one element will remain in the log queue: log-walgroup-a.8
+      logQueue.remove(walGroupId2);
+      assertEquals(2, logQueue.getOldestWalAge());
+
+      // Remove last element from the queue.
+      logQueue.remove(walGroupId1);
+      // This will test the case where there are no elements in the queue.
+      assertEquals(0, logQueue.getOldestWalAge());
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 1db9c17..9c6fafc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -99,6 +101,7 @@ public class TestWALEntryStream {
   private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
       .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
   private static final NavigableMap<byte[], Integer> scopes = getScopes();
+  private final String fakeWalGroupId = "fake-wal-group-id";
 
   private static NavigableMap<byte[], Integer> getScopes() {
     NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -107,7 +110,7 @@ public class TestWALEntryStream {
   }
 
   private WAL log;
-  PriorityBlockingQueue<Path> walQueue;
+  ReplicationSourceLogQueue logQueue;
   private PathWatcher pathWatcher;
 
   @Rule
@@ -131,7 +134,8 @@ public class TestWALEntryStream {
 
   @Before
   public void setUp() throws Exception {
-    walQueue = new PriorityBlockingQueue<>();
+    ReplicationSource source = mock(ReplicationSource.class);
+    logQueue = new ReplicationSourceLogQueue(CONF, new MetricsSource("2"), source);
     pathWatcher = new PathWatcher();
     final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
     wals.getWALProvider().addWALActionsListener(pathWatcher);
@@ -165,7 +169,8 @@ public class TestWALEntryStream {
           log.rollWriter();
 
           try (WALEntryStream entryStream =
-              new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+              new WALEntryStream(logQueue, CONF, 0, log, null,
+                new MetricsSource("1"), fakeWalGroupId)) {
             int i = 0;
             while (entryStream.hasNext()) {
               assertNotNull(entryStream.next());
@@ -192,7 +197,7 @@ public class TestWALEntryStream {
     appendToLogAndSync();
     long oldPos;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
       // There's one edit in the log, read it. Reading past it needs to throw exception
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.peek();
@@ -206,8 +211,8 @@ public class TestWALEntryStream {
 
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
-        log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
+        log, null, new MetricsSource("1"), fakeWalGroupId)) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -220,8 +225,8 @@ public class TestWALEntryStream {
     log.rollWriter();
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
-        log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
+        log, null, new MetricsSource("1"), fakeWalGroupId)) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
@@ -246,7 +251,8 @@ public class TestWALEntryStream {
     appendToLog("1");
     appendToLog("2");// 2
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, CONF, 0, log, null,
+          new MetricsSource("1"), fakeWalGroupId)) {
       assertEquals("1", getRow(entryStream.next()));
 
       appendToLog("3"); // 3 - comes in after reader opened
@@ -254,12 +260,12 @@ public class TestWALEntryStream {
       appendToLog("4"); // 4 - this append is in the rolled log
 
       assertEquals("2", getRow(entryStream.next()));
-      assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
+      assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
                                         // entry in first log
       assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
                                                      // and 3 would be skipped
       assertEquals("4", getRow(entryStream.next())); // 4
-      assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
+      assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly
       assertFalse(entryStream.hasNext());
     }
   }
@@ -267,11 +273,13 @@ public class TestWALEntryStream {
   /**
    * Tests that if writes come in while we have a stream open, we shouldn't miss them
    */
+
   @Test
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, CONF, 0, log, null,
+          new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next(); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
@@ -294,7 +302,8 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, CONF, 0, log, null,
+          new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next(); // we've hit the end of the stream at this point
       appendToLog("2");
       appendToLog("3");
@@ -302,11 +311,12 @@ public class TestWALEntryStream {
     }
     // next stream should picks up where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+          new MetricsSource("1"), fakeWalGroupId)) {
       assertEquals("2", getRow(entryStream.next()));
       assertEquals("3", getRow(entryStream.next()));
       assertFalse(entryStream.hasNext()); // done
-      assertEquals(1, walQueue.size());
+      assertEquals(1, getQueue().size());
     }
   }
 
@@ -314,19 +324,21 @@ public class TestWALEntryStream {
    * Tests that if we stop before hitting the end of a stream, we can continue where we left off
    * using the last position
    */
+
   @Test
   public void testPosition() throws Exception {
     long lastPosition = 0;
     appendEntriesToLogAndSync(3);
     // read only one element
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition,
-        log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition,
+        log, null, new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next();
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+          new MetricsSource("1"), fakeWalGroupId)) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
@@ -337,7 +349,8 @@ public class TestWALEntryStream {
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, CONF, 0, log, null,
+          new MetricsSource("1"), fakeWalGroupId)) {
       assertFalse(entryStream.hasNext());
     }
   }
@@ -391,7 +404,8 @@ public class TestWALEntryStream {
     ReplicationSource source = mockReplicationSource(recovered, conf);
     when(source.isPeerEnabled()).thenReturn(true);
     ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
+      new ReplicationSourceWALReader(fs, conf, logQueue, 0, getDummyFilter(), source,
+        fakeWalGroupId);
     reader.start();
     return reader;
   }
@@ -402,7 +416,8 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(logQueue, CONF, 0, log, null,
+          new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -410,7 +425,7 @@ public class TestWALEntryStream {
     }
 
     // start up a reader
-    Path walPath = walQueue.peek();
+    Path walPath = getQueue().peek();
     ReplicationSourceWALReader reader = createReader(false, CONF);
     WALEntryBatch entryBatch = reader.take();
 
@@ -430,7 +445,7 @@ public class TestWALEntryStream {
   @Test
   public void testReplicationSourceWALReaderRecovered() throws Exception {
     appendEntriesToLogAndSync(10);
-    Path walPath = walQueue.peek();
+    Path walPath = getQueue().peek();
     log.rollWriter();
     appendEntriesToLogAndSync(5);
     log.shutdown();
@@ -450,7 +465,7 @@ public class TestWALEntryStream {
     assertEquals(0, batch.getNbEntries());
     assertTrue(batch.isEndOfFile());
 
-    walPath = walQueue.peek();
+    walPath = getQueue().peek();
     batch = reader.take();
     assertEquals(walPath, batch.getLastWalPath());
     assertEquals(5, batch.getNbEntries());
@@ -463,7 +478,7 @@ public class TestWALEntryStream {
   @Test
   public void testReplicationSourceWALReaderWrongPosition() throws Exception {
     appendEntriesToLogAndSync(1);
-    Path walPath = walQueue.peek();
+    Path walPath = getQueue().peek();
     log.rollWriter();
     appendEntriesToLogAndSync(20);
     TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
@@ -490,7 +505,7 @@ public class TestWALEntryStream {
     assertEquals(1, entryBatch.getNbEntries());
     assertTrue(entryBatch.isEndOfFile());
 
-    Path walPath2 = walQueue.peek();
+    Path walPath2 = getQueue().peek();
     entryBatch = reader.take();
     assertEquals(walPath2, entryBatch.getLastWalPath());
     assertEquals(20, entryBatch.getNbEntries());
@@ -503,7 +518,7 @@ public class TestWALEntryStream {
     assertEquals(0, entryBatch.getNbEntries());
     assertTrue(entryBatch.isEndOfFile());
 
-    Path walPath3 = walQueue.peek();
+    Path walPath3 = getQueue().peek();
     entryBatch = reader.take();
     assertEquals(walPath3, entryBatch.getLastWalPath());
     assertEquals(10, entryBatch.getNbEntries());
@@ -517,7 +532,8 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-      new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+      new WALEntryStream(logQueue, CONF, 0, log, null,
+        new MetricsSource("1"), fakeWalGroupId)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -525,7 +541,7 @@ public class TestWALEntryStream {
     }
 
     // start up a reader
-    Path walPath = walQueue.peek();
+    Path walPath = getQueue().peek();
     ReplicationSource source = mockReplicationSource(false, CONF);
     AtomicInteger invokeCount = new AtomicInteger(0);
     AtomicBoolean enabled = new AtomicBoolean(false);
@@ -535,7 +551,8 @@ public class TestWALEntryStream {
     });
 
     ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
+      new ReplicationSourceWALReader(fs, CONF, logQueue, 0, getDummyFilter(),
+        source, fakeWalGroupId);
     reader.start();
     Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
       return reader.take();
@@ -621,8 +638,8 @@ public class TestWALEntryStream {
     Path currentPath;
 
     @Override
-    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-      walQueue.add(newPath);
+    public void preLogRoll(Path oldPath, Path newPath) {
+      logQueue.enqueueLog(newPath, fakeWalGroupId);
       currentPath = newPath;
     }
   }
@@ -631,10 +648,10 @@ public class TestWALEntryStream {
   public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
     appendToLog("1");
     appendToLog("2");
-    long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
+    long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
     AtomicLong fileLength = new AtomicLong(size - 1);
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue,  CONF, 0,
-        p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue,  CONF, 0,
+      p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) {
       assertTrue(entryStream.hasNext());
       assertNotNull(entryStream.next());
       // can not get log 2
@@ -660,13 +677,11 @@ public class TestWALEntryStream {
    */
   @Test
   public void testEOFExceptionForRecoveredQueue() throws Exception {
-    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
     // Create a 0 length log.
     Path emptyLog = new Path("emptyLog");
     FSDataOutputStream fsdos = fs.create(emptyLog);
     fsdos.close();
     assertEquals(0, fs.getFileStatus(emptyLog).getLen());
-    queue.add(emptyLog);
 
     Configuration conf = new Configuration(CONF);
     // Override the max retries multiplier to fail fast.
@@ -675,11 +690,22 @@ public class TestWALEntryStream {
     // Create a reader thread with source as recovered source.
     ReplicationSource source = mockReplicationSource(true, conf);
     when(source.isPeerEnabled()).thenReturn(true);
+
+    MetricsSource metrics = mock(MetricsSource.class);
+    doNothing().when(metrics).incrSizeOfLogQueue();
+    doNothing().when(metrics).decrSizeOfLogQueue();
+    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
+    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
     ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
+      new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
+        getDummyFilter(), source, fakeWalGroupId);
     reader.run();
     // ReplicationSourceWALReaderThread#handleEofException method will
     // remove empty log from logQueue.
-    assertEquals(0, queue.size());
+    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+  }
+
+  private PriorityBlockingQueue<Path> getQueue() {
+    return logQueue.getQueue(fakeWalGroupId);
   }
 }