You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/08/05 11:01:07 UTC

[hbase] branch branch-1 updated: HBASE-24807 Backport HBASE-20417 to branch-1 (#2197)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 2fd5873  HBASE-24807 Backport HBASE-20417 to branch-1 (#2197)
2fd5873 is described below

commit 2fd587384a898a3648d4722c562fed2a34d5709c
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Wed Aug 5 12:00:43 2020 +0100

    HBASE-24807 Backport HBASE-20417 to branch-1 (#2197)
    
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../regionserver/ReplicationSource.java            |  2 +-
 .../ReplicationSourceWALReaderThread.java          | 11 ++-
 .../regionserver/TestWALEntryStream.java           | 87 +++++++++++++++++++++-
 3 files changed, 93 insertions(+), 7 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 144f358..69a3a51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -858,7 +858,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
       ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
       entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
-          startPosition, fs, conf, readerFilter, metrics);
+          startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this);
       Threads.setDaemonThreadRunning(entryReader, threadName
           + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
         handler);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index f795db9..e3c4f87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -80,6 +80,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;
 
+  private ReplicationSource source;
+
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -94,8 +96,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
    */
   public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
       ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
-      long startPosition,
-      FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
+      long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter,
+      MetricsSource metrics, ReplicationSource source) {
     this.replicationQueueInfo = replicationQueueInfo;
     this.logQueue = logQueue;
     this.lastReadPath = logQueue.peek();
@@ -118,6 +120,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.metrics = metrics;
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
+    this.source = source;
     LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
         + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
         + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -132,6 +135,10 @@ public class ReplicationSourceWALReaderThread extends Thread {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
           if (!checkQuota()) {
             continue;
           }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 7ad7260..1828ad8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -27,6 +27,8 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -38,7 +40,12 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NoSuchElementException;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
@@ -81,7 +88,9 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
 
 @RunWith(MockitoJUnitRunner.class)
 @Category({ ReplicationTests.class, LargeTests.class })
@@ -359,10 +368,12 @@ public class TestWALEntryStream {
 
     // start up a batcher
     ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.isPeerEnabled()).thenReturn(true);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     ReplicationSourceWALReaderThread batcher =
             new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
-                    fs, conf, getDummyFilter(), new MetricsSource("1"));
+                    fs, conf, getDummyFilter(), new MetricsSource("1"), source);
     Path walPath = walQueue.peek();
     batcher.start();
     WALEntryBatch entryBatch = batcher.take();
@@ -398,10 +409,13 @@ public class TestWALEntryStream {
     }
 
     ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.isPeerEnabled()).thenReturn(true);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     ReplicationSourceWALReaderThread reader =
             new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
-                    walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1"));
+              walQueue, 0, fs, conf, getDummyFilter(),
+              new MetricsSource("1"), source);
     Path walPath = walQueue.toArray(new Path[2])[1];
     reader.start();
     WALEntryBatch entryBatch = reader.take();
@@ -456,10 +470,12 @@ public class TestWALEntryStream {
     appendEntriesToLog(2);
 
     ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.isPeerEnabled()).thenReturn(true);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     final ReplicationSourceWALReaderThread reader =
             new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
-                    0, fs, conf, filter, new MetricsSource("1"));
+                    0, fs, conf, filter, new MetricsSource("1"), source);
     reader.start();
 
     WALEntryBatch entryBatch = reader.take();
@@ -490,10 +506,12 @@ public class TestWALEntryStream {
     final long eof = getPosition(firstWAL);
 
     ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.isPeerEnabled()).thenReturn(true);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     final ReplicationSourceWALReaderThread reader =
             new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
-                    0, fs, conf, filter, new MetricsSource("1"));
+                    0, fs, conf, filter, new MetricsSource("1"), source);
     reader.start();
 
     // reader won't put any batch, even if EOF reached.
@@ -613,4 +631,65 @@ public class TestWALEntryStream {
       currentPath = newPath;
     }
   }
+
+  @Test
+  public void testReplicationSourceWALReaderDisabled()
+    throws IOException, InterruptedException, ExecutionException {
+    for(int i=0; i<3; i++) {
+      // append and sync three entries ("key0".."key2") so the reader has something to batch
+      appendToLog("key" + i);
+    }
+    // step a throwaway stream over the three entries to learn the expected end position
+    long position;
+    try (WALEntryStream entryStream =
+      new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    // start up a reader backed by a mocked source whose peer state the test can toggle
+    Path walPath = walQueue.peek();
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
+
+    final AtomicBoolean enabled = new AtomicBoolean(false); // peer starts out disabled
+    when(source.isPeerEnabled()).thenAnswer(new Answer<Boolean>() {
+      @Override
+      public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
+        return enabled.get(); // re-read on every call so flipping the flag takes effect
+      }
+    });
+
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    final ReplicationSourceWALReaderThread reader =
+      new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
+        0, fs, conf, getDummyFilter(), new MetricsSource("1"), source);
+
+    reader.start();
+    Future<WALEntryBatch> future =
+      Executors.newSingleThreadExecutor().submit(new Callable<WALEntryBatch>() {
+        @Override
+        public WALEntryBatch call() throws Exception {
+          return reader.take(); // runs off the test thread; presumably blocks until a batch exists
+        }
+      });
+    // NOTE(review): this executor is never shut down and no reader stop call is visible here.
+    // wait (up to 30s) until isPeerEnabled has been polled at least 5 times
+    verify(source, timeout(30000).atLeast(5)).isPeerEnabled();
+    // confirm that nothing has been handed out while the peer is still disabled
+    assertFalse(future.isDone());
+    // then enable the peer; the pending take() should now complete with the batch
+    enabled.set(true);
+    WALEntryBatch entryBatch = future.get();
+
+    // the batch should hold all three appended entries and end at the recorded position/path
+    assertNotNull(entryBatch);
+    assertEquals(3, entryBatch.getWalEntries().size());
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertEquals(3, entryBatch.getNbRowKeys());
+  }
 }