You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by we...@apache.org on 2020/09/11 17:58:32 UTC
[hbase] branch branch-2.2 updated: HBASE-25003 Backport HBASE-24350
and HBASE-24779 to branch-2.2 (#2372)
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 7cea318 HBASE-25003 Backport HBASE-24350 and HBASE-24779 to branch-2.2 (#2372)
7cea318 is described below
commit 7cea31869ee836f051fa2298b053676babeb78dc
Author: tamasadami <42...@users.noreply.github.com>
AuthorDate: Fri Sep 11 19:58:12 2020 +0200
HBASE-25003 Backport HBASE-24350 and HBASE-24779 to branch-2.2 (#2372)
Includes the following
* HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
* Amend HBASE-24350: Extending and Fixing HBaseTable level replication metrics (#1704)
* Rename WALEntryBatch#getWaEntriesWithSize -> getWalEntriesWithSize
* HBASE-24779 Report on the WAL edit buffer usage/limit for replication
* HBASE-24779 Report on the WAL edit buffer usage/limit for replication - addendum
Co-authored-by: Sandeep Pal <50...@users.noreply.github.com>
Co-authored-by: Andrew Purtell <ap...@apache.org>
Co-authored-by: Josh Elser <el...@apache.org>
---
.../MetricsReplicationGlobalSourceSource.java | 28 ++---
.../MetricsReplicationSourceFactory.java | 3 +-
...ory.java => MetricsReplicationTableSource.java} | 12 +-
... MetricsReplicationGlobalSourceSourceImpl.java} | 18 ++-
.../MetricsReplicationSourceFactoryImpl.java | 8 +-
.../MetricsReplicationSourceSourceImpl.java | 2 +-
.../MetricsReplicationTableSourceImpl.java | 134 +++++++++++++++++++++
.../replication/regionserver/MetricsSource.java | 55 +++++++--
.../replication/regionserver/Replication.java | 7 +-
.../regionserver/ReplicationSource.java | 6 +-
.../regionserver/ReplicationSourceManager.java | 25 +++-
.../regionserver/ReplicationSourceShipper.java | 10 +-
.../regionserver/ReplicationSourceWALReader.java | 17 +--
.../replication/regionserver/WALEntryBatch.java | 24 ++--
.../hbase/replication/TestReplicationEndpoint.java | 97 ++++++++++++---
.../regionserver/TestWALEntryStream.java | 5 +
16 files changed, 380 insertions(+), 71 deletions(-)
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
similarity index 56%
copy from hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
copy to hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index af310f0..e373a6c 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -20,22 +20,20 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
-public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSourceFactory {
+public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource {
- private static enum SourceHolder {
- INSTANCE;
- final MetricsReplicationSourceImpl source = new MetricsReplicationSourceImpl();
- }
+ public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage";
- @Override public MetricsReplicationSinkSource getSink() {
- return new MetricsReplicationSinkSourceImpl(SourceHolder.INSTANCE.source);
- }
+ /**
+ * Sets the total usage of memory used by edits in memory read from WALs. The memory represented
+ * by this usage measure is across peers/sources. For example, we may batch the same WAL edits
+ * multiple times for the sake of replicating them to multiple peers.
+ * @param usage The memory used by edits in bytes
+ */
+ void setWALReaderEditsBufferBytes(long usage);
- @Override public MetricsReplicationSourceSource getSource(String id) {
- return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
- }
-
- @Override public MetricsReplicationSourceSource getGlobalSource() {
- return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
- }
+ /**
+ * Returns the size, in bytes, of edits held in memory to be replicated across all peers.
+ */
+ long getWALReaderEditsBufferBytes();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 6534b11..5e4ad27 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -24,5 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface MetricsReplicationSourceFactory {
public MetricsReplicationSinkSource getSink();
public MetricsReplicationSourceSource getSource(String id);
- public MetricsReplicationSourceSource getGlobalSource();
+ public MetricsReplicationTableSource getTableSource(String tableName);
+ public MetricsReplicationGlobalSourceSource getGlobalSource();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
similarity index 78%
copy from hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
copy to hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
index 6534b11..faa944a 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
@@ -18,11 +18,15 @@
package org.apache.hadoop.hbase.replication.regionserver;
+import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
-public interface MetricsReplicationSourceFactory {
- public MetricsReplicationSinkSource getSink();
- public MetricsReplicationSourceSource getSource(String id);
- public MetricsReplicationSourceSource getGlobalSource();
+public interface MetricsReplicationTableSource extends BaseSource {
+
+ void setLastShippedAge(long age);
+ void incrShippedBytes(long size);
+ long getShippedBytes();
+ void clear();
+ long getLastShippedAge();
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
similarity index 93%
rename from hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
rename to hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 4e8c810..963abba 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
+public class MetricsReplicationGlobalSourceSourceImpl
+ implements MetricsReplicationGlobalSourceSource {
private static final String KEY_PREFIX = "source.";
private final MetricsReplicationSourceImpl rms;
@@ -53,8 +54,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue;
+ private final MutableGaugeLong walReaderBufferUsageBytes;
- public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
+ public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
this.rms = rms;
ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
@@ -92,6 +94,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
failedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
+ walReaderBufferUsageBytes = rms.getMetricsRegistry()
+ .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -260,4 +264,14 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public String getMetricsName() {
return rms.getMetricsName();
}
+
+ @Override
+ public void setWALReaderEditsBufferBytes(long usage) {
+ this.walReaderBufferUsageBytes.set(usage);
+ }
+
+ @Override
+ public long getWALReaderEditsBufferBytes() {
+ return this.walReaderBufferUsageBytes.value();
+ }
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index af310f0..c0cd1c7 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -35,7 +35,11 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
}
- @Override public MetricsReplicationSourceSource getGlobalSource() {
- return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
+ @Override public MetricsReplicationTableSource getTableSource(String tableName) {
+ return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
+ }
+
+ @Override public MetricsReplicationGlobalSourceSource getGlobalSource() {
+ return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
}
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 0ad5052..79e9bc1 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -162,7 +162,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
@Override public void incrShippedBytes(long size) {
shippedBytesCounter.incr(size);
- MetricsReplicationGlobalSourceSource
+ MetricsReplicationGlobalSourceSourceImpl
.incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
new file mode 100644
index 0000000..7120a73
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This is the metric source for table level replication metrics.
+ * We can easily monitor some useful table level replication metrics such as
+ * ageOfLastShippedOp and shippedBytes
+ */
+@InterfaceAudience.Private
+public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource {
+
+ private final MetricsReplicationSourceImpl rms;
+ private final String tableName;
+ private final String ageOfLastShippedOpKey;
+ private String keyPrefix;
+
+ private final String shippedBytesKey;
+
+ private final MutableHistogram ageOfLastShippedOpHist;
+ private final MutableFastCounter shippedBytesCounter;
+
+ public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) {
+ this.rms = rms;
+ this.tableName = tableName;
+ this.keyPrefix = "source." + this.tableName + ".";
+
+ ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
+ ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey);
+
+ shippedBytesKey = this.keyPrefix + "shippedBytes";
+ shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
+ }
+
+ @Override
+ public void setLastShippedAge(long age) {
+ ageOfLastShippedOpHist.add(age);
+ }
+
+ @Override
+ public void incrShippedBytes(long size) {
+ shippedBytesCounter.incr(size);
+ }
+
+ @Override
+ public void clear() {
+ rms.removeMetric(ageOfLastShippedOpKey);
+ rms.removeMetric(shippedBytesKey);
+ }
+
+ @Override
+ public long getLastShippedAge() {
+ return ageOfLastShippedOpHist.getMax();
+ }
+
+ @Override
+ public long getShippedBytes() {
+ return shippedBytesCounter.value();
+ }
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(this.keyPrefix + gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(this.keyPrefix + key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(this.keyPrefix + counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(this.keyPrefix + name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 830ebe1..49ef392 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -19,8 +19,11 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,8 +50,8 @@ public class MetricsSource implements BaseSource {
private String id;
private final MetricsReplicationSourceSource singleSourceSource;
- private final MetricsReplicationSourceSource globalSourceSource;
- private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
+ private final MetricsReplicationGlobalSourceSource globalSourceSource;
+ private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
/**
* Constructor used to register the metrics
@@ -71,8 +74,8 @@ public class MetricsSource implements BaseSource {
* @param globalSourceSource Class to monitor global-scoped metrics
*/
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
- MetricsReplicationSourceSource globalSourceSource,
- Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
+ MetricsReplicationGlobalSourceSource globalSourceSource,
+ Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
this.id = id;
this.singleSourceSource = singleSourceSource;
this.globalSourceSource = globalSourceSource;
@@ -93,6 +96,29 @@ public class MetricsSource implements BaseSource {
}
/**
+ * Update the table level replication metrics per table
+ *
+ * @param walEntries List of pairs of WAL entry and it's size
+ */
+ public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
+ for (Pair<Entry, Long> walEntryWithSize : walEntries) {
+ Entry entry = walEntryWithSize.getFirst();
+ long entrySize = walEntryWithSize.getSecond();
+ String tableName = entry.getKey().getTableName().getNameAsString();
+ long writeTime = entry.getKey().getWriteTime();
+ long age = EnvironmentEdgeManager.currentTime() - writeTime;
+
+ // get the replication metrics source for table at the run time
+ MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
+ .computeIfAbsent(tableName,
+ t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getTableSource(t));
+ tableSource.setLastShippedAge(age);
+ tableSource.incrShippedBytes(entrySize);
+ }
+ }
+
+ /**
* Set the age of the last edit that was shipped group by table
* @param timestamp write time of the edit
* @param tableName String as group and tableName
@@ -101,7 +127,7 @@ public class MetricsSource implements BaseSource {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
this.getSingleSourceSourceByTable().computeIfAbsent(
tableName, t -> CompatibilitySingletonFactory
- .getInstance(MetricsReplicationSourceFactory.class).getSource(t))
+ .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
.setLastShippedAge(age);
}
@@ -119,7 +145,7 @@ public class MetricsSource implements BaseSource {
* @param walGroup which group we are getting
* @return age
*/
- public long getAgeofLastShippedOp(String walGroup) {
+ public long getAgeOfLastShippedOp(String walGroup) {
return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
}
@@ -393,7 +419,22 @@ public class MetricsSource implements BaseSource {
}
@VisibleForTesting
- public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
+ public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
return singleSourceSourceByTable;
}
+
+ /**
+ * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
+ */
+ public void setWALReaderEditsBufferUsage(long usageInBytes) {
+ globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
+ }
+
+ /**
+ * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
+ * @return the amount of memory in bytes used in this RegionServer by edits pending replication.
+ */
+ public long getWALReaderEditsBufferUsage() {
+ return globalSourceSource.getWALReaderEditsBufferBytes();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 752cfb8..bec9042 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
@@ -72,6 +73,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
+ private MetricsReplicationGlobalSourceSource globalMetricsSource;
private PeerProcedureHandler peerProcedureHandler;
@@ -119,9 +121,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
+ this.globalMetricsSource = CompatibilitySingletonFactory
+ .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
this.server, fs, logDir, oldLogDir, clusterId,
- walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
+ walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
+ globalMetricsSource);
if (walProvider != null) {
walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4726949..c5f72c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -329,7 +329,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
String walGroupId = walGroupShipper.getKey();
ReplicationSourceShipper shipper = walGroupShipper.getValue();
lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
- ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+ ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
int queueSize = queues.get(walGroupId).size();
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
@@ -706,7 +706,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
- totalBufferUsed.addAndGet(-batchSize);
+ long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+ // Record the new buffer usage
+ this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a6fc313..2806110 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -155,6 +155,9 @@ public class ReplicationSourceManager implements ReplicationListener {
private AtomicLong totalBufferUsed = new AtomicLong();
+ // Total buffer size on this RegionServer for holding batched edits to be shipped.
+ private final long totalBufferLimit;
+ private final MetricsReplicationGlobalSourceSource globalMetrics;
/**
* Creates a replication manager and sets the watch on all the other registered region servers
@@ -171,7 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider) throws IOException {
+ WALFileLengthProvider walFileLengthProvider,
+ MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
// CopyOnWriteArrayList is thread-safe.
// Generally, reading is more than modifying.
this.sources = new ConcurrentHashMap<>();
@@ -205,6 +209,9 @@ public class ReplicationSourceManager implements ReplicationListener {
this.latestPaths = new HashSet<Path>();
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+ this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+ HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+ this.globalMetrics = globalMetrics;
}
/**
@@ -862,6 +869,14 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
+ * Returns the maximum size in bytes of edits held in memory which are pending replication
+ * across all sources inside this RegionServer.
+ */
+ public long getTotalBufferLimit() {
+ return totalBufferLimit;
+ }
+
+ /**
* Get the directory where wals are archived
* @return the directory where wals are archived
*/
@@ -898,6 +913,10 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public String getStats() {
StringBuilder stats = new StringBuilder();
+ // Print stats that apply across all Replication Sources
+ stats.append("Global stats: ");
+ stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
+ .append(getTotalBufferLimit()).append("B\n");
for (ReplicationSourceInterface source : this.sources.values()) {
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
stats.append(source.getStats() + "\n");
@@ -923,4 +942,8 @@ public class ReplicationSourceManager implements ReplicationListener {
int activeFailoverTaskCount() {
return executor.getActiveCount();
}
+
+ MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+ return this.globalMetrics;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 23f736f..78e5521 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
@@ -201,17 +200,14 @@ public class ReplicationSourceShipper extends Thread {
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
-
- TableName tableName = entry.getKey().getTableName();
- source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
- tableName.getNameAsString());
+ LOG.trace("shipped entry {}: ", entry);
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);
//offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
//this sizeExcludeBulkLoad has to use same calculation that when calling
- //acquireBufferQuota() in ReplicatinoSourceWALReader because they maintain
+ //acquireBufferQuota() in ReplicationSourceWALReader because they maintain
//same variable: totalBufferUsed
source.postShipEdits(entries, sizeExcludeBulkLoad);
// FIXME check relationship between wal group and overall
@@ -219,6 +215,8 @@ public class ReplicationSourceShipper extends Thread {
entryBatch.getNbHFiles());
source.getSourceMetrics().setAgeOfLastShippedOp(
entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
+ source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());
+
if (LOG.isTraceEnabled()) {
LOG.trace("Replicated {} entries or {} operations in {} ms",
entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 12653f5..328cb8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -104,8 +103,7 @@ class ReplicationSourceWALReader extends Thread {
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
- this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
- HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+ this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
@@ -172,7 +170,7 @@ class ReplicationSourceWALReader extends Thread {
}
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
- batch.addEntry(entry);
+ batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
@@ -272,6 +270,8 @@ class ReplicationSourceWALReader extends Thread {
private boolean checkQuota() {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferQuota) {
+ LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
+ this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
Threads.sleep(sleepForRetries);
return false;
}
@@ -306,9 +306,7 @@ class ReplicationSourceWALReader extends Thread {
private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
- WALKey key = entry.getKey();
- return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
- key.estimatedSerializedSizeOf();
+ return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
}
public static long getEntrySizeExcludeBulkLoad(Entry entry) {
@@ -401,7 +399,10 @@ class ReplicationSourceWALReader extends Thread {
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
- return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
+ long newBufferUsed = totalBufferUsed.addAndGet(size);
+ // Record the new buffer usage
+ this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ return newBufferUsed >= totalBufferQuota;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 22b2de7..4f96c96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -21,7 +21,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -34,7 +36,8 @@ class WALEntryBatch {
// used by recovered replication queue to indicate that all the entries have been read.
public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
- private List<Entry> walEntries;
+ private List<Pair<Entry, Long>> walEntriesWithSize;
+
// last WAL that was read
private Path lastWalPath;
// position in WAL of last entry in this batch
@@ -54,7 +57,7 @@ class WALEntryBatch {
* @param lastWalPath Path of the WAL the last entry in this batch was read from
*/
WALEntryBatch(int maxNbEntries, Path lastWalPath) {
- this.walEntries = new ArrayList<>(maxNbEntries);
+ this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
this.lastWalPath = lastWalPath;
}
@@ -66,15 +69,22 @@ class WALEntryBatch {
return batch;
}
- public void addEntry(Entry entry) {
- walEntries.add(entry);
+ public void addEntry(Entry entry, long entrySize) {
+ walEntriesWithSize.add(new Pair<>(entry, entrySize));
}
/**
* @return the WAL Entries.
*/
public List<Entry> getWalEntries() {
- return walEntries;
+ return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
+ }
+
+ /**
+ * @return the WAL entries paired with their serialized sizes.
+ */
+ public List<Pair<Entry, Long>> getWalEntriesWithSize() {
+ return walEntriesWithSize;
}
/**
@@ -96,7 +106,7 @@ class WALEntryBatch {
}
public int getNbEntries() {
- return walEntries.size();
+ return walEntriesWithSize.size();
}
/**
@@ -160,7 +170,7 @@ class WALEntryBatch {
@Override
public String toString() {
- return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
+ return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath +
", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
endOfFile + "]";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 5d7366d..cc7e20c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -44,16 +47,21 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
@@ -304,11 +312,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
@Test
- public void testMetricsSourceBaseSourcePassthrough() {
+ public void testMetricsSourceBaseSourcePassThrough() {
/*
- * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
- * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
- * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
+ * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
+ * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
+ * so that metrics get written to all namespaces. All of these classes wrap a
+ * MetricsReplicationSourceImpl that implements BaseSource, which allows
* for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
* MetricsSource actually calls down through the two layers of wrapping to the actual
* BaseSource.
@@ -322,14 +331,15 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceSource singleSourceSource =
new MetricsReplicationSourceSourceImpl(singleRms, id);
- MetricsReplicationSourceSource globalSourceSource =
- new MetricsReplicationGlobalSourceSource(globalRms);
- MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+ MetricsReplicationGlobalSourceSource globalSourceSource =
+ new MetricsReplicationGlobalSourceSourceImpl(globalRms);
+ MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
- Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
- MetricsSource source =
- new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
+ Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
+ new HashMap<>();
+ MetricsSource source = new MetricsSource(id, singleSourceSource,
+ spyglobalSourceSource, singleSourceSourceByTable);
String gaugeName = "gauge";
@@ -378,16 +388,37 @@ public class TestReplicationEndpoint extends TestReplicationBase {
boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
.containsKey("RandomNewTable");
Assert.assertEquals(false, containsRandomNewTable);
- source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
+ source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
containsRandomNewTable = source.getSingleSourceSourceByTable()
.containsKey("RandomNewTable");
Assert.assertEquals(true, containsRandomNewTable);
- MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
+ MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
.get("RandomNewTable");
- // cannot put more concreate value here to verify because the age is arbitrary.
- // as long as it's greater than 0, we see it as correct answer.
+
+ // age should be greater than zero since we created the entry with a time in the past
Assert.assertTrue(msr.getLastShippedAge() > 0);
+ Assert.assertTrue(msr.getShippedBytes() > 0);
+
+ }
+
+ private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
+ List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
+ byte[] a = new byte[] { 'a' };
+ Entry entry = createEntry(tableName, null, a);
+ walEntriesWithSize.add(new Pair<>(entry, 10L));
+ return walEntriesWithSize;
+ }
+ private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
+ WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
+ System.currentTimeMillis() - 1L,
+ scopes);
+ WALEdit edit1 = new WALEdit();
+
+ for (byte[] kv : kvs) {
+ edit1.add(new KeyValue(kv, kv, kv));
+ }
+ return new Entry(key1, edit1);
}
private void doPut(byte[] row) throws IOException {
@@ -463,6 +494,44 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
}
+ /**
+ * Not used by unit tests, helpful for manual testing with replication.
+ * <p>
+ * Snippet for `hbase shell`:
+ * <pre>
+ * create 't', 'f'
+ * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \
+ * 'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
+ * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1}
+ * </pre>
+ */
+ public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
+ private long duration;
+ public SleepingReplicationEndpointForTest() {
+ super();
+ }
+
+ @Override
+ public void init(Context context) throws IOException {
+ super.init(context);
+ if (this.ctx != null) {
+ duration = this.ctx.getConfiguration().getLong(
+ "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
+ }
+ }
+
+ @Override
+ public boolean replicate(ReplicateContext context) {
+ try {
+ Thread.sleep(duration);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return super.replicate(context);
+ }
+ }
+
public static class InterClusterReplicationEndpointForTest
extends HBaseInterClusterReplicationEndpoint {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index b4af38b..067ab63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -336,6 +336,8 @@ public class TestWALEntryStream {
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ when(mockSourceManager.getTotalBufferLimit()).thenReturn(
+ (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
@@ -343,6 +345,9 @@ public class TestWALEntryStream {
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
+ MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
+ MetricsReplicationGlobalSourceSource.class);
+ when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
}