You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by we...@apache.org on 2020/09/11 18:00:03 UTC

[hbase] 01/01: Revert "HBASE-25003 Backport HBASE-24350 and HBASE-24779 to branch-2.2 (#2372)"

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch revert-2372-HBASE-25003_branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit acf068c28d22c1fc56df594492be04f1580de682
Author: Wei-Chiu Chuang <jo...@gmail.com>
AuthorDate: Fri Sep 11 10:59:41 2020 -0700

    Revert "HBASE-25003 Backport HBASE-24350 and HBASE-24779 to branch-2.2 (#2372)"
    
    This reverts commit 7cea31869ee836f051fa2298b053676babeb78dc.
---
 .../MetricsReplicationGlobalSourceSource.java      |  39 ------
 .../MetricsReplicationSourceFactory.java           |   3 +-
 .../MetricsReplicationTableSource.java             |  32 -----
 ...a => MetricsReplicationGlobalSourceSource.java} |  18 +--
 .../MetricsReplicationSourceFactoryImpl.java       |   8 +-
 .../MetricsReplicationSourceSourceImpl.java        |   2 +-
 .../MetricsReplicationTableSourceImpl.java         | 134 ---------------------
 .../replication/regionserver/MetricsSource.java    |  55 ++-------
 .../replication/regionserver/Replication.java      |   7 +-
 .../regionserver/ReplicationSource.java            |   6 +-
 .../regionserver/ReplicationSourceManager.java     |  25 +---
 .../regionserver/ReplicationSourceShipper.java     |  10 +-
 .../regionserver/ReplicationSourceWALReader.java   |  17 ++-
 .../replication/regionserver/WALEntryBatch.java    |  24 ++--
 .../hbase/replication/TestReplicationEndpoint.java |  97 +++------------
 .../regionserver/TestWALEntryStream.java           |   5 -
 16 files changed, 52 insertions(+), 430 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
