You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2020/08/05 11:01:07 UTC
[hbase] branch branch-1 updated: HBASE-24807 Backport HBASE-20417
to branch-1 (#2197)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 2fd5873 HBASE-24807 Backport HBASE-20417 to branch-1 (#2197)
2fd5873 is described below
commit 2fd587384a898a3648d4722c562fed2a34d5709c
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Wed Aug 5 12:00:43 2020 +0100
HBASE-24807 Backport HBASE-20417 to branch-1 (#2197)
Signed-off-by: Viraj Jasani <vj...@apache.org>
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
.../regionserver/ReplicationSource.java | 2 +-
.../ReplicationSourceWALReaderThread.java | 11 ++-
.../regionserver/TestWALEntryStream.java | 87 +++++++++++++++++++++-
3 files changed, 93 insertions(+), 7 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 144f358..69a3a51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -858,7 +858,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
- startPosition, fs, conf, readerFilter, metrics);
+ startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this);
Threads.setDaemonThreadRunning(entryReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
handler);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index f795db9..e3c4f87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -80,6 +80,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
+ private ReplicationSource source;
+
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
@@ -94,8 +96,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
*/
public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
- long startPosition,
- FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
+ long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter,
+ MetricsSource metrics, ReplicationSource source) {
this.replicationQueueInfo = replicationQueueInfo;
this.logQueue = logQueue;
this.lastReadPath = logQueue.peek();
@@ -118,6 +120,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.metrics = metrics;
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
+ this.source = source;
LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
+ ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -132,6 +135,10 @@ public class ReplicationSourceWALReaderThread extends Thread {
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
+ if (!source.isPeerEnabled()) {
+ Threads.sleep(sleepForRetries);
+ continue;
+ }
if (!checkQuota()) {
continue;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 7ad7260..1828ad8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -27,6 +27,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -38,7 +40,12 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@@ -81,7 +88,9 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
@Category({ ReplicationTests.class, LargeTests.class })
@@ -359,10 +368,12 @@ public class TestWALEntryStream {
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread batcher =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
- fs, conf, getDummyFilter(), new MetricsSource("1"));
+ fs, conf, getDummyFilter(), new MetricsSource("1"), source);
Path walPath = walQueue.peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();
@@ -398,10 +409,13 @@ public class TestWALEntryStream {
}
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
- walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1"));
+ walQueue, 0, fs, conf, getDummyFilter(),
+ new MetricsSource("1"), source);
Path walPath = walQueue.toArray(new Path[2])[1];
reader.start();
WALEntryBatch entryBatch = reader.take();
@@ -456,10 +470,12 @@ public class TestWALEntryStream {
appendEntriesToLog(2);
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
- 0, fs, conf, filter, new MetricsSource("1"));
+ 0, fs, conf, filter, new MetricsSource("1"), source);
reader.start();
WALEntryBatch entryBatch = reader.take();
@@ -490,10 +506,12 @@ public class TestWALEntryStream {
final long eof = getPosition(firstWAL);
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
- 0, fs, conf, filter, new MetricsSource("1"));
+ 0, fs, conf, filter, new MetricsSource("1"), source);
reader.start();
// reader won't put any batch, even if EOF reached.
@@ -613,4 +631,65 @@ public class TestWALEntryStream {
currentPath = newPath;
}
}
+
+ @Test
+ public void testReplicationSourceWALReaderDisabled()
+ throws IOException, InterruptedException, ExecutionException {
+ for(int i=0; i<3; i++) {
+ //append and sync
+ appendToLog("key" + i);
+ }
+ // get ending position
+ long position;
+ try (WALEntryStream entryStream =
+ new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+ entryStream.next();
+ entryStream.next();
+ entryStream.next();
+ position = entryStream.getPosition();
+ }
+
+ // start up a reader
+ Path walPath = walQueue.peek();
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
+
+ final AtomicBoolean enabled = new AtomicBoolean(false);
+ when(source.isPeerEnabled()).thenAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return enabled.get();
+ }
+ });
+
+ ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ final ReplicationSourceWALReaderThread reader =
+ new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
+ 0, fs, conf, getDummyFilter(), new MetricsSource("1"), source);
+
+ reader.start();
+ Future<WALEntryBatch> future =
+ Executors.newSingleThreadExecutor().submit(new Callable<WALEntryBatch>() {
+ @Override
+ public WALEntryBatch call() throws Exception {
+ return reader.take();
+ }
+ });
+
+ // make sure that the isPeerEnabled has been called several times
+ verify(source, timeout(30000).atLeast(5)).isPeerEnabled();
+ // confirm that we can read nothing if the peer is disabled
+ assertFalse(future.isDone());
+ // then enable the peer, we should get the batch
+ enabled.set(true);
+ WALEntryBatch entryBatch = future.get();
+
+ // should've batched up our entries
+ assertNotNull(entryBatch);
+ assertEquals(3, entryBatch.getWalEntries().size());
+ assertEquals(position, entryBatch.getLastWalPosition());
+ assertEquals(walPath, entryBatch.getLastWalPath());
+ assertEquals(3, entryBatch.getNbRowKeys());
+ }
}