You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text version.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/10/06 13:24:38 UTC
[hbase] branch branch-2 updated: Revert "HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2453)"
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 68fe79a Revert "HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2453)"
68fe79a is described below
commit 68fe79a6cfcb1c0361fff2fb30bbfa93a4162895
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Oct 6 21:15:33 2020 +0800
Revert "HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2453)"
This reverts commit dedb630da04305e5b90dacf8406fc2a92be2882e.
---
.../regionserver/ReplicationSource.java | 11 +++--
.../regionserver/ReplicationSourceShipper.java | 52 ----------------------
.../regionserver/ReplicationSourceWALReader.java | 3 +-
.../regionserver/TestReplicationSource.java | 50 ++-------------------
4 files changed, 9 insertions(+), 107 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0bd2b36..4542451 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -629,12 +629,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
-
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
- if (worker.entryReader != null) {
+ if(worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
+ }
+
+ for (ReplicationSourceShipper worker : workers) {
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
// Wait worker to stop
@@ -652,9 +654,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
worker.entryReader.interrupt();
}
}
- //If worker is already stopped but there was still entries batched,
- //we need to clear buffer used for non processed entries
- worker.clearWALEntryBatch();
}
if (this.replicationEndpoint != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 3a23c67..92646d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -21,15 +21,12 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTi
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.LongAccumulator;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -334,53 +331,4 @@ public class ReplicationSourceShipper extends Thread {
}
return sleepMultiplier < maxRetriesMultiplier;
}
-
- /**
- * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
- * in case there were unprocessed entries batched by the reader to the shipper,
- * but the shipper didn't manage to ship those because the replication source is being terminated.
- * In that case, it iterates through the batched entries and decrease the pending
- * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
- * <p/>
- * <b>NOTES</b>
- * 1) This method should only be called upon replication source termination.
- * It blocks waiting for both shipper and reader threads termination,
- * to make sure no race conditions
- * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
- *
- * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
- * have been triggered interruption/termination prior to calling this method.
- */
- void clearWALEntryBatch() {
- long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
- while(this.isAlive() || this.entryReader.isAlive()){
- try {
- if (System.currentTimeMillis() >= timeout) {
- LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
- + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
- + "thread to stop.", this.source.getPeerId());
- Thread.currentThread().interrupt();
- } else {
- // Wait both shipper and reader threads to stop
- Thread.sleep(this.sleepForRetries);
- }
- } catch (InterruptedException e) {
- LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch: {}",
- this.source.getPeerId(), this.getName(), e);
- Thread.currentThread().interrupt();
- }
- }
- LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
- entryReader.entryBatchQueue.forEach(w -> {
- entryReader.entryBatchQueue.remove(w);
- w.getWalEntries().forEach(e -> {
- long entrySizeExcludeBulkLoad = entryReader.getEntrySizeExcludeBulkLoad(e);
- totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
- });
- });
-
- LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
- totalToDecrement.longValue());
- source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 1ff0a8b..dd5b4dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -60,8 +60,7 @@ class ReplicationSourceWALReader extends Thread {
private final WALEntryFilter filter;
private final ReplicationSource source;
- @InterfaceAudience.Private
- final BlockingQueue<WALEntryBatch> entryBatchQueue;
+ private final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 3cedd7f..15f202f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -21,10 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -271,47 +268,6 @@ public class TestReplicationSource {
}
}
- @Test
- public void testTerminateClearsBuffer() throws Exception {
- ReplicationSource source = new ReplicationSource();
- ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
- MetricsReplicationGlobalSourceSource mockMetrics =
- mock(MetricsReplicationGlobalSourceSource.class);
- AtomicLong buffer = new AtomicLong();
- Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
- Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
- ReplicationPeer mockPeer = mock(ReplicationPeer.class);
- Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
- Configuration testConf = HBaseConfiguration.create();
- source.init(testConf, null, mockManager, null, mockPeer, null,
- "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
- ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
- conf, null, 0, null, source);
- ReplicationSourceShipper shipper =
- new ReplicationSourceShipper(conf, null, null, source);
- shipper.entryReader = reader;
- source.workerThreads.put("testPeer", shipper);
- WALEntryBatch batch = new WALEntryBatch(10, logDir);
- WAL.Entry mockEntry = mock(WAL.Entry.class);
- WALEdit mockEdit = mock(WALEdit.class);
- WALKeyImpl mockKey = mock(WALKeyImpl.class);
- when(mockEntry.getEdit()).thenReturn(mockEdit);
- when(mockEdit.isEmpty()).thenReturn(false);
- when(mockEntry.getKey()).thenReturn(mockKey);
- when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
- when(mockEdit.heapSize()).thenReturn(10000L);
- when(mockEdit.size()).thenReturn(0);
- ArrayList<Cell> cells = new ArrayList<>();
- KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
- Bytes.toBytes("1"), Bytes.toBytes("v1"));
- cells.add(kv);
- when(mockEdit.getCells()).thenReturn(cells);
- reader.addEntryToBatch(batch, mockEntry);
- reader.entryBatchQueue.put(batch);
- source.terminate("test");
- assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
- }
-
/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
@@ -451,12 +407,12 @@ public class TestReplicationSource {
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
queue.put(new Path("/www/html/test"));
- RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
- Server server = mock(Server.class);
+ RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
+ Server server = Mockito.mock(Server.class);
Mockito.when(server.getServerName()).thenReturn(serverName);
Mockito.when(source.getServer()).thenReturn(server);
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
- ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
+ ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
.thenReturn(1001L);
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))