deleted file mode 100644
index e373a6c..0000000
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-@InterfaceAudience.Private
-public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource {
-
-  public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage";
-
-  /**
-   * Sets the total usage of memory used by edits in memory read from WALs. The memory represented
-   * by this usage measure is across peers/sources. For example, we may batch the same WAL edits
-   * multiple times for the sake of replicating them to multiple peers..
-   * @param usage The memory used by edits in bytes
-   */
-  void setWALReaderEditsBufferBytes(long usage);
-
-  /**
-   * Returns the size, in bytes, of edits held in memory to be replicated across all peers.
-   */
-  long getWALReaderEditsBufferBytes();
-}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 5e4ad27..6534b11 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -24,6 +24,5 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface MetricsReplicationSourceFactory {
   public MetricsReplicationSinkSource getSink();
   public MetricsReplicationSourceSource getSource(String id);
-  public MetricsReplicationTableSource getTableSource(String tableName);
-  public MetricsReplicationGlobalSourceSource getGlobalSource();
+  public MetricsReplicationSourceSource getGlobalSource();
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
deleted file mode 100644
index faa944a..0000000
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.hbase.metrics.BaseSource;
-import org.apache.yetus.audience.InterfaceAudience;
-
-@InterfaceAudience.Private
-public interface MetricsReplicationTableSource extends BaseSource {
-
-  void setLastShippedAge(long age);
-  void incrShippedBytes(long size);
-  long getShippedBytes();
-  void clear();
-  long getLastShippedAge();
-}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
similarity index 93%
rename from hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
rename to hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 963abba..4e8c810 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -24,8 +24,7 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSourceImpl
-    implements MetricsReplicationGlobalSourceSource {
+public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
   private static final String KEY_PREFIX = "source.";
 
   private final MetricsReplicationSourceImpl rms;
@@ -54,9 +53,8 @@ public class MetricsReplicationGlobalSourceSourceImpl
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableFastCounter failedRecoveryQueue;
-  private final MutableGaugeLong walReaderBufferUsageBytes;
 
-  public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
+  public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
 
     ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
@@ -94,8 +92,6 @@ public class MetricsReplicationGlobalSourceSourceImpl
             .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
     failedRecoveryQueue = rms.getMetricsRegistry()
             .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
-    walReaderBufferUsageBytes = rms.getMetricsRegistry()
-        .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -264,14 +260,4 @@ public class MetricsReplicationGlobalSourceSourceImpl
   public String getMetricsName() {
     return rms.getMetricsName();
   }
-
-  @Override
-  public void setWALReaderEditsBufferBytes(long usage) {
-    this.walReaderBufferUsageBytes.set(usage);
-  }
-
-  @Override
-  public long getWALReaderEditsBufferBytes() {
-    return this.walReaderBufferUsageBytes.value();
-  }
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index c0cd1c7..af310f0 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -35,11 +35,7 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
     return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
   }
 
-  @Override public MetricsReplicationTableSource getTableSource(String tableName) {
-    return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
-  }
-
-  @Override public MetricsReplicationGlobalSourceSource getGlobalSource() {
-    return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
+  @Override public MetricsReplicationSourceSource getGlobalSource() {
+    return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
   }
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 79e9bc1..0ad5052 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -162,7 +162,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
   @Override public void incrShippedBytes(long size) {
     shippedBytesCounter.incr(size);
-    MetricsReplicationGlobalSourceSourceImpl
+    MetricsReplicationGlobalSourceSource
       .incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
   }
 
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
deleted file mode 100644
index 7120a73..0000000
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.metrics2.lib.MutableFastCounter;
-import org.apache.hadoop.metrics2.lib.MutableHistogram;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This is the metric source for table level replication metrics.
- * We can easy monitor some useful table level replication metrics such as
- * ageOfLastShippedOp and shippedBytes
- */
-@InterfaceAudience.Private
-public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource {
-
-  private final MetricsReplicationSourceImpl rms;
-  private final String tableName;
-  private final String ageOfLastShippedOpKey;
-  private String keyPrefix;
-
-  private final String shippedBytesKey;
-
-  private final MutableHistogram ageOfLastShippedOpHist;
-  private final MutableFastCounter shippedBytesCounter;
-
-  public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) {
-    this.rms = rms;
-    this.tableName = tableName;
-    this.keyPrefix = "source." + this.tableName + ".";
-
-    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
-    ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey);
-
-    shippedBytesKey = this.keyPrefix + "shippedBytes";
-    shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
-  }
-
-  @Override
-  public void setLastShippedAge(long age) {
-    ageOfLastShippedOpHist.add(age);
-  }
-
-  @Override
-  public void incrShippedBytes(long size) {
-    shippedBytesCounter.incr(size);
-  }
-
-  @Override
-  public void clear() {
-    rms.removeMetric(ageOfLastShippedOpKey);
-    rms.removeMetric(shippedBytesKey);
-  }
-
-  @Override
-  public long getLastShippedAge() {
-    return ageOfLastShippedOpHist.getMax();
-  }
-
-  @Override
-  public long getShippedBytes() {
-    return shippedBytesCounter.value();
-  }
-
-  @Override
-  public void init() {
-    rms.init();
-  }
-
-  @Override
-  public void setGauge(String gaugeName, long value) {
-    rms.setGauge(this.keyPrefix + gaugeName, value);
-  }
-
-  @Override
-  public void incGauge(String gaugeName, long delta) {
-    rms.incGauge(this.keyPrefix + gaugeName, delta);
-  }
-
-  @Override
-  public void decGauge(String gaugeName, long delta) {
-    rms.decGauge(this.keyPrefix + gaugeName, delta);
-  }
-
-  @Override
-  public void removeMetric(String key) {
-    rms.removeMetric(this.keyPrefix + key);
-  }
-
-  @Override
-  public void incCounters(String counterName, long delta) {
-    rms.incCounters(this.keyPrefix + counterName, delta);
-  }
-
-  @Override
-  public void updateHistogram(String name, long value) {
-    rms.updateHistogram(this.keyPrefix + name, value);
-  }
-
-  @Override
-  public String getMetricsContext() {
-    return rms.getMetricsContext();
-  }
-
-  @Override
-  public String getMetricsDescription() {
-    return rms.getMetricsDescription();
-  }
-
-  @Override
-  public String getMetricsJmxContext() {
-    return rms.getMetricsJmxContext();
-  }
-
-  @Override
-  public String getMetricsName() {
-    return rms.getMetricsName();
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 49ef392..830ebe1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -19,11 +19,8 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +47,8 @@ public class MetricsSource implements BaseSource {
   private String id;
 
   private final MetricsReplicationSourceSource singleSourceSource;
-  private final MetricsReplicationGlobalSourceSource globalSourceSource;
-  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
+  private final MetricsReplicationSourceSource globalSourceSource;
+  private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
 
   /**
    * Constructor used to register the metrics
@@ -74,8 +71,8 @@ public class MetricsSource implements BaseSource {
    * @param globalSourceSource Class to monitor global-scoped metrics
    */
   public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
-                       MetricsReplicationGlobalSourceSource globalSourceSource,
-                       Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
+                       MetricsReplicationSourceSource globalSourceSource,
+                       Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
     this.id = id;
     this.singleSourceSource = singleSourceSource;
     this.globalSourceSource = globalSourceSource;
@@ -96,29 +93,6 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
-   * Update the table level replication metrics per table
-   *
-   * @param walEntries List of pairs of WAL entry and it's size
-   */
-  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
-    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
-      Entry entry = walEntryWithSize.getFirst();
-      long entrySize = walEntryWithSize.getSecond();
-      String tableName = entry.getKey().getTableName().getNameAsString();
-      long writeTime = entry.getKey().getWriteTime();
-      long age = EnvironmentEdgeManager.currentTime() - writeTime;
-
-      // get the replication metrics source for table at the run time
-      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
-        .computeIfAbsent(tableName,
-          t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
-            .getTableSource(t));
-      tableSource.setLastShippedAge(age);
-      tableSource.incrShippedBytes(entrySize);
-    }
-  }
-
-  /**
    * Set the age of the last edit that was shipped group by table
    * @param timestamp write time of the edit
    * @param tableName String as group and tableName
@@ -127,7 +101,7 @@ public class MetricsSource implements BaseSource {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     this.getSingleSourceSourceByTable().computeIfAbsent(
         tableName, t -> CompatibilitySingletonFactory
-            .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
+            .getInstance(MetricsReplicationSourceFactory.class).getSource(t))
             .setLastShippedAge(age);
   }
 
@@ -145,7 +119,7 @@ public class MetricsSource implements BaseSource {
    * @param walGroup which group we are getting
    * @return age
    */
-  public long getAgeOfLastShippedOp(String walGroup) {
+  public long getAgeofLastShippedOp(String walGroup) {
     return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
   }
 
@@ -419,22 +393,7 @@ public class MetricsSource implements BaseSource {
   }
 
   @VisibleForTesting
-  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
+  public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
     return singleSourceSourceByTable;
   }
-
-  /**
-   * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
-   */
-  public void setWALReaderEditsBufferUsage(long usageInBytes) {
-    globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
-  }
-
-  /**
-   * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
-   * @return the amount of memory in bytes used in this RegionServer by edits pending replication.
-   */
-  public long getWALReaderEditsBufferUsage() {
-    return globalSourceSource.getWALReaderEditsBufferBytes();
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index bec9042..752cfb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
@@ -73,7 +72,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
-  private MetricsReplicationGlobalSourceSource globalMetricsSource;
 
   private PeerProcedureHandler peerProcedureHandler;
 
@@ -121,12 +119,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
     } catch (KeeperException ke) {
       throw new IOException("Could not read cluster id", ke);
     }
-    this.globalMetricsSource = CompatibilitySingletonFactory
-        .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
     this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
       this.server, fs, logDir, oldLogDir, clusterId,
-        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
-        globalMetricsSource);
+        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
     if (walProvider != null) {
       walProvider
         .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c5f72c9..4726949 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -329,7 +329,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       String walGroupId = walGroupShipper.getKey();
       ReplicationSourceShipper shipper = walGroupShipper.getValue();
       lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
-      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
+      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
       int queueSize = queues.get(walGroupId).size();
       replicationDelay =
           ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
@@ -706,9 +706,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
-    // Record the new buffer usage
-    this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    totalBufferUsed.addAndGet(-batchSize);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 2806110..a6fc313 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -155,9 +155,6 @@ public class ReplicationSourceManager implements ReplicationListener {
 
 
   private AtomicLong totalBufferUsed = new AtomicLong();
-  // Total buffer size on this RegionServer for holding batched edits to be shipped.
-  private final long totalBufferLimit;
-  private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
@@ -174,8 +171,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
       ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
-      WALFileLengthProvider walFileLengthProvider,
-      MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
+      WALFileLengthProvider walFileLengthProvider) throws IOException {
     // CopyOnWriteArrayList is thread-safe.
     // Generally, reading is more than modifying.
     this.sources = new ConcurrentHashMap<>();
@@ -209,9 +205,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.latestPaths = new HashSet<Path>();
     replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
-    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
-        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
-    this.globalMetrics = globalMetrics;
   }
 
   /**
@@ -869,14 +862,6 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
-   * Returns the maximum size in bytes of edits held in memory which are pending replication
-   * across all sources inside this RegionServer.
-   */
-  public long getTotalBufferLimit() {
-    return totalBufferLimit;
-  }
-
-  /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
    */
@@ -913,10 +898,6 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public String getStats() {
     StringBuilder stats = new StringBuilder();
-    // Print stats that apply across all Replication Sources
-    stats.append("Global stats: ");
-    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
-        .append(getTotalBufferLimit()).append("B\n");
     for (ReplicationSourceInterface source : this.sources.values()) {
       stats.append("Normal source for cluster " + source.getPeerId() + ": ");
       stats.append(source.getStats() + "\n");
@@ -942,8 +923,4 @@ public class ReplicationSourceManager implements ReplicationListener {
   int activeFailoverTaskCount() {
     return executor.getActiveCount();
   }
-
-  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
-    return this.globalMetrics;
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 78e5521..23f736f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
@@ -200,14 +201,17 @@ public class ReplicationSourceShipper extends Thread {
         // Clean up hfile references
         for (Entry entry : entries) {
           cleanUpHFileRefs(entry.getEdit());
-          LOG.trace("shipped entry {}: ", entry);
+
+          TableName tableName = entry.getKey().getTableName();
+          source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
+              tableName.getNameAsString());
         }
         // Log and clean up WAL logs
         updateLogPosition(entryBatch);
 
         //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
         //this sizeExcludeBulkLoad has to use same calculation that when calling
-        //acquireBufferQuota() in ReplicationSourceWALReader because they maintain
+        //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
         //same variable: totalBufferUsed
         source.postShipEdits(entries, sizeExcludeBulkLoad);
         // FIXME check relationship between wal group and overall
@@ -215,8 +219,6 @@ public class ReplicationSourceShipper extends Thread {
           entryBatch.getNbHFiles());
         source.getSourceMetrics().setAgeOfLastShippedOp(
           entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
-        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());
-
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicated {} entries or {} operations in {} ms",
               entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 328cb8f..12653f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -103,7 +104,8 @@ class ReplicationSourceWALReader extends Thread {
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
     this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
-    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
+    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
@@ -170,7 +172,7 @@ class ReplicationSourceWALReader extends Thread {
     }
     long entrySize = getEntrySizeIncludeBulkLoad(entry);
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-    batch.addEntry(entry, entrySize);
+    batch.addEntry(entry);
     updateBatchStats(batch, entry, entrySize);
     boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
 
@@ -270,8 +272,6 @@ class ReplicationSourceWALReader extends Thread {
   private boolean checkQuota() {
     // try not to go over total quota
     if (totalBufferUsed.get() > totalBufferQuota) {
-      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
-          this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -306,7 +306,9 @@ class ReplicationSourceWALReader extends Thread {
 
   private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
-    return  getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
+    WALKey key = entry.getKey();
+    return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
+        key.estimatedSerializedSizeOf();
   }
 
   public static long getEntrySizeExcludeBulkLoad(Entry entry) {
@@ -399,10 +401,7 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    long newBufferUsed = totalBufferUsed.addAndGet(size);
-    // Record the new buffer usage
-    this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
-    return newBufferUsed >= totalBufferQuota;
+    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 4f96c96..22b2de7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -21,9 +21,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -36,8 +34,7 @@ class WALEntryBatch {
   // used by recovered replication queue to indicate that all the entries have been read.
   public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
 
-  private List<Pair<Entry, Long>> walEntriesWithSize;
-
+  private List<Entry> walEntries;
   // last WAL that was read
   private Path lastWalPath;
   // position in WAL of last entry in this batch
@@ -57,7 +54,7 @@ class WALEntryBatch {
    * @param lastWalPath Path of the WAL the last entry in this batch was read from
    */
   WALEntryBatch(int maxNbEntries, Path lastWalPath) {
-    this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
+    this.walEntries = new ArrayList<>(maxNbEntries);
     this.lastWalPath = lastWalPath;
   }
 
@@ -69,22 +66,15 @@ class WALEntryBatch {
     return batch;
   }
 
-  public void addEntry(Entry entry, long entrySize) {
-    walEntriesWithSize.add(new Pair<>(entry, entrySize));
+  public void addEntry(Entry entry) {
+    walEntries.add(entry);
   }
 
   /**
    * @return the WAL Entries.
    */
   public List<Entry> getWalEntries() {
-    return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
-  }
-
-  /**
-   * @return the WAL Entries.
-   */
-  public List<Pair<Entry, Long>> getWalEntriesWithSize() {
-    return walEntriesWithSize;
+    return walEntries;
   }
 
   /**
@@ -106,7 +96,7 @@ class WALEntryBatch {
   }
 
   public int getNbEntries() {
-    return walEntriesWithSize.size();
+    return walEntries.size();
   }
 
   /**
@@ -170,7 +160,7 @@ class WALEntryBatch {
 
   @Override
   public String toString() {
-    return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath +
+    return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
       ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
       nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
       endOfFile + "]";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index cc7e20c..5d7366d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,8 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -47,21 +44,16 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
-import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
-import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.junit.AfterClass;
@@ -312,12 +304,11 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }
 
   @Test
-  public void testMetricsSourceBaseSourcePassThrough() {
+  public void testMetricsSourceBaseSourcePassthrough() {
     /*
-     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
-     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
-     * so that metrics get written to both namespaces. Both of those classes wrap a
-     * MetricsReplicationSourceImpl that implements BaseSource, which allows
+     * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
+     * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
+     * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
      * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
      * MetricsSource actually calls down through the two layers of wrapping to the actual
      * BaseSource.
@@ -331,15 +322,14 @@ public class TestReplicationEndpoint extends TestReplicationBase {
 
     MetricsReplicationSourceSource singleSourceSource =
       new MetricsReplicationSourceSourceImpl(singleRms, id);
-    MetricsReplicationGlobalSourceSource globalSourceSource =
-      new MetricsReplicationGlobalSourceSourceImpl(globalRms);
-    MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
+    MetricsReplicationSourceSource globalSourceSource =
+      new MetricsReplicationGlobalSourceSource(globalRms);
+    MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
     doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
 
-    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
-      new HashMap<>();
-    MetricsSource source = new MetricsSource(id, singleSourceSource,
-      spyglobalSourceSource, singleSourceSourceByTable);
+    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
+    MetricsSource source =
+      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
 
 
     String gaugeName = "gauge";
@@ -388,37 +378,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
         .containsKey("RandomNewTable");
     Assert.assertEquals(false, containsRandomNewTable);
-    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
+    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
     containsRandomNewTable = source.getSingleSourceSourceByTable()
         .containsKey("RandomNewTable");
     Assert.assertEquals(true, containsRandomNewTable);
-    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
+    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
         .get("RandomNewTable");
-
-    // age should be greater than zero we created the entry with time in the past
+    // cannot put more concreate value here to verify because the age is arbitrary.
+    // as long as it's greater than 0, we see it as correct answer.
     Assert.assertTrue(msr.getLastShippedAge() > 0);
-    Assert.assertTrue(msr.getShippedBytes() > 0);
-
-  }
-
-  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
-    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
-    byte[] a = new byte[] { 'a' };
-    Entry entry = createEntry(tableName, null, a);
-    walEntriesWithSize.add(new Pair<>(entry, 10L));
-    return walEntriesWithSize;
-  }
 
-  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
-    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
-        System.currentTimeMillis() - 1L,
-        scopes);
-    WALEdit edit1 = new WALEdit();
-
-    for (byte[] kv : kvs) {
-      edit1.add(new KeyValue(kv, kv, kv));
-    }
-    return new Entry(key1, edit1);
   }
 
   private void doPut(byte[] row) throws IOException {
@@ -494,44 +463,6 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
   }
 
-  /**
-   * Not used by unit tests, helpful for manual testing with replication.
-   * <p>
-   * Snippet for `hbase shell`:
-   * <pre>
-   * create 't', 'f'
-   * add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
-   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
-   * alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
-   * </pre>
-   */
-  public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
-    private long duration;
-    public SleepingReplicationEndpointForTest() {
-      super();
-    }
-
-    @Override
-    public void init(Context context) throws IOException {
-      super.init(context);
-      if (this.ctx != null) {
-        duration = this.ctx.getConfiguration().getLong(
-            "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
-      }
-    }
-
-    @Override
-    public boolean replicate(ReplicateContext context) {
-      try {
-        Thread.sleep(duration);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return false;
-      }
-      return super.replicate(context);
-    }
-  }
-
   public static class InterClusterReplicationEndpointForTest
       extends HBaseInterClusterReplicationEndpoint {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 067ab63..b4af38b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -336,8 +336,6 @@ public class TestWALEntryStream {
   private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
     ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
-        (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
@@ -345,9 +343,6 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
-    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
-        MetricsReplicationGlobalSourceSource.class);
-    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
     return source;
   }