You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2020/05/14 20:46:54 UTC

[hbase] 01/02: HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 1ff532678dca7faf1c7404c6fe0e118ef0cd4872
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Thu May 14 10:34:51 2020 -0700

    HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../MetricsReplicationSourceFactory.java           |   1 +
 ...ory.java => MetricsReplicationTableSource.java} |  12 +-
 .../MetricsReplicationSourceFactoryImpl.java       |   4 +
 .../MetricsReplicationTableSourceImpl.java         | 134 +++++++++++++++++++++
 .../replication/regionserver/MetricsSource.java    |  36 +++++-
 .../regionserver/ReplicationSource.java            |   2 +-
 .../regionserver/ReplicationSourceShipper.java     |   8 +-
 .../regionserver/ReplicationSourceWALReader.java   |   6 +-
 .../replication/regionserver/WALEntryBatch.java    |  24 ++--
 .../hbase/replication/TestReplicationEndpoint.java |  52 ++++++--
 10 files changed, 242 insertions(+), 37 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 6534b11..2816f83 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -24,5 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface MetricsReplicationSourceFactory {
   public MetricsReplicationSinkSource getSink();
   public MetricsReplicationSourceSource getSource(String id);
+  public MetricsReplicationTableSource getTableSource(String tableName);
   public MetricsReplicationSourceSource getGlobalSource();
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
similarity index 78%
copy from hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
copy to hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
index 6534b11..faa944a 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public interface MetricsReplicationSourceFactory {
-  public MetricsReplicationSinkSource getSink();
-  public MetricsReplicationSourceSource getSource(String id);
-  public MetricsReplicationSourceSource getGlobalSource();
+public interface MetricsReplicationTableSource extends BaseSource {
+
+  void setLastShippedAge(long age);
+  void incrShippedBytes(long size);
+  long getShippedBytes();
+  void clear();
+  long getLastShippedAge();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index af310f0..a3b3462 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -35,6 +35,10 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
     return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
   }
 
+  @Override public MetricsReplicationTableSource getTableSource(String tableName) {
+    return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
+  }
+
   @Override public MetricsReplicationSourceSource getGlobalSource() {
     return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
   }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
new file mode 100644
index 0000000..7120a73
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This is the metric source for table level replication metrics.
+ * We can easy monitor some useful table level replication metrics such as
+ * ageOfLastShippedOp and shippedBytes
+ */
+@InterfaceAudience.Private
+public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource {
+
+  private final MetricsReplicationSourceImpl rms;
+  private final String tableName;
+  private final String ageOfLastShippedOpKey;
+  private String keyPrefix;
+
+  private final String shippedBytesKey;
+
+  private final MutableHistogram ageOfLastShippedOpHist;
+  private final MutableFastCounter shippedBytesCounter;
+
+  public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) {
+    this.rms = rms;
+    this.tableName = tableName;
+    this.keyPrefix = "source." + this.tableName + ".";
+
+    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
+    ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey);
+
+    shippedBytesKey = this.keyPrefix + "shippedBytes";
+    shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
+  }
+
+  @Override
+  public void setLastShippedAge(long age) {
+    ageOfLastShippedOpHist.add(age);
+  }
+
+  @Override
+  public void incrShippedBytes(long size) {
+    shippedBytesCounter.incr(size);
+  }
+
+  @Override
+  public void clear() {
+    rms.removeMetric(ageOfLastShippedOpKey);
+    rms.removeMetric(shippedBytesKey);
+  }
+
+  @Override
+  public long getLastShippedAge() {
+    return ageOfLastShippedOpHist.getMax();
+  }
+
+  @Override
+  public long getShippedBytes() {
+    return shippedBytesCounter.value();
+  }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(this.keyPrefix + gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(this.keyPrefix + key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(this.keyPrefix + counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 92ab070..e7a8f36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -19,8 +19,11 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +52,7 @@ public class MetricsSource implements BaseSource {
 
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
-  private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
+  private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
 
   /**
    * Constructor used to register the metrics
@@ -73,7 +76,7 @@ public class MetricsSource implements BaseSource {
    */
   public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
                        MetricsReplicationSourceSource globalSourceSource,
-                       Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
+                       Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
     this.id = id;
     this.singleSourceSource = singleSourceSource;
     this.globalSourceSource = globalSourceSource;
@@ -94,6 +97,29 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * Update the table level replication metrics per table
+   *
+   * @param walEntries List of pairs of WAL entry and it's size
+   */
+  public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
+    for (Pair<Entry, Long> walEntryWithSize : walEntries) {
+      Entry entry = walEntryWithSize.getFirst();
+      long entrySize = walEntryWithSize.getSecond();
+      String tableName = entry.getKey().getTableName().getNameAsString();
+      long writeTime = entry.getKey().getWriteTime();
+      long age = EnvironmentEdgeManager.currentTime() - writeTime;
+
+      // get the replication metrics source for table at the run time
+      MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
+        .computeIfAbsent(tableName,
+          t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+            .getTableSource(t));
+      tableSource.setLastShippedAge(age);
+      tableSource.incrShippedBytes(entrySize);
+    }
+  }
+
+  /**
    * Set the age of the last edit that was shipped group by table
    * @param timestamp write time of the edit
    * @param tableName String as group and tableName
@@ -102,7 +128,7 @@ public class MetricsSource implements BaseSource {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     this.getSingleSourceSourceByTable().computeIfAbsent(
         tableName, t -> CompatibilitySingletonFactory
-            .getInstance(MetricsReplicationSourceFactory.class).getSource(t))
+            .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
             .setLastShippedAge(age);
   }
 
@@ -111,7 +137,7 @@ public class MetricsSource implements BaseSource {
    * @param walGroup which group we are getting
    * @return age
    */
-  public long getAgeofLastShippedOp(String walGroup) {
+  public long getAgeOfLastShippedOp(String walGroup) {
     return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
   }
 
@@ -436,7 +462,7 @@ public class MetricsSource implements BaseSource {
   }
 
   @VisibleForTesting
-  public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
+  public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
     return singleSourceSourceByTable;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 155f08c..bf6ab7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -340,7 +340,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
       String walGroupId = walGroupShipper.getKey();
       ReplicationSourceShipper shipper = walGroupShipper.getValue();
-      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
       int queueSize = queues.get(walGroupId).size();
       replicationDelay = metrics.getReplicationDelay();
       Path currentPath = shipper.getCurrentPath();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 9874c46..65430b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -198,16 +197,13 @@ public class ReplicationSourceShipper extends Thread {
         for (Entry entry : entries) {
           cleanUpHFileRefs(entry.getEdit());
           LOG.trace("shipped entry {}: ", entry);
-          TableName tableName = entry.getKey().getTableName();
-          source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
-              tableName.getNameAsString());
         }
         // Log and clean up WAL logs
         updateLogPosition(entryBatch);
 
         //offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
         //this sizeExcludeBulkLoad has to use same calculation that when calling
-        //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
+        //acquireBufferQuota() in ReplicationSourceWALReader because they maintain
         //same variable: totalBufferUsed
         source.postShipEdits(entries, sizeExcludeBulkLoad);
         // FIXME check relationship between wal group and overall
@@ -215,6 +211,8 @@ public class ReplicationSourceShipper extends Thread {
           entryBatch.getNbHFiles());
         source.getSourceMetrics().setAgeOfLastShippedOp(
           entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
+        source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWaEntriesWithSize());
+
         if (LOG.isTraceEnabled()) {
           LOG.debug("Replicated {} entries or {} operations in {} ms",
               entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 82cfb81..70f02fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -175,7 +175,7 @@ class ReplicationSourceWALReader extends Thread {
         entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
     long entrySize = getEntrySizeIncludeBulkLoad(entry);
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-    batch.addEntry(entry);
+    batch.addEntry(entry, entrySize);
     updateBatchStats(batch, entry, entrySize);
     boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
 
@@ -310,9 +310,7 @@ class ReplicationSourceWALReader extends Thread {
 
   private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
-    WALKey key = entry.getKey();
-    return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
-        key.estimatedSerializedSizeOf();
+    return  getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
   }
 
   public static long getEntrySizeExcludeBulkLoad(Entry entry) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 22b2de7..bc600d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -21,7 +21,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -34,7 +36,8 @@ class WALEntryBatch {
   // used by recovered replication queue to indicate that all the entries have been read.
   public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
 
-  private List<Entry> walEntries;
+  private List<Pair<Entry, Long>> walEntriesWithSize;
+
   // last WAL that was read
   private Path lastWalPath;
   // position in WAL of last entry in this batch
@@ -54,7 +57,7 @@ class WALEntryBatch {
    * @param lastWalPath Path of the WAL the last entry in this batch was read from
    */
   WALEntryBatch(int maxNbEntries, Path lastWalPath) {
-    this.walEntries = new ArrayList<>(maxNbEntries);
+    this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
     this.lastWalPath = lastWalPath;
   }
 
@@ -66,15 +69,22 @@ class WALEntryBatch {
     return batch;
   }
 
-  public void addEntry(Entry entry) {
-    walEntries.add(entry);
+  public void addEntry(Entry entry, long entrySize) {
+    walEntriesWithSize.add(new Pair<>(entry, entrySize));
   }
 
   /**
    * @return the WAL Entries.
    */
   public List<Entry> getWalEntries() {
-    return walEntries;
+    return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
+  }
+
+  /**
+   * @return the WAL Entries.
+   */
+  public List<Pair<Entry, Long>> getWaEntriesWithSize() {
+    return walEntriesWithSize;
   }
 
   /**
@@ -96,7 +106,7 @@ class WALEntryBatch {
   }
 
   public int getNbEntries() {
-    return walEntries.size();
+    return walEntriesWithSize.size();
   }
 
   /**
@@ -160,7 +170,7 @@ class WALEntryBatch {
 
   @Override
   public String toString() {
-    return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
+    return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath +
       ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
       nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
       endOfFile + "]";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 6f6fb59..642139c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -46,13 +49,17 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobal
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.junit.AfterClass;
@@ -313,11 +320,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }
 
   @Test
-  public void testMetricsSourceBaseSourcePassthrough() {
+  public void testMetricsSourceBaseSourcePassThrough() {
     /*
-     * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
-     * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
-     * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
+     * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
+     * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
+     * so that metrics get written to both namespaces. Both of those classes wrap a
+     * MetricsReplicationSourceImpl that implements BaseSource, which allows
      * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
      * MetricsSource actually calls down through the two layers of wrapping to the actual
      * BaseSource.
@@ -336,9 +344,10 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
     doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
 
-    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
-    MetricsSource source =
-      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
+    Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
+      new HashMap<>();
+    MetricsSource source = new MetricsSource(id, singleSourceSource,
+      spyglobalSourceSource, singleSourceSourceByTable);
 
 
     String gaugeName = "gauge";
@@ -387,16 +396,37 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
         .containsKey("RandomNewTable");
     Assert.assertEquals(false, containsRandomNewTable);
-    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
+    source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
     containsRandomNewTable = source.getSingleSourceSourceByTable()
         .containsKey("RandomNewTable");
     Assert.assertEquals(true, containsRandomNewTable);
-    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
+    MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
         .get("RandomNewTable");
-    // cannot put more concreate value here to verify because the age is arbitrary.
-    // as long as it's greater than 0, we see it as correct answer.
+
+    // age should be greater than zero we created the entry with time in the past
     Assert.assertTrue(msr.getLastShippedAge() > 0);
+    Assert.assertTrue(msr.getShippedBytes() > 0);
+
+  }
 
+  private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
+    List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
+    byte[] a = new byte[] { 'a' };
+    Entry entry = createEntry(tableName, null, a);
+    walEntriesWithSize.add(new Pair<>(entry, 10L));
+    return walEntriesWithSize;
+  }
+
+  private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
+    WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
+        System.currentTimeMillis() - 1L,
+        scopes);
+    WALEdit edit1 = new WALEdit();
+
+    for (byte[] kv : kvs) {
+      edit1.add(new KeyValue(kv, kv, kv));
+    }
+    return new Entry(key1, edit1);
   }
 
   private void doPut(byte[] row) throws IOException {