You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2020/05/14 20:46:53 UTC

[hbase] branch branch-2 updated (2e4be3e -> a93d94c)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a change to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git.


    from 2e4be3e  HBASE-24368 Let HBCKSCP clear 'Unknown Servers', even if RegionStateNode has RegionLocation == null
     new 1ff5326  HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
     new a93d94c  Amend HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../MetricsReplicationSourceFactory.java           |   1 +
 ...ory.java => MetricsReplicationTableSource.java} |  12 +-
 .../MetricsReplicationSourceFactoryImpl.java       |   4 +
 .../MetricsReplicationTableSourceImpl.java         | 134 +++++++++++++++++++++
 .../replication/regionserver/MetricsSource.java    |  36 +++++-
 .../regionserver/ReplicationSource.java            |   2 +-
 .../regionserver/ReplicationSourceShipper.java     |   8 +-
 .../regionserver/ReplicationSourceWALReader.java   |   6 +-
 .../replication/regionserver/WALEntryBatch.java    |  24 ++--
 .../hbase/replication/TestReplicationEndpoint.java |  52 ++++++--
 10 files changed, 242 insertions(+), 37 deletions(-)
 copy hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/{MetricsReplicationSourceFactory.java => MetricsReplicationTableSource.java} (78%)
 create mode 100644 hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java


[hbase] 02/02: Amend HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)

Posted by ap...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit a93d94cad312d7c92b3a2eb43848058605d27dcb
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Thu May 14 13:41:56 2020 -0700

    Amend HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
    
    - Rename WALEntryBatch#getWaEntriesWithSize -> getWalEntriesWithSize
