You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2021/01/12 10:49:37 UTC
[hbase] branch branch-2.2 updated: HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2546) (#2849)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 3242c8a HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2546) (#2849)
3242c8a is described below
commit 3242c8a3d1e10e80e9c3603b1199b56940bd17a5
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Tue Jan 12 09:40:25 2021 +0000
HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2546) (#2849)
Signed-off-by: Ankit Singhal <an...@apache.org>
Signed-off-by: Josh Elser <el...@apache.org>
(cherry picked from commit fdae12d754b523d6e20a27de4aab59d3e11042f4)
---
.../regionserver/ReplicationSource.java | 5 ++
.../regionserver/ReplicationSourceShipper.java | 59 +++++++++++++++-
.../regionserver/ReplicationSourceWALReader.java | 3 +-
.../regionserver/TestReplicationSource.java | 79 ++++++++++++++++------
4 files changed, 123 insertions(+), 23 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index e3f8c69..180acb3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -577,6 +577,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
+
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
worker.entryReader.setReaderRunning(false);
@@ -585,6 +586,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
+
for (ReplicationSourceShipper worker : workers) {
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
@@ -603,6 +605,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
worker.entryReader.interrupt();
}
}
+ //If the worker is already stopped but there were still entries batched,
+ //we need to clear the buffer used for the unprocessed entries
+ worker.clearWALEntryBatch();
}
if (join) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 78e5521..eb6777d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTi
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.LongAccumulator;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -56,7 +58,7 @@ public class ReplicationSourceShipper extends Thread {
private final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
- private final ReplicationSourceInterface source;
+ private final ReplicationSource source;
// Last position in the log that we sent to ZooKeeper
// It will be accessed by the stats thread so make it volatile
@@ -76,7 +78,7 @@ public class ReplicationSourceShipper extends Thread {
private final int shipEditsTimeout;
public ReplicationSourceShipper(Configuration conf, String walGroupId,
- PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
+ PriorityBlockingQueue<Path> queue, ReplicationSource source) {
this.conf = conf;
this.walGroupId = walGroupId;
this.queue = queue;
@@ -126,6 +128,7 @@ public class ReplicationSourceShipper extends Thread {
if (!isFinished()) {
setWorkerState(WorkerState.STOPPED);
} else {
+ source.workerThreads.remove(this.walGroupId);
postFinish();
}
}
@@ -335,4 +338,56 @@ public class ReplicationSourceShipper extends Thread {
}
return sleepMultiplier < maxRetriesMultiplier;
}
+
+ /**
+ * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the shipper,
+ * but the shipper didn't manage to ship those because the replication source is being terminated.
+ * In that case, it iterates through the batched entries and decreases the pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks, waiting for both the shipper and reader threads to terminate,
+ * to make sure there are no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper "
+ + "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
+ this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
+ return;
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
+ + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
+ return;
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
+ w.getWalEntries().forEach(e -> {
+ long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
+ totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+ });
+ });
+ if( LOG.isTraceEnabled()) {
+ LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
+ totalToDecrement.longValue());
+ }
+ long newBufferUsed = source.getSourceManager().getTotalBufferUsed()
+ .addAndGet(-totalToDecrement.longValue());
+ source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 328cb8f..cbc4922 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread {
private final WALEntryFilter filter;
private final ReplicationSource source;
- private final BlockingQueue<WALEntryBatch> entryBatchQueue;
+ @InterfaceAudience.Private
+ final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 274ccab..3fcfdfe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,8 +20,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -31,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -48,12 +52,6 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -142,7 +140,7 @@ public class TestReplicationSource {
entry = reader.next();
assertNotNull(entry);
- entry = reader.next();
+ reader.next();
entry = reader.next();
assertNull(entry);
@@ -168,12 +166,12 @@ public class TestReplicationSource {
}
};
replicationEndpoint.start();
- ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
- Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+ ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+ when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
testConf.setInt("replication.source.maxretriesmultiplier", 1);
- ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
- Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+ ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
+ when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -194,6 +192,47 @@ public class TestReplicationSource {
});
}
+ @Test
+ public void testTerminateClearsBuffer() throws Exception {
+ ReplicationSource source = new ReplicationSource();
+ ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
+ MetricsReplicationGlobalSourceSource mockMetrics =
+ mock(MetricsReplicationGlobalSourceSource.class);
+ AtomicLong buffer = new AtomicLong();
+ when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
+ when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
+ ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+ when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+ Configuration testConf = HBaseConfiguration.create();
+ source.init(testConf, null, mockManager, null, mockPeer, null,
+ "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
+ ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
+ conf, null, 0, null, source);
+ ReplicationSourceShipper shipper =
+ new ReplicationSourceShipper(conf, null, null, source);
+ shipper.entryReader = reader;
+ source.workerThreads.put("testPeer", shipper);
+ WALEntryBatch batch = new WALEntryBatch(10, logDir);
+ WAL.Entry mockEntry = mock(WAL.Entry.class);
+ WALEdit mockEdit = mock(WALEdit.class);
+ WALKeyImpl mockKey = mock(WALKeyImpl.class);
+ when(mockEntry.getEdit()).thenReturn(mockEdit);
+ when(mockEdit.isEmpty()).thenReturn(false);
+ when(mockEntry.getKey()).thenReturn(mockKey);
+ when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
+ when(mockEdit.heapSize()).thenReturn(10000L);
+ when(mockEdit.size()).thenReturn(0);
+ ArrayList<Cell> cells = new ArrayList<>();
+ KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
+ Bytes.toBytes("1"), Bytes.toBytes("v1"));
+ cells.add(kv);
+ when(mockEdit.getCells()).thenReturn(cells);
+ reader.addEntryToBatch(batch, mockEntry);
+ reader.entryBatchQueue.put(batch);
+ source.terminate("test");
+ assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
+ }
+
/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
@@ -303,15 +342,15 @@ public class TestReplicationSource {
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
queue.put(new Path("/www/html/test"));
- RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
- Server server = Mockito.mock(Server.class);
- Mockito.when(server.getServerName()).thenReturn(serverName);
- Mockito.when(source.getServer()).thenReturn(server);
- Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
- ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
- Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
+ RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
+ Server server = mock(Server.class);
+ when(server.getServerName()).thenReturn(serverName);
+ when(source.getServer()).thenReturn(server);
+ when(source.getServerWALsBelongTo()).thenReturn(deadServer);
+ ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
+ when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
.thenReturn(1001L);
- Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
+ when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
.thenReturn(-1L);
conf.setInt("replication.source.maxretriesmultiplier", -1);
RecoveredReplicationSourceShipper shipper =