You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2020/05/14 20:46:54 UTC
[hbase] 01/02: HBASE-24350: Extending and Fixing HBaseTable level
replication metrics (#1704)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1ff532678dca7faf1c7404c6fe0e118ef0cd4872
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Thu May 14 10:34:51 2020 -0700
HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../MetricsReplicationSourceFactory.java | 1 +
...ory.java => MetricsReplicationTableSource.java} | 12 +-
.../MetricsReplicationSourceFactoryImpl.java | 4 +
.../MetricsReplicationTableSourceImpl.java | 134 +++++++++++++++++++++
.../replication/regionserver/MetricsSource.java | 36 +++++-
.../regionserver/ReplicationSource.java | 2 +-
.../regionserver/ReplicationSourceShipper.java | 8 +-
.../regionserver/ReplicationSourceWALReader.java | 6 +-
.../replication/regionserver/WALEntryBatch.java | 24 ++--
.../hbase/replication/TestReplicationEndpoint.java | 52 ++++++--
10 files changed, 242 insertions(+), 37 deletions(-)
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 6534b11..2816f83 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -24,5 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface MetricsReplicationSourceFactory {
public MetricsReplicationSinkSource getSink();
public MetricsReplicationSourceSource getSource(String id);
+ public MetricsReplicationTableSource getTableSource(String tableName);
public MetricsReplicationSourceSource getGlobalSource();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
similarity index 78%
copy from hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
copy to hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
index 6534b11..faa944a 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
@@ -18,11 +18,15 @@
package org.apache.hadoop.hbase.replication.regionserver;
+import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
-public interface MetricsReplicationSourceFactory {
- public MetricsReplicationSinkSource getSink();
- public MetricsReplicationSourceSource getSource(String id);
- public MetricsReplicationSourceSource getGlobalSource();
+public interface MetricsReplicationTableSource extends BaseSource {
+
+ void setLastShippedAge(long age);
+ void incrShippedBytes(long size);
+ long getShippedBytes();
+ void clear();
+ long getLastShippedAge();
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index af310f0..a3b3462 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -35,6 +35,10 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
}
+ @Override public MetricsReplicationTableSource getTableSource(String tableName) {
+ return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
+ }
+
@Override public MetricsReplicationSourceSource getGlobalSource() {
return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
new file mode 100644
index 0000000..7120a73
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This is the metric source for table level replication metrics.
+ * We can easy monitor some useful table level replication metrics such as
+ * ageOfLastShippedOp and shippedBytes
+ */
+@InterfaceAudience.Private
+public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource {
+
+ private final MetricsReplicationSourceImpl rms;
+ private final String tableName;
+ private final String ageOfLastShippedOpKey;
+ private String keyPrefix;
+
+ private final String shippedBytesKey;
+
+ private final MutableHistogram ageOfLastShippedOpHist;
+ private final MutableFastCounter shippedBytesCounter;
+
+ public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) {
+ this.rms = rms;
+ this.tableName = tableName;
+ this.keyPrefix = "source." + this.tableName + ".";
+
+ ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
+ ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey);
+
+ shippedBytesKey = this.keyPrefix + "shippedBytes";
+ shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
+ }
+
+ @Override
+ public void setLastShippedAge(long age) {
+ ageOfLastShippedOpHist.add(age);
+ }
+
+ @Override
+ public void incrShippedBytes(long size) {
+ shippedBytesCounter.incr(size);
+ }
+
+ @Override
+ public void clear() {
+ rms.removeMetric(ageOfLastShippedOpKey);
+ rms.removeMetric(shippedBytesKey);
+ }
+
+ @Override
+ public long getLastShippedAge() {
+ return ageOfLastShippedOpHist.getMax();
+ }
+
+ @Override
+ public long getShippedBytes() {
+ return shippedBytesCounter.value();
+ }
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(this.keyPrefix + gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(this.keyPrefix + key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(this.keyPrefix + counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(this.keyPrefix + name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 92ab070..e7a8f36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -19,8 +19,11 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +52,7 @@ public class MetricsSource implements BaseSource {
private final MetricsReplicationSourceSource singleSourceSource;
private final MetricsReplicationSourceSource globalSourceSource;
- private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
+ private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
/**
* Constructor used to register the metrics
@@ -73,7 +76,7 @@ public class MetricsSource implements BaseSource {
*/
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
MetricsReplicationSourceSource globalSourceSource,
- Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
+ Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
this.id = id;
this.singleSourceSource = singleSourceSource;
this.globalSourceSource = globalSourceSource;
@@ -94,6 +97,29 @@ public class MetricsSource implements BaseSource {
}
/**
+ * Update the table level replication metrics per table
+ *
+ * @param walEntries List of pairs of WAL entry and it's size
+ */
+ public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
+ for (Pair<Entry, Long> walEntryWithSize : walEntries) {
+ Entry entry = walEntryWithSize.getFirst();
+ long entrySize = walEntryWithSize.getSecond();
+ String tableName = entry.getKey().getTableName().getNameAsString();
+ long writeTime = entry.getKey().getWriteTime();
+ long age = EnvironmentEdgeManager.currentTime() - writeTime;
+
+ // get the replication metrics source for table at the run time
+ MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
+ .computeIfAbsent(tableName,
+ t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getTableSource(t));
+ tableSource.setLastShippedAge(age);
+ tableSource.incrShippedBytes(entrySize);
+ }
+ }
+
+ /**
* Set the age of the last edit that was shipped group by table
* @param timestamp write time of the edit
* @param tableName String as group and tableName
@@ -102,7 +128,7 @@ public class MetricsSource implements BaseSource {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
this.getSingleSourceSourceByTable().computeIfAbsent(
tableName, t -> CompatibilitySingletonFactory
- .getInstance(MetricsReplicationSourceFactory.class).getSource(t))
+ .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
.setLastShippedAge(age);
}
@@ -111,7 +137,7 @@ public class MetricsSource implements BaseSource {
* @param walGroup which group we are getting
* @return age
*/
- public long getAgeofLastShippedOp(String walGroup) {
+ public long getAgeOfLastShippedOp(String walGroup) {
return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
}
@@ -436,7 +462,7 @@ public class MetricsSource implements BaseSource {
}
@VisibleForTesting
- public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
+ public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
return singleSourceSourceByTable;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 155f08c..bf6ab7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -340,7 +340,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
String walGroupId = walGroupShipper.getKey();
ReplicationSourceShipper shipper = walGroupShipper.getValue();
- ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+ ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
int queueSize = queues.get(walGroupId).size();
replicationDelay = metrics.getReplicationDelay();
Path currentPath = shipper.getCurrentPath();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 9874c46..65430b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -198,16 +197,13 @@ public class ReplicationSourceShipper extends Thread {
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);
- TableName tableName = entry.getKey().getTableName();
- source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
- tableName.getNameAsString());
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);
//offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
//this sizeExcludeBulkLoad has to use same calculation that when calling
- //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
+ //acquireBufferQuota() in ReplicationSourceWALReader because they maintain
//same variable: totalBufferUsed
source.postShipEdits(entries, sizeExcludeBulkLoad);
// FIXME check relationship between wal group and overall
@@ -215,6 +211,8 @@ public class ReplicationSourceShipper extends Thread {
entryBatch.getNbHFiles());
source.getSourceMetrics().setAgeOfLastShippedOp(
entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
+ source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWaEntriesWithSize());
+
if (LOG.isTraceEnabled()) {
LOG.debug("Replicated {} entries or {} operations in {} ms",
entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 82cfb81..70f02fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -175,7 +175,7 @@ class ReplicationSourceWALReader extends Thread {
entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
- batch.addEntry(entry);
+ batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
@@ -310,9 +310,7 @@ class ReplicationSourceWALReader extends Thread {
private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
- WALKey key = entry.getKey();
- return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
- key.estimatedSerializedSizeOf();
+ return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
}
public static long getEntrySizeExcludeBulkLoad(Entry entry) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 22b2de7..bc600d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -21,7 +21,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -34,7 +36,8 @@ class WALEntryBatch {
// used by recovered replication queue to indicate that all the entries have been read.
public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
- private List<Entry> walEntries;
+ private List<Pair<Entry, Long>> walEntriesWithSize;
+
// last WAL that was read
private Path lastWalPath;
// position in WAL of last entry in this batch
@@ -54,7 +57,7 @@ class WALEntryBatch {
* @param lastWalPath Path of the WAL the last entry in this batch was read from
*/
WALEntryBatch(int maxNbEntries, Path lastWalPath) {
- this.walEntries = new ArrayList<>(maxNbEntries);
+ this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
this.lastWalPath = lastWalPath;
}
@@ -66,15 +69,22 @@ class WALEntryBatch {
return batch;
}
- public void addEntry(Entry entry) {
- walEntries.add(entry);
+ public void addEntry(Entry entry, long entrySize) {
+ walEntriesWithSize.add(new Pair<>(entry, entrySize));
}
/**
* @return the WAL Entries.
*/
public List<Entry> getWalEntries() {
- return walEntries;
+ return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
+ }
+
+ /**
+ * @return the WAL Entries.
+ */
+ public List<Pair<Entry, Long>> getWaEntriesWithSize() {
+ return walEntriesWithSize;
}
/**
@@ -96,7 +106,7 @@ class WALEntryBatch {
}
public int getNbEntries() {
- return walEntries.size();
+ return walEntriesWithSize.size();
}
/**
@@ -160,7 +170,7 @@ class WALEntryBatch {
@Override
public String toString() {
- return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
+ return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath +
", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
endOfFile + "]";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 6f6fb59..642139c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -46,13 +49,17 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobal
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
@@ -313,11 +320,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
@Test
- public void testMetricsSourceBaseSourcePassthrough() {
+ public void testMetricsSourceBaseSourcePassThrough() {
/*
- * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
- * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
- * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
+ * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
+ * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
+ * so that metrics get written to both namespaces. Both of those classes wrap a
+ * MetricsReplicationSourceImpl that implements BaseSource, which allows
* for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
* MetricsSource actually calls down through the two layers of wrapping to the actual
* BaseSource.
@@ -336,9 +344,10 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
- Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
- MetricsSource source =
- new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
+ Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
+ new HashMap<>();
+ MetricsSource source = new MetricsSource(id, singleSourceSource,
+ spyglobalSourceSource, singleSourceSourceByTable);
String gaugeName = "gauge";
@@ -387,16 +396,37 @@ public class TestReplicationEndpoint extends TestReplicationBase {
boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
.containsKey("RandomNewTable");
Assert.assertEquals(false, containsRandomNewTable);
- source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
+ source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
containsRandomNewTable = source.getSingleSourceSourceByTable()
.containsKey("RandomNewTable");
Assert.assertEquals(true, containsRandomNewTable);
- MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
+ MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
.get("RandomNewTable");
- // cannot put more concreate value here to verify because the age is arbitrary.
- // as long as it's greater than 0, we see it as correct answer.
+
+ // age should be greater than zero we created the entry with time in the past
Assert.assertTrue(msr.getLastShippedAge() > 0);
+ Assert.assertTrue(msr.getShippedBytes() > 0);
+
+ }
+ private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
+ List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
+ byte[] a = new byte[] { 'a' };
+ Entry entry = createEntry(tableName, null, a);
+ walEntriesWithSize.add(new Pair<>(entry, 10L));
+ return walEntriesWithSize;
+ }
+
+ private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
+ WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
+ System.currentTimeMillis() - 1L,
+ scopes);
+ WALEdit edit1 = new WALEdit();
+
+ for (byte[] kv : kvs) {
+ edit1.add(new KeyValue(kv, kv, kv));
+ }
+ return new Entry(key1, edit1);
}
private void doPut(byte[] row) throws IOException {