---
 .../hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java | 2 +-
 .../org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 65430b3..d7e7e01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -211,7 +211,7 @@ public class ReplicationSourceShipper extends Thread {
           entryBatch.getNbHFiles());
         source.getSourceMetrics().setAgeOfLastShippedOp(
           entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
-        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWaEntriesWithSize());
+        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());
 
         if (LOG.isTraceEnabled()) {
           LOG.debug("Replicated {} entries or {} operations in {} ms",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index bc600d0..4f96c96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -83,7 +83,7 @@ class WALEntryBatch {
   /**
    * @return the WAL Entries.
    */
-  public List<Pair<Entry, Long>> getWaEntriesWithSize() {
+  public List<Pair<Entry, Long>> getWalEntriesWithSize() {
     return walEntriesWithSize;
   }
 


[hbase] 01/02: HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)

Posted by ap...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 1ff532678dca7faf1c7404c6fe0e118ef0cd4872
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Thu May 14 10:34:51 2020 -0700

    HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../MetricsReplicationSourceFactory.java           |   1 +
 ...ory.java => MetricsReplicationTableSource.java} |  12 +-
 .../MetricsReplicationSourceFactoryImpl.java       |   4 +
 .../MetricsReplicationTableSourceImpl.java         | 134 +++++++++++++++++++++
 .../replication/regionserver/MetricsSource.java    |  36 +++++-
 .../regionserver/ReplicationSource.java            |   2 +-
 .../regionserver/ReplicationSourceShipper.java     |   8 +-
 .../regionserver/ReplicationSourceWALReader.java   |   6 +-
 .../replication/regionserver/WALEntryBatch.java    |  24 ++--
 .../hbase/replication/TestReplicationEndpoint.java |  52 ++++++--
 10 files changed, 242 insertions(+), 37 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 6534b11..2816f83 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -24,5 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface MetricsReplicationSourceFactory {
   public MetricsReplicationSinkSource getSink();
   public MetricsReplicationSourceSource getSource(String id);
+  public MetricsReplicationTableSource getTableSource(String tableName);
   public MetricsReplicationSourceSource getGlobalSource();
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
similarity index 78%
copy from hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
copy to hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
index 6534b11..faa944a 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public interface MetricsReplicationSourceFactory {
-  public MetricsReplicationSinkSource getSink();
-  public MetricsReplicationSourceSource getSource(String id);
-  public MetricsReplicationSourceSource getGlobalSource();
+public interface MetricsReplicationTableSource extends BaseSource {
+
+  void setLastShippedAge(long age);
+  void incrShippedBytes(long size);
+  long getShippedBytes();
+  void clear();
+  long getLastShippedAge();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index af310f0..a3b3462 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -35,6 +35,10 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
     return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
   }
 
+  @Override public MetricsReplicationTableSource getTableSource(String tableName) {
+    return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
+  }
+
   @Override public MetricsReplicationSourceSource getGlobalSource() {
     return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
   }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
new file mode 100644
index 0000000..7120a73
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This is the metric source for table level replication metrics.
+ * We can easy monitor some useful table level replication metrics such as
+ * ageOfLastShippedOp and shippedBytes
+ */
+@InterfaceAudience.Private
+public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource {
+
+  private final MetricsReplicationSourceImpl rms;
+  private final String tableName;
+  private final String ageOfLastShippedOpKey;
+  private String keyPrefix;
+
+  private final String shippedBytesKey;
+
+  private final MutableHistogram ageOfLastShippedOpHist;
+  private final MutableFastCounter shippedBytesCounter;
+
+  public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) {
+    this.rms = rms;
+    this.tableName = tableName;
+    this.keyPrefix = "source." + this.tableName + ".";
+
+    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
+    ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey);
+
+    shippedBytesKey = this.keyPrefix + "shippedBytes";
+    shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
+  }
+
+  @Override
+  public void setLastShippedAge(long age) {
+    ageOfLastShippedOpHist.add(age);
+  }
+
+  @Override
+  public void incrShippedBytes(long size) {
+    shippedBytesCounter.incr(size);
+  }
+
+  @Override
+  public void clear() {
+    rms.removeMetric(ageOfLastShippedOpKey);
+    rms.removeMetric(shippedBytesKey);
+  }
+
+  @Override
+  public long getLastShippedAge() {
+    return ageOfLastShippedOpHist.getMax();
+  }
+
+  @Override
+  public long getShippedBytes() {
+    return shippedBytesCounter.value();
+  }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(this.keyPrefix + gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(this.keyPrefix + key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(this.keyPrefix + counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 92ab070..e7a8f36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -19,8 +19,11 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +52,7 @@ public class MetricsSource implements BaseSource {
 
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
-  private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
+  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
 
   /**
    * Constructor used to register the metrics
@@ -73,7 +76,7 @@ public class MetricsSource implements BaseSource {
    */
   public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
                        MetricsReplicationSourceSource globalSourceSource,
-                       Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
+                       Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
     this.id = id;
     this.singleSourceSource = singleSourceSource;
     this.globalSourceSource = globalSourceSource;
@@ -94,6 +97,29 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * Update the table level replication metrics per table
+   *
+   * @param walEntries List of pairs of WAL entry and it's size
+   */
+  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
+    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
+      Entry entry = walEntryWithSize.getFirst();
+      long entrySize = walEntryWithSize.getSecond();
+      String tableName = entry.getKey().getTableName().getNameAsString();
+      long writeTime = entry.getKey().getWriteTime();
+      long age = EnvironmentEdgeManager.currentTime() - writeTime;
+
+      // get the replication metrics source for table at the run time
+      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
+        .computeIfAbsent(tableName,
+          t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+            .getTableSource(t));
+      tableSource.setLastShippedAge(age);
+      tableSource.incrShippedBytes(entrySize);
+    }
+  }
+
+  /**
    * Set the age of the last edit that was shipped group by table
    * @param timestamp write time of the edit
    * @param tableName String as group and tableName
@@ -102,7 +128,7 @@ public class MetricsSource implements BaseSource {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     this.getSingleSourceSourceByTable().computeIfAbsent(
         tableName, t -> CompatibilitySingletonFactory
-            .getInstance(MetricsReplicationSourceFactory.class).getSource(t))
+            .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
             .setLastShippedAge(age);
   }
 
@@ -111,7 +137,7 @@ public class MetricsSource implements BaseSource {
    * @param walGroup which group we are getting
    * @return age
    */
-  public long getAgeofLastShippedOp(String walGroup) {
+  public long getAgeOfLastShippedOp(String walGroup) {
     return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
   }
 
@@ -436,7 +462,7 @@ public class MetricsSource implements BaseSource {
   }
 
   @VisibleForTesting
-  public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
+  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
     return singleSourceSourceByTable;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 155f08c..bf6ab7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -340,7 +340,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
       String walGroupId = walGroupShipper.getKey();
       ReplicationSourceShipper shipper = walGroupShipper.getValue();
-      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
       int queueSize = queues.get(walGroupId).size();
       replicationDelay = metrics.getReplicationDelay();
       Path currentPath = shipper.getCurrentPath();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 9874c46..65430b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -198,16 +197,13 @@ public class ReplicationSourceShipper extends Thread {
         for (Entry entry : entries) {
           cleanUpHFileRefs(entry.getEdit());
           LOG.trace("shipped entry {}: ", entry);
-          TableName tableName = entry.getKey().getTableName();
-          source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
-              tableName.getNameAsString());
         }
         // Log and clean up WAL logs
         updateLogPosition(entryBatch);
 
         //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
         //this sizeExcludeBulkLoad has to use same calculation that when calling
-        //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
+        //acquireBufferQuota() in ReplicationSourceWALReader because they maintain
         //same variable: totalBufferUsed
         source.postShipEdits(entries, sizeExcludeBulkLoad);
         // FIXME check relationship between wal group and overall
@@ -215,6 +211,8 @@ public class ReplicationSourceShipper extends Thread {
           entryBatch.getNbHFiles());
         source.getSourceMetrics().setAgeOfLastShippedOp(
           entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
+        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWaEntriesWithSize());
+
         if (LOG.isTraceEnabled()) {
           LOG.debug("Replicated {} entries or {} operations in {} ms",
               entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 82cfb81..70f02fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -175,7 +175,7 @@ class ReplicationSourceWALReader extends Thread {
         entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
     long entrySize = getEntrySizeIncludeBulkLoad(entry);
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-    batch.addEntry(entry);
+    batch.addEntry(entry, entrySize);
     updateBatchStats(batch, entry, entrySize);
     boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
 
@@ -310,9 +310,7 @@ class ReplicationSourceWALReader extends Thread {
 
   private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
-    WALKey key = entry.getKey();
-    return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
-        key.estimatedSerializedSizeOf();
+    return  getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
   }
 
   public static long getEntrySizeExcludeBulkLoad(Entry entry) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 22b2de7..bc600d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -21,7 +21,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -34,7 +36,8 @@ class WALEntryBatch {
   // used by recovered replication queue to indicate that all the entries have been read.
   public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
 
-  private List<Entry> walEntries;
+  private List<Pair<Entry, Long>> walEntriesWithSize;
+
   // last WAL that was read
   private Path lastWalPath;
   // position in WAL of last entry in this batch
@@ -54,7 +57,7 @@ class WALEntryBatch {
    * @param lastWalPath Path of the WAL the last entry in this batch was read from
    */
   WALEntryBatch(int maxNbEntries, Path lastWalPath) {
-    this.walEntries = new ArrayList<>(maxNbEntries);
+    this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
     this.lastWalPath = lastWalPath;
   }
 
@@ -66,15 +69,22 @@ class WALEntryBatch {
     return batch;
   }
 
-  public void addEntry(Entry entry) {
-    walEntries.add(entry);
+  public void addEntry(Entry entry, long entrySize) {
+    walEntriesWithSize.add(new Pair<>(entry, entrySize));
   }
 
   /**
    * @return the WAL Entries.
    */
   public List<Entry> getWalEntries() {
-    return walEntries;
+    return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
+  }
+
+  /**
+   * @return the WAL Entries.
+   */
+  public List<Pair<Entry, Long>> getWaEntriesWithSize() {
+    return walEntriesWithSize;
   }
 
   /**
@@ -96,7 +106,7 @@ class WALEntryBatch {
   }
 
   public int getNbEntries() {
-    return walEntries.size();
+    return walEntriesWithSize.size();
   }
 
   /**
@@ -160,7 +170,7 @@ class WALEntryBatch {
 
   @Override
   public String toString() {
-    return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
+    return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath +
       ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
       nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
       endOfFile + "]";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 6f6fb59..642139c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -46,13 +49,17 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobal
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.junit.AfterClass;
@@ -313,11 +320,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }
 
   @Test
-  public void testMetricsSourceBaseSourcePassthrough() {
+  public void testMetricsSourceBaseSourcePassThrough() {
     /*
-     * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
-     * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
-     * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
+     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
+     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
+     * so that metrics get written to both namespaces. Both of those classes wrap a
+     * MetricsReplicationSourceImpl that implements BaseSource, which allows
      * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
      * MetricsSource actually calls down through the two layers of wrapping to the actual
      * BaseSource.
@@ -336,9 +344,10 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
     doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
 
-    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
-    MetricsSource source =
-      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
+    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
+      new HashMap<>();
+    MetricsSource source = new MetricsSource(id, singleSourceSource,
+      spyglobalSourceSource, singleSourceSourceByTable);
 
 
     String gaugeName = "gauge";
@@ -387,16 +396,37 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
         .containsKey("RandomNewTable");
     Assert.assertEquals(false, containsRandomNewTable);
-    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
+    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
     containsRandomNewTable = source.getSingleSourceSourceByTable()
         .containsKey("RandomNewTable");
     Assert.assertEquals(true, containsRandomNewTable);
-    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
+    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
         .get("RandomNewTable");
-    // cannot put more concreate value here to verify because the age is arbitrary.
-    // as long as it's greater than 0, we see it as correct answer.
+
+    // age should be greater than zero we created the entry with time in the past
     Assert.assertTrue(msr.getLastShippedAge() > 0);
+    Assert.assertTrue(msr.getShippedBytes() > 0);
+
+  }
 
+  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
+    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
+    byte[] a = new byte[] { 'a' };
+    Entry entry = createEntry(tableName, null, a);
+    walEntriesWithSize.add(new Pair<>(entry, 10L));
+    return walEntriesWithSize;
+  }
+
+  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
+    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
+        System.currentTimeMillis() - 1L,
+        scopes);
+    WALEdit edit1 = new WALEdit();
+
+    for (byte[] kv : kvs) {
+      edit1.add(new KeyValue(kv, kv, kv));
+    }
+    return new Entry(key1, edit1);
   }
 
   private void doPut(byte[] row) throws IOException {