You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by we...@apache.org on 2020/09/11 17:58:32 UTC

[hbase] branch branch-2.2 updated: HBASE-25003 Backport HBASE-24350 and HBASE-24779 to branch-2.2 (#2372)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 7cea318  HBASE-25003 Backport HBASE-24350 and HBASE-24779 to branch-2.2 (#2372)
7cea318 is described below

commit 7cea31869ee836f051fa2298b053676babeb78dc
Author: tamasadami <42...@users.noreply.github.com>
AuthorDate: Fri Sep 11 19:58:12 2020 +0200

    HBASE-25003 Backport HBASE-24350 and HBASE-24779 to branch-2.2 (#2372)
    
    Includes the following
    * HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
    * Amend HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
    
    * Rename WALEntryBatch#getWaEntriesWithSize -> getWalEntriesWithSize
    
    * HBASE-24779 Report on the WAL edit buffer usage/limit for replication
    
    * HBASE-24779 Report on the WAL edit buffer usage/limit for replication - addendum
    
    Co-authored-by: Sandeep Pal <50...@users.noreply.github.com>
    Co-authored-by: Andrew Purtell <ap...@apache.org>
    Co-authored-by: Josh Elser <el...@apache.org>
---
 .../MetricsReplicationGlobalSourceSource.java      |  28 ++---
 .../MetricsReplicationSourceFactory.java           |   3 +-
 ...ory.java => MetricsReplicationTableSource.java} |  12 +-
 ... MetricsReplicationGlobalSourceSourceImpl.java} |  18 ++-
 .../MetricsReplicationSourceFactoryImpl.java       |   8 +-
 .../MetricsReplicationSourceSourceImpl.java        |   2 +-
 .../MetricsReplicationTableSourceImpl.java         | 134 +++++++++++++++++++++
 .../replication/regionserver/MetricsSource.java    |  55 +++++++--
 .../replication/regionserver/Replication.java      |   7 +-
 .../regionserver/ReplicationSource.java            |   6 +-
 .../regionserver/ReplicationSourceManager.java     |  25 +++-
 .../regionserver/ReplicationSourceShipper.java     |  10 +-
 .../regionserver/ReplicationSourceWALReader.java   |  17 +--
 .../replication/regionserver/WALEntryBatch.java    |  24 ++--
 .../hbase/replication/TestReplicationEndpoint.java |  97 ++++++++++++---
 .../regionserver/TestWALEntryStream.java           |   5 +
 16 files changed, 380 insertions(+), 71 deletions(-)

diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
similarity index 56%
copy from hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
copy to hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index af310f0..e373a6c 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -20,22 +20,20 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSourceFactory {
+public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource {
 
-  private static enum SourceHolder {
-    INSTANCE;
-    final MetricsReplicationSourceImpl source = new MetricsReplicationSourceImpl();
-  }
+  public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage";
 
-  @Override public MetricsReplicationSinkSource getSink() {
-    return new MetricsReplicationSinkSourceImpl(SourceHolder.INSTANCE.source);
-  }
+  /**
+   * Sets the total usage of memory used by edits in memory read from WALs. The memory represented
+   * by this usage measure is across peers/sources. For example, we may batch the same WAL edits
+   * multiple times for the sake of replicating them to multiple peers..
+   * @param usage The memory used by edits in bytes
+   */
+  void setWALReaderEditsBufferBytes(long usage);
 
-  @Override public MetricsReplicationSourceSource getSource(String id) {
-    return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
-  }
-
-  @Override public MetricsReplicationSourceSource getGlobalSource() {
-    return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
-  }
+  /**
+   * Returns the size, in bytes, of edits held in memory to be replicated across all peers.
+   */
+  long getWALReaderEditsBufferBytes();
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 6534b11..5e4ad27 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -24,5 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface MetricsReplicationSourceFactory {
   public MetricsReplicationSinkSource getSink();
   public MetricsReplicationSourceSource getSource(String id);
-  public MetricsReplicationSourceSource getGlobalSource();
+  public MetricsReplicationTableSource getTableSource(String tableName);
+  public MetricsReplicationGlobalSourceSource getGlobalSource();
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
similarity index 78%
copy from hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
copy to hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
index 6534b11..faa944a 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public interface MetricsReplicationSourceFactory {
-  public MetricsReplicationSinkSource getSink();
-  public MetricsReplicationSourceSource getSource(String id);
-  public MetricsReplicationSourceSource getGlobalSource();
+public interface MetricsReplicationTableSource extends BaseSource {
+
+  void setLastShippedAge(long age);
+  void incrShippedBytes(long size);
+  long getShippedBytes();
+  void clear();
+  long getLastShippedAge();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
similarity index 93%
rename from hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
rename to hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 4e8c810..963abba 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
+public class MetricsReplicationGlobalSourceSourceImpl
+    implements MetricsReplicationGlobalSourceSource {
   private static final String KEY_PREFIX = "source.";
 
   private final MetricsReplicationSourceImpl rms;
@@ -53,8 +54,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableFastCounter failedRecoveryQueue;
+  private final MutableGaugeLong walReaderBufferUsageBytes;
 
-  public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
+  public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
 
     ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
@@ -92,6 +94,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
             .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
     failedRecoveryQueue = rms.getMetricsRegistry()
             .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
+    walReaderBufferUsageBytes = rms.getMetricsRegistry()
+        .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -260,4 +264,14 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public String getMetricsName() {
     return rms.getMetricsName();
   }
+
+  @Override
+  public void setWALReaderEditsBufferBytes(long usage) {
+    this.walReaderBufferUsageBytes.set(usage);
+  }
+
+  @Override
+  public long getWALReaderEditsBufferBytes() {
+    return this.walReaderBufferUsageBytes.value();
+  }
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index af310f0..c0cd1c7 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -35,7 +35,11 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
     return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
   }
 
-  @Override public MetricsReplicationSourceSource getGlobalSource() {
-    return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
+  @Override public MetricsReplicationTableSource getTableSource(String tableName) {
+    return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
+  }
+
+  @Override public MetricsReplicationGlobalSourceSource getGlobalSource() {
+    return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
   }
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 0ad5052..79e9bc1 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -162,7 +162,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
   @Override public void incrShippedBytes(long size) {
     shippedBytesCounter.incr(size);
-    MetricsReplicationGlobalSourceSource
+    MetricsReplicationGlobalSourceSourceImpl
       .incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
   }
 
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
new file mode 100644
index 0000000..7120a73
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This is the metric source for table level replication metrics.
+ * We can easy monitor some useful table level replication metrics such as
+ * ageOfLastShippedOp and shippedBytes
+ */
+@InterfaceAudience.Private
+public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource {
+
+  private final MetricsReplicationSourceImpl rms;
+  private final String tableName;
+  private final String ageOfLastShippedOpKey;
+  private String keyPrefix;
+
+  private final String shippedBytesKey;
+
+  private final MutableHistogram ageOfLastShippedOpHist;
+  private final MutableFastCounter shippedBytesCounter;
+
+  public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) {
+    this.rms = rms;
+    this.tableName = tableName;
+    this.keyPrefix = "source." + this.tableName + ".";
+
+    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
+    ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey);
+
+    shippedBytesKey = this.keyPrefix + "shippedBytes";
+    shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
+  }
+
+  @Override
+  public void setLastShippedAge(long age) {
+    ageOfLastShippedOpHist.add(age);
+  }
+
+  @Override
+  public void incrShippedBytes(long size) {
+    shippedBytesCounter.incr(size);
+  }
+
+  @Override
+  public void clear() {
+    rms.removeMetric(ageOfLastShippedOpKey);
+    rms.removeMetric(shippedBytesKey);
+  }
+
+  @Override
+  public long getLastShippedAge() {
+    return ageOfLastShippedOpHist.getMax();
+  }
+
+  @Override
+  public long getShippedBytes() {
+    return shippedBytesCounter.value();
+  }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(this.keyPrefix + gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(this.keyPrefix + key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(this.keyPrefix + counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 830ebe1..49ef392 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -19,8 +19,11 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,8 +50,8 @@ public class MetricsSource implements BaseSource {
   private String id;
 
   private final MetricsReplicationSourceSource singleSourceSource;
-  private final MetricsReplicationSourceSource globalSourceSource;
-  private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
+  private final MetricsReplicationGlobalSourceSource globalSourceSource;
+  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
 
   /**
    * Constructor used to register the metrics
@@ -71,8 +74,8 @@ public class MetricsSource implements BaseSource {
    * @param globalSourceSource Class to monitor global-scoped metrics
    */
   public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
-                       MetricsReplicationSourceSource globalSourceSource,
-                       Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
+                       MetricsReplicationGlobalSourceSource globalSourceSource,
+                       Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
     this.id = id;
     this.singleSourceSource = singleSourceSource;
     this.globalSourceSource = globalSourceSource;
@@ -93,6 +96,29 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * Update the table level replication metrics per table
+   *
+   * @param walEntries List of pairs of WAL entry and it's size
+   */
+  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
+    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
+      Entry entry = walEntryWithSize.getFirst();
+      long entrySize = walEntryWithSize.getSecond();
+      String tableName = entry.getKey().getTableName().getNameAsString();
+      long writeTime = entry.getKey().getWriteTime();
+      long age = EnvironmentEdgeManager.currentTime() - writeTime;
+
+      // get the replication metrics source for table at the run time
+      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
+        .computeIfAbsent(tableName,
+          t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+            .getTableSource(t));
+      tableSource.setLastShippedAge(age);
+      tableSource.incrShippedBytes(entrySize);
+    }
+  }
+
+  /**
    * Set the age of the last edit that was shipped group by table
    * @param timestamp write time of the edit
    * @param tableName String as group and tableName
@@ -101,7 +127,7 @@ public class MetricsSource implements BaseSource {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     this.getSingleSourceSourceByTable().computeIfAbsent(
         tableName, t -> CompatibilitySingletonFactory
-            .getInstance(MetricsReplicationSourceFactory.class).getSource(t))
+            .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
             .setLastShippedAge(age);
   }
 
@@ -119,7 +145,7 @@ public class MetricsSource implements BaseSource {
    * @param walGroup which group we are getting
    * @return age
    */
-  public long getAgeofLastShippedOp(String walGroup) {
+  public long getAgeOfLastShippedOp(String walGroup) {
     return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
   }
 
@@ -393,7 +419,22 @@ public class MetricsSource implements BaseSource {
   }
 
   @VisibleForTesting
-  public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
+  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
     return singleSourceSourceByTable;
   }
+
+  /**
+   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
+   */
+  public void setWALReaderEditsBufferUsage(long usageInBytes) {
+    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
+  }
+
+  /**
+   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
+   * @return the amount of memory in bytes used in this RegionServer by edits pending replication.
+   */
+  public long getWALReaderEditsBufferUsage() {
+    return globalSourceSource.getWALReaderEditsBufferBytes();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 752cfb8..bec9042 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
@@ -72,6 +73,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
+  private MetricsReplicationGlobalSourceSource globalMetricsSource;
 
   private PeerProcedureHandler peerProcedureHandler;
 
@@ -119,9 +121,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
     } catch (KeeperException ke) {
       throw new IOException("Could not read cluster id", ke);
     }
+    this.globalMetricsSource = CompatibilitySingletonFactory
+        .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
     this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
       this.server, fs, logDir, oldLogDir, clusterId,
-        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
+        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
+        globalMetricsSource);
     if (walProvider != null) {
       walProvider
         .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4726949..c5f72c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -329,7 +329,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       String walGroupId = walGroupShipper.getKey();
       ReplicationSourceShipper shipper = walGroupShipper.getValue();
       lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
-      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
       int queueSize = queues.get(walGroupId).size();
       replicationDelay =
           ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
@@ -706,7 +706,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    totalBufferUsed.addAndGet(-batchSize);
+    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+    // Record the new buffer usage
+    this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a6fc313..2806110 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -155,6 +155,9 @@ public class ReplicationSourceManager implements ReplicationListener {
 
 
   private AtomicLong totalBufferUsed = new AtomicLong();
+  // Total buffer size on this RegionServer for holding batched edits to be shipped.
+  private final long totalBufferLimit;
+  private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
@@ -171,7 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
       ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
-      WALFileLengthProvider walFileLengthProvider) throws IOException {
+      WALFileLengthProvider walFileLengthProvider,
+      MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
     // CopyOnWriteArrayList is thread-safe.
     // Generally, reading is more than modifying.
     this.sources = new ConcurrentHashMap<>();
@@ -205,6 +209,9 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.latestPaths = new HashSet<Path>();
     replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.globalMetrics = globalMetrics;
   }
 
   /**
@@ -862,6 +869,14 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Returns the maximum size in bytes of edits held in memory which are pending replication
+   * across all sources inside this RegionServer.
+   */
+  public long getTotalBufferLimit() {
+    return totalBufferLimit;
+  }
+
+  /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
    */
@@ -898,6 +913,10 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public String getStats() {
     StringBuilder stats = new StringBuilder();
+    // Print stats that apply across all Replication Sources
+    stats.append("Global stats: ");
+    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
+        .append(getTotalBufferLimit()).append("B\n");
     for (ReplicationSourceInterface source : this.sources.values()) {
       stats.append("Normal source for cluster " + source.getPeerId() + ": ");
       stats.append(source.getStats() + "\n");
@@ -923,4 +942,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   int activeFailoverTaskCount() {
     return executor.getActiveCount();
   }
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+    return this.globalMetrics;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 23f736f..78e5521 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
@@ -201,17 +200,14 @@ public class ReplicationSourceShipper extends Thread {
         // Clean up hfile references
         for (Entry entry : entries) {
           cleanUpHFileRefs(entry.getEdit());
-
-          TableName tableName = entry.getKey().getTableName();
-          source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
-              tableName.getNameAsString());
+          LOG.trace("shipped entry {}: ", entry);
         }
         // Log and clean up WAL logs
         updateLogPosition(entryBatch);
 
         //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
         //this sizeExcludeBulkLoad has to use same calculation that when calling
-        //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
+        //acquireBufferQuota() in ReplicationSourceWALReader because they maintain
         //same variable: totalBufferUsed
         source.postShipEdits(entries, sizeExcludeBulkLoad);
         // FIXME check relationship between wal group and overall
@@ -219,6 +215,8 @@ public class ReplicationSourceShipper extends Thread {
           entryBatch.getNbHFiles());
         source.getSourceMetrics().setAgeOfLastShippedOp(
           entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
+        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());
+
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicated {} entries or {} operations in {} ms",
               entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 12653f5..328cb8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -104,8 +103,7 @@ class ReplicationSourceWALReader extends Thread {
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
     this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
-    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
-      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
@@ -172,7 +170,7 @@ class ReplicationSourceWALReader extends Thread {
     }
     long entrySize = getEntrySizeIncludeBulkLoad(entry);
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-    batch.addEntry(entry);
+    batch.addEntry(entry, entrySize);
     updateBatchStats(batch, entry, entrySize);
     boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
 
@@ -272,6 +270,8 @@ class ReplicationSourceWALReader extends Thread {
   private boolean checkQuota() {
     // try not to go over total quota
     if (totalBufferUsed.get() > totalBufferQuota) {
+      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
+          this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -306,9 +306,7 @@ class ReplicationSourceWALReader extends Thread {
 
   private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
-    WALKey key = entry.getKey();
-    return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
-        key.estimatedSerializedSizeOf();
+    return  getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
   }
 
   public static long getEntrySizeExcludeBulkLoad(Entry entry) {
@@ -401,7 +399,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
+    long newBufferUsed = totalBufferUsed.addAndGet(size);
+    // Record the new buffer usage
+    this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= totalBufferQuota;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 22b2de7..4f96c96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -21,7 +21,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -34,7 +36,8 @@ class WALEntryBatch {
   // used by recovered replication queue to indicate that all the entries have been read.
   public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
 
-  private List<Entry> walEntries;
+  private List<Pair<Entry, Long>> walEntriesWithSize;
+
   // last WAL that was read
   private Path lastWalPath;
   // position in WAL of last entry in this batch
@@ -54,7 +57,7 @@ class WALEntryBatch {
    * @param lastWalPath Path of the WAL the last entry in this batch was read from
    */
   WALEntryBatch(int maxNbEntries, Path lastWalPath) {
-    this.walEntries = new ArrayList<>(maxNbEntries);
+    this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
     this.lastWalPath = lastWalPath;
   }
 
@@ -66,15 +69,22 @@ class WALEntryBatch {
     return batch;
   }
 
-  public void addEntry(Entry entry) {
-    walEntries.add(entry);
+  public void addEntry(Entry entry, long entrySize) {
+    walEntriesWithSize.add(new Pair<>(entry, entrySize));
   }
 
   /**
    * @return the WAL Entries.
    */
   public List<Entry> getWalEntries() {
-    return walEntries;
+    return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
+  }
+
+  /**
+   * @return the WAL Entries.
+   */
+  public List<Pair<Entry, Long>> getWalEntriesWithSize() {
+    return walEntriesWithSize;
   }
 
   /**
@@ -96,7 +106,7 @@ class WALEntryBatch {
   }
 
   public int getNbEntries() {
-    return walEntries.size();
+    return walEntriesWithSize.size();
   }
 
   /**
@@ -160,7 +170,7 @@ class WALEntryBatch {
 
   @Override
   public String toString() {
-    return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
+    return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath +
       ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
       nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
       endOfFile + "]";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 5d7366d..cc7e20c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -44,16 +47,21 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.junit.AfterClass;
@@ -304,11 +312,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }
 
   @Test
-  public void testMetricsSourceBaseSourcePassthrough() {
+  public void testMetricsSourceBaseSourcePassThrough() {
     /*
-     * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
-     * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
-     * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
+     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
+     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
+     * so that metrics get written to both namespaces. Both of those classes wrap a
+     * MetricsReplicationSourceImpl that implements BaseSource, which allows
      * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
      * MetricsSource actually calls down through the two layers of wrapping to the actual
      * BaseSource.
@@ -322,14 +331,15 @@ public class TestReplicationEndpoint extends TestReplicationBase {
 
     MetricsReplicationSourceSource singleSourceSource =
       new MetricsReplicationSourceSourceImpl(singleRms, id);
-    MetricsReplicationSourceSource globalSourceSource =
-      new MetricsReplicationGlobalSourceSource(globalRms);
-    MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+    MetricsReplicationGlobalSourceSource globalSourceSource =
+      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
+    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
     doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
 
-    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
-    MetricsSource source =
-      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
+    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
+      new HashMap<>();
+    MetricsSource source = new MetricsSource(id, singleSourceSource,
+      spyglobalSourceSource, singleSourceSourceByTable);
 
 
     String gaugeName = "gauge";
@@ -378,16 +388,37 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
         .containsKey("RandomNewTable");
     Assert.assertEquals(false, containsRandomNewTable);
-    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
+    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
     containsRandomNewTable = source.getSingleSourceSourceByTable()
         .containsKey("RandomNewTable");
     Assert.assertEquals(true, containsRandomNewTable);
-    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
+    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
         .get("RandomNewTable");
-    // cannot put more concreate value here to verify because the age is arbitrary.
-    // as long as it's greater than 0, we see it as correct answer.
+
+    // age should be greater than zero we created the entry with time in the past
     Assert.assertTrue(msr.getLastShippedAge() > 0);
+    Assert.assertTrue(msr.getShippedBytes() > 0);
+
+  }
+
+  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
+    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
+    byte[] a = new byte[] { 'a' };
+    Entry entry = createEntry(tableName, null, a);
+    walEntriesWithSize.add(new Pair<>(entry, 10L));
+    return walEntriesWithSize;
+  }
 
+  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
+    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
+        System.currentTimeMillis() - 1L,
+        scopes);
+    WALEdit edit1 = new WALEdit();
+
+    for (byte[] kv : kvs) {
+      edit1.add(new KeyValue(kv, kv, kv));
+    }
+    return new Entry(key1, edit1);
   }
 
   private void doPut(byte[] row) throws IOException {
@@ -463,6 +494,44 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
   }
 
+  /**
+   * Not used by unit tests, helpful for manual testing with replication.
+   * <p>
+   * Snippet for `hbase shell`:
+   * <pre>
+   * create 't', 'f'
+   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
+   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
+   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
+   * </pre>
+   */
+  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
+    private long duration;
+    public SleepingReplicationEndpointForTest() {
+      super();
+    }
+
+    @Override
+    public void init(Context context) throws IOException {
+      super.init(context);
+      if (this.ctx != null) {
+        duration = this.ctx.getConfiguration().getLong(
+            "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
+      }
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext context) {
+      try {
+        Thread.sleep(duration);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      return super.replicate(context);
+    }
+  }
+
   public static class InterClusterReplicationEndpointForTest
       extends HBaseInterClusterReplicationEndpoint {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index b4af38b..067ab63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -336,6 +336,8 @@ public class TestWALEntryStream {
   private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
     ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
+        (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
@@ -343,6 +345,9 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
+    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
+        MetricsReplicationGlobalSourceSource.class);
+    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
     return source;
   }