You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/10/01 10:24:04 UTC

[hbase] branch branch-2.3 updated: HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2453)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 6914870  HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2453)
6914870 is described below

commit 6914870d64881b5762ab3c0be6813b4ae9fd1781
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Thu Oct 1 08:03:46 2020 +0100

    HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (#2453)
    
    Signed-off-by: Sean Busbey <bu...@apache.org>
    (cherry picked from commit dedb630da04305e5b90dacf8406fc2a92be2882e)
---
 .../regionserver/ReplicationSource.java            |  9 ++--
 .../regionserver/ReplicationSourceShipper.java     | 52 ++++++++++++++++++++
 .../regionserver/ReplicationSourceWALReader.java   |  3 +-
 .../regionserver/TestReplicationSource.java        | 56 ++++++++++++++++++----
 4 files changed, 106 insertions(+), 14 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 155f08c..c7c7275 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -587,14 +587,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
       Threads.shutdown(initThread, this.sleepForRetries);
     }
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
+
     for (ReplicationSourceShipper worker : workers) {
       worker.stopWorker();
-      if(worker.entryReader != null) {
+      if (worker.entryReader != null) {
         worker.entryReader.setReaderRunning(false);
       }
-    }
-
-    for (ReplicationSourceShipper worker : workers) {
       if (worker.isAlive() || worker.entryReader.isAlive()) {
         try {
           // Wait worker to stop
@@ -612,6 +610,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
           worker.entryReader.interrupt();
         }
       }
+      //If worker is already stopped but there was still entries batched,
+      //we need to clear buffer used for non processed entries
+      worker.clearWALEntryBatch();
     }
 
     if (this.replicationEndpoint != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 733569a..a41c5da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTi
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.LongAccumulator;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -28,6 +30,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -333,4 +336,53 @@ public class ReplicationSourceShipper extends Thread {
     }
     return sleepMultiplier < maxRetriesMultiplier;
   }
+
+  /**
+   * Releases the <code>ReplicationSourceManager.totalBufferUsed</code> share held by entries the
+   * reader batched for this shipper but never shipped because the source is being terminated:
+   * queued batches are drained and their entries' sizes (excluding bulk loads) subtracted.
+   * <p>
+   * <b>NOTES</b>
+   * 1) This method should only be called upon replication source termination. It waits up to
+   * <code>shipEditsTimeout</code> ms for both shipper and reader threads to terminate, so the
+   * buffer accounting cannot race with them; on timeout or interrupt it logs a warning and
+   * returns <b>without</b> touching the buffer usage.
+   * <p>
+   * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+   * have been triggered interruption/termination prior to calling this method.
+   */
+  void clearWALEntryBatch() {
+    if (this.entryReader == null) {
+      return; // no reader attached yet, so nothing can have been batched
+    }
+    long timeout = EnvironmentEdgeManager.currentTime() + this.shipEditsTimeout;
+    while (this.isAlive() || this.entryReader.isAlive()) {
+      if (EnvironmentEdgeManager.currentTime() >= timeout) {
+        LOG.warn("Not cleaning buffer usage for peer {}: clearWALEntryBatch timed out whilst "
+          + "waiting reader/shipper thread to stop. Shipper alive: {}; Reader alive: {}",
+          this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
+        return;
+      }
+      try {
+        // Wait both shipper and reader threads to stop
+        Thread.sleep(this.sleepForRetries);
+      } catch (InterruptedException e) {
+        LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
+          + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
+        // Preserve the interrupt for the caller; looping would make every sleep throw again.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+    // Both threads are stopped, so nothing else touches the queue any more.
+    LongAccumulator totalToDecrement = new LongAccumulator(Long::sum, 0);
+    WALEntryBatch batch;
+    while ((batch = entryReader.entryBatchQueue.poll()) != null) {
+      batch.getWalEntries().forEach(
+        e -> totalToDecrement.accumulate(entryReader.getEntrySizeExcludeBulkLoad(e)));
+    }
+    LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
+      totalToDecrement.longValue());
+    source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 82cfb81..a49fe4a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -61,7 +61,8 @@ class ReplicationSourceWALReader extends Thread {
   private final WALEntryFilter filter;
   private final ReplicationSource source;
 
-  private final BlockingQueue<WALEntryBatch> entryBatchQueue;
+  @InterfaceAudience.Private
+  final BlockingQueue<WALEntryBatch> entryBatchQueue;
   // max (heap) size of each batch - multiply by number of batches in queue to get total
   private final long replicationBatchSizeCapacity;
   // max count of each batch - multiply by number of batches in queue to get total
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 274ccab..c89d0d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -21,7 +21,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.OptionalLong;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -31,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -48,12 +52,6 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -194,6 +192,46 @@ public class TestReplicationSource {
     });
   }
 
+  @Test
+  public void testTerminateClearsBuffer() throws Exception {
+    ReplicationSource source = new ReplicationSource();
+    ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
+    AtomicLong buffer = new AtomicLong();
+    when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
+    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+    when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    Configuration testConf = HBaseConfiguration.create();
+    source.init(testConf, null, mockManager, null, mockPeer, null,
+      "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
+    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
+      conf, null, 0, null, source);
+    ReplicationSourceShipper shipper =
+      new ReplicationSourceShipper(conf, null, null, source);
+    shipper.entryReader = reader;
+    source.workerThreads.put("testPeer", shipper);
+    WALEntryBatch batch = new WALEntryBatch(10, logDir);
+    WAL.Entry mockEntry = mock(WAL.Entry.class);
+    WALEdit mockEdit = mock(WALEdit.class);
+    WALKeyImpl mockKey = mock(WALKeyImpl.class);
+    when(mockEntry.getEdit()).thenReturn(mockEdit);
+    when(mockEdit.isEmpty()).thenReturn(false);
+    when(mockEntry.getKey()).thenReturn(mockKey);
+    when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(mockEdit.heapSize()).thenReturn(10000L);
+    when(mockEdit.size()).thenReturn(0);
+    ArrayList<Cell> cells = new ArrayList<>();
+    KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
+      Bytes.toBytes("1"), Bytes.toBytes("v1"));
+    cells.add(kv);
+    when(mockEdit.getCells()).thenReturn(cells);
+    reader.addEntryToBatch(batch, mockEntry);
+    reader.entryBatchQueue.put(batch);
+    // Precondition: the batched entry charged the shared buffer, so terminate() has work to undo.
+    assertEquals(reader.getEntrySizeExcludeBulkLoad(mockEntry), buffer.get());
+    source.terminate("test");
+    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
+  }
+
   /**
    * Tests that recovered queues are preserved on a regionserver shutdown.
    * See HBASE-18192
@@ -303,12 +341,12 @@ public class TestReplicationSource {
     ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
     PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
     queue.put(new Path("/www/html/test"));
-    RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
-    Server server = Mockito.mock(Server.class);
+    RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
+    Server server = mock(Server.class);
     Mockito.when(server.getServerName()).thenReturn(serverName);
     Mockito.when(source.getServer()).thenReturn(server);
     Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
-    ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
+    ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
     Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
         .thenReturn(1001L);
     Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))