You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2020/08/07 22:44:30 UTC
[hbase] branch master updated: HBASE-24779 Report on the WAL edit
buffer usage/limit for replication
This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 124af63 HBASE-24779 Report on the WAL edit buffer usage/limit for replication
124af63 is described below
commit 124af6392cdebff2fe2693c572a9564dc318eee5
Author: Josh Elser <el...@apache.org>
AuthorDate: Fri Aug 7 12:59:17 2020 -0400
HBASE-24779 Report on the WAL edit buffer usage/limit for replication
Closes #2193
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
Signed-off-by: Sean Busbey <bu...@apache.org>
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../MetricsReplicationGlobalSourceSource.java | 248 ++-------------------
... MetricsReplicationGlobalSourceSourceImpl.java} | 19 +-
.../MetricsReplicationSourceFactory.java | 2 +-
.../MetricsReplicationSourceFactoryImpl.java | 4 +-
.../replication/regionserver/MetricsSource.java | 19 +-
.../replication/regionserver/Replication.java | 6 +-
.../regionserver/ReplicationSource.java | 4 +-
.../regionserver/ReplicationSourceManager.java | 25 ++-
.../regionserver/ReplicationSourceWALReader.java | 11 +-
.../hbase/replication/TestReplicationEndpoint.java | 45 +++-
.../regionserver/TestWALEntryStream.java | 5 +
11 files changed, 140 insertions(+), 248 deletions(-)
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 630fdb8..e373a6c 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,239 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.replication.regionserver;
-import org.apache.hadoop.metrics2.lib.MutableFastCounter;
-import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience;
+/**
+ * Replication source metrics that are global to this RegionServer, i.e. aggregated across all
+ * peers/sources, including the memory used by WAL edits buffered for replication.
+ */
@InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
- private static final String KEY_PREFIX = "source.";
-
- private final MetricsReplicationSourceImpl rms;
-
- private final MutableHistogram ageOfLastShippedOpHist;
- private final MutableGaugeLong sizeOfLogQueueGauge;
- private final MutableFastCounter logReadInEditsCounter;
- private final MutableFastCounter walEditsFilteredCounter;
- private final MutableFastCounter shippedBatchesCounter;
- private final MutableFastCounter shippedOpsCounter;
- private final MutableFastCounter shippedBytesCounter;
- private final MutableFastCounter logReadInBytesCounter;
- private final MutableFastCounter shippedHFilesCounter;
- private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
- private final MutableFastCounter unknownFileLengthForClosedWAL;
- private final MutableFastCounter uncleanlyClosedWAL;
- private final MutableFastCounter uncleanlyClosedSkippedBytes;
- private final MutableFastCounter restartWALReading;
- private final MutableFastCounter repeatedFileBytes;
- private final MutableFastCounter completedWAL;
- private final MutableFastCounter completedRecoveryQueue;
- private final MutableFastCounter failedRecoveryQueue;
-
- public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
- this.rms = rms;
-
- ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
-
- sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
-
- shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
-
- shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
-
- shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
-
- logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
-
- logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
-
- walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
-
- shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L);
-
- sizeOfHFileRefsQueueGauge =
- rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
-
- unknownFileLengthForClosedWAL = rms.getMetricsRegistry()
- .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L);
- uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L);
- uncleanlyClosedSkippedBytes = rms.getMetricsRegistry()
- .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L);
- restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L);
- repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L);
- completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L);
- completedRecoveryQueue = rms.getMetricsRegistry()
- .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
- failedRecoveryQueue = rms.getMetricsRegistry()
- .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
- }
-
- @Override public void setLastShippedAge(long age) {
- ageOfLastShippedOpHist.add(age);
- }
-
- @Override public void incrSizeOfLogQueue(int size) {
- sizeOfLogQueueGauge.incr(size);
- }
-
- @Override public void decrSizeOfLogQueue(int size) {
- sizeOfLogQueueGauge.decr(size);
- }
-
- @Override public void incrLogReadInEdits(long size) {
- logReadInEditsCounter.incr(size);
- }
-
- @Override public void incrLogEditsFiltered(long size) {
- walEditsFilteredCounter.incr(size);
- }
-
- @Override public void incrBatchesShipped(int batches) {
- shippedBatchesCounter.incr(batches);
- }
-
- @Override public void incrOpsShipped(long ops) {
- shippedOpsCounter.incr(ops);
- }
-
- @Override public void incrShippedBytes(long size) {
- shippedBytesCounter.incr(size);
- }
-
- @Override public void incrLogReadInBytes(long size) {
- logReadInBytesCounter.incr(size);
- }
-
- @Override public void clear() {
- }
-
- @Override
- public long getLastShippedAge() {
- return ageOfLastShippedOpHist.getMax();
- }
-
- @Override public void incrHFilesShipped(long hfiles) {
- shippedHFilesCounter.incr(hfiles);
- }
-
- @Override
- public void incrSizeOfHFileRefsQueue(long size) {
- sizeOfHFileRefsQueueGauge.incr(size);
- }
-
- @Override
- public void decrSizeOfHFileRefsQueue(long size) {
- sizeOfHFileRefsQueueGauge.decr(size);
- }
-
- @Override
- public int getSizeOfLogQueue() {
- return (int)sizeOfLogQueueGauge.value();
- }
-
- @Override
- public void incrUnknownFileLengthForClosedWAL() {
- unknownFileLengthForClosedWAL.incr(1L);
- }
- @Override
- public void incrUncleanlyClosedWALs() {
- uncleanlyClosedWAL.incr(1L);
- }
- @Override
- public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
- uncleanlyClosedSkippedBytes.incr(bytes);
- }
- @Override
- public void incrRestartedWALReading() {
- restartWALReading.incr(1L);
- }
- @Override
- public void incrRepeatedFileBytes(final long bytes) {
- repeatedFileBytes.incr(bytes);
- }
- @Override
- public void incrCompletedWAL() {
- completedWAL.incr(1L);
- }
- @Override
- public void incrCompletedRecoveryQueue() {
- completedRecoveryQueue.incr(1L);
- }
- @Override
- public void incrFailedRecoveryQueue() {
- failedRecoveryQueue.incr(1L);
- }
- @Override
- public void init() {
- rms.init();
- }
-
- @Override
- public void setGauge(String gaugeName, long value) {
- rms.setGauge(KEY_PREFIX + gaugeName, value);
- }
-
- @Override
- public void incGauge(String gaugeName, long delta) {
- rms.incGauge(KEY_PREFIX + gaugeName, delta);
- }
-
- @Override
- public void decGauge(String gaugeName, long delta) {
- rms.decGauge(KEY_PREFIX + gaugeName, delta);
- }
-
- @Override
- public void removeMetric(String key) {
- rms.removeMetric(KEY_PREFIX + key);
- }
-
- @Override
- public void incCounters(String counterName, long delta) {
- rms.incCounters(KEY_PREFIX + counterName, delta);
- }
-
- @Override
- public void updateHistogram(String name, long value) {
- rms.updateHistogram(KEY_PREFIX + name, value);
- }
-
- @Override
- public String getMetricsContext() {
- return rms.getMetricsContext();
- }
-
- @Override
- public String getMetricsDescription() {
- return rms.getMetricsDescription();
- }
-
- @Override
- public String getMetricsJmxContext() {
- return rms.getMetricsJmxContext();
- }
-
- @Override
- public String getMetricsName() {
- return rms.getMetricsName();
- }
-
- @Override
- public long getWALEditsRead() {
- return this.logReadInEditsCounter.value();
- }
-
- @Override
- public long getShippedOps() {
- return this.shippedOpsCounter.value();
- }
-
- @Override
- public long getEditsFiltered() {
- return this.walEditsFilteredCounter.value();
- }
+public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource {
+
+ /** Name of the gauge holding the bytes of WAL edits buffered in memory for replication. */
+ public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage";
+
+ /**
+ * Sets the total amount of memory, in bytes, used by edits read from WALs and held in memory.
+ * This measure spans all peers/sources. For example, the same WAL edits may be batched multiple
+ * times in order to replicate them to multiple peers.
+ * @param usage the memory used by buffered edits, in bytes
+ */
+ void setWALReaderEditsBufferBytes(long usage);
+
+ /**
+ * Returns the size, in bytes, of edits held in memory to be replicated across all peers.
+ * @return the buffered edits size, in bytes
+ */
+ long getWALReaderEditsBufferBytes();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
similarity index 92%
copy from hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
copy to hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 630fdb8..1c04109 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience;
+/**
+ * Implementation of {@link MetricsReplicationGlobalSourceSource} whose metrics are registered on
+ * the metrics registry of the wrapped {@link MetricsReplicationSourceImpl}.
+ */
@InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
+public class MetricsReplicationGlobalSourceSourceImpl
+ implements MetricsReplicationGlobalSourceSource {
private static final String KEY_PREFIX = "source.";
private final MetricsReplicationSourceImpl rms;
@@ -47,8 +48,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue;
+ // Gauge registered as SOURCE_WAL_READER_EDITS_BUFFER: bytes of WAL edits buffered for replication
+ private final MutableGaugeLong walReaderBufferUsageBytes;
- public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
+ public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
this.rms = rms;
ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
@@ -84,6 +86,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
failedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
+
+ walReaderBufferUsageBytes = rms.getMetricsRegistry()
+ .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -250,4 +255,14 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public long getEditsFiltered() {
return this.walEditsFilteredCounter.value();
}
+
+ @Override
+ public void setWALReaderEditsBufferBytes(long usage) {
+ this.walReaderBufferUsageBytes.set(usage);
+ }
+
+ @Override
+ public long getWALReaderEditsBufferBytes() {
+ return this.walReaderBufferUsageBytes.value();
+ }
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 2816f83..73d2cfd 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory {
public MetricsReplicationSinkSource getSink();
public MetricsReplicationSourceSource getSource(String id);
public MetricsReplicationTableSource getTableSource(String tableName);
- public MetricsReplicationSourceSource getGlobalSource();
+ /**
+ * Returns the source for replication metrics aggregated across all peers/sources. Declared with
+ * the interface type so callers do not depend on the concrete implementation class.
+ */
+ public MetricsReplicationGlobalSourceSource getGlobalSource();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index a3b3462..061fc58 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -39,7 +39,7 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
}
- @Override public MetricsReplicationSourceSource getGlobalSource() {
- return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
+ // Creates a new global-metrics wrapper around SourceHolder.INSTANCE.source on every call; the
+ // wrapper registers its metrics on that source's metrics registry.
+ @Override public MetricsReplicationGlobalSourceSourceImpl getGlobalSource() {
+ return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 39fe7b4..0f73576 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -51,7 +51,7 @@ public class MetricsSource implements BaseSource {
private long timeStampNextToReplicate;
private final MetricsReplicationSourceSource singleSourceSource;
- private final MetricsReplicationSourceSource globalSourceSource;
+ private final MetricsReplicationGlobalSourceSource globalSourceSource;
private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
/**
@@ -75,7 +75,7 @@ public class MetricsSource implements BaseSource {
* @param globalSourceSource Class to monitor global-scoped metrics
*/
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
- MetricsReplicationSourceSource globalSourceSource,
+ MetricsReplicationGlobalSourceSource globalSourceSource,
Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
this.id = id;
this.singleSourceSource = singleSourceSource;
@@ -454,4 +454,19 @@ public class MetricsSource implements BaseSource {
public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
return singleSourceSourceByTable;
}
+
+ /**
+ * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
+ * @param usageInBytes the current buffer usage across all replication sources, in bytes
+ */
+ public void setWALReaderEditsBufferUsage(long usageInBytes) {
+ globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
+ }
+
+ /**
+ * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
+ * @return the current value of the global WAL reader edits buffer gauge, in bytes
+ */
+ public long getWALReaderEditsBufferUsage() {
+ return globalSourceSource.getWALReaderEditsBufferBytes();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 4cbce8c..195877b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
@@ -76,6 +77,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
+ // Replication source metrics aggregated across all peers/sources; handed to the
+ // ReplicationSourceManager so that WAL edit buffer usage can be reported on it.
+ private MetricsReplicationGlobalSourceSource globalMetricsSource;
private PeerProcedureHandler peerProcedureHandler;
@@ -124,10 +126,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
throw new IOException("Could not read cluster id", ke);
}
SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
+ // Obtain the global metrics source first: it is a constructor argument of the manager below.
+ this.globalMetricsSource = CompatibilitySingletonFactory
+ .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
- mapping);
+ mapping, globalMetricsSource);
this.syncReplicationPeerInfoProvider =
new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
PeerActionListener peerActionListener = PeerActionListener.DUMMY;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1d9269d..f24ecfa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -771,7 +771,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
- totalBufferUsed.addAndGet(-batchSize);
+ long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+ // Release this batch's share of the buffer and publish the resulting usage to the global gauge.
+ // NOTE(review): the atomic update and the gauge set are not performed together, so with
+ // concurrent callers the gauge may briefly show a stale value.
+ this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 1a012bd..2cf91ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -169,6 +169,9 @@ public class ReplicationSourceManager implements ReplicationListener {
// Maximum number of retries before taking bold actions when deleting remote wal files for sync
// replication peer.
private final int maxRetriesMultiplier;
+ // Upper limit, in bytes, on edits buffered on this RegionServer while waiting to be shipped.
+ // Read from HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY.
+ private final long totalBufferLimit;
+ // Global (all peers/sources) replication metrics; WAL edit buffer usage is reported here.
+ private final MetricsReplicationGlobalSourceSource globalMetrics;
/**
* Creates a replication manager and sets the watch on all the other registered region servers
@@ -186,7 +189,8 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider,
- SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException {
+ SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
+ MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
this.sources = new ConcurrentHashMap<>();
this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
@@ -222,6 +226,9 @@ public class ReplicationSourceManager implements ReplicationListener {
this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
+ this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+ HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+ this.globalMetrics = globalMetrics;
}
/**
@@ -1070,6 +1077,14 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
+ * Returns the maximum size in bytes of edits held in memory which are pending replication
+ * across all sources inside this RegionServer.
+ */
+ public long getTotalBufferLimit() {
+ return totalBufferLimit;
+ }
+
+ /**
* Get the directory where wals are archived
* @return the directory where wals are archived
*/
@@ -1106,6 +1121,10 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public String getStats() {
StringBuilder stats = new StringBuilder();
+ // Print stats that apply across all Replication Sources
+ stats.append("Global stats: ");
+ stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
+ .append(getTotalBufferLimit()).append("B\n");
for (ReplicationSourceInterface source : this.sources.values()) {
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
stats.append(source.getStats() + "\n");
@@ -1131,4 +1150,8 @@ public class ReplicationSourceManager implements ReplicationListener {
int activeFailoverTaskCount() {
return executor.getActiveCount();
}
+
+ /** Returns the global (all peers/sources) replication metrics passed in at construction. */
+ MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+ return this.globalMetrics;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 7e0e550..c71db1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -104,8 +103,7 @@ class ReplicationSourceWALReader extends Thread {
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
- this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
- HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+ this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
@@ -276,6 +274,10 @@ class ReplicationSourceWALReader extends Thread {
private boolean checkQuota() {
// try not to go over total quota
- if (totalBufferUsed.get() > totalBufferQuota) {
+ // Read the shared counter once so the check and the logged usage agree.
+ long bufferUsed = totalBufferUsed.get();
+ if (bufferUsed > totalBufferQuota) {
+ LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
+ this.source.getPeerId(), bufferUsed, totalBufferQuota);
Threads.sleep(sleepForRetries);
return false;
}
@@ -404,7 +406,10 @@ class ReplicationSourceWALReader extends Thread {
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
- return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
+ long newBufferUsed = totalBufferUsed.addAndGet(size);
+ // Publish the new RegionServer-wide buffer usage to the global metrics gauge
+ this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ return newBufferUsed >= totalBufferQuota;
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 4dd264c..5a6ac0c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
@@ -329,9 +330,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceSource singleSourceSource =
new MetricsReplicationSourceSourceImpl(singleRms, id);
- MetricsReplicationSourceSource globalSourceSource =
- new MetricsReplicationGlobalSourceSource(globalRms);
- MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+ MetricsReplicationGlobalSourceSource globalSourceSource =
+ new MetricsReplicationGlobalSourceSourceImpl(globalRms);
+ MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
@@ -497,6 +498,44 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
}
+ /**
+ * Not used by unit tests, helpful for manual testing with replication.
+ * <p>
+ * Sleeps before delegating each {@link #replicate(ReplicateContext)} call to the parent endpoint,
+ * slowing down shipping (e.g. so that replication buffer usage can be observed). The sleep
+ * duration, in milliseconds, is read from
+ * {@code hbase.test.sleep.replication.endpoint.duration.millis} (default 5000).
+ * <p>
+ * Snippet for `hbase shell`:
+ * <pre>
+ * create 't', 'f'
+ * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \
+ * 'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
+ * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1}
+ * </pre>
+ */
+ public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
+ // Milliseconds to sleep per replicate() call; remains 0 if this.ctx is null after init().
+ private long duration;
+ public SleepingReplicationEndpointForTest() {
+ super();
+ }
+
+ @Override
+ public void init(Context context) throws IOException {
+ super.init(context);
+ if (this.ctx != null) {
+ duration = this.ctx.getConfiguration().getLong(
+ "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
+ }
+ }
+
+ @Override
+ public boolean replicate(ReplicateContext context) {
+ try {
+ Thread.sleep(duration);
+ } catch (InterruptedException e) {
+ // Preserve the interrupt status and return false instead of delegating.
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return super.replicate(context);
+ }
+ }
+
public static class InterClusterReplicationEndpointForTest
extends HBaseInterClusterReplicationEndpoint {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 2a21660..63e7a8b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -371,6 +371,8 @@ public class TestWALEntryStream {
+ /**
+ * Creates a mocked ReplicationSource whose ReplicationSourceManager is also a mock, stubbed with
+ * a buffer-usage counter starting at 0, the default buffer limit and a mocked global metrics
+ * source.
+ */
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ // The WAL reader takes its buffer quota from the manager, so stub the default limit.
+ when(mockSourceManager.getTotalBufferLimit()).thenReturn(
+ (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
@@ -378,6 +380,9 @@ public class TestWALEntryStream {
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
+ // The WAL reader reports buffer usage through the manager's global metrics; use a mock sink.
+ MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
+ MetricsReplicationGlobalSourceSource.class);
+ when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
}