You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/16 14:13:15 UTC
hbase git commit: HBASE-20417 Do not read wal entries when peer is disabled
Repository: hbase
Updated Branches:
refs/heads/master 1339ff966 -> 773aff90f
HBASE-20417 Do not read wal entries when peer is disabled
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/773aff90
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/773aff90
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/773aff90
Branch: refs/heads/master
Commit: 773aff90fdf9edf4687b373d1c7ba859d166a437
Parents: 1339ff9
Author: zhangduo <zh...@apache.org>
Authored: Sat Apr 14 22:00:15 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Apr 16 22:07:33 2018 +0800
----------------------------------------------------------------------
.../regionserver/ReplicationSourceShipper.java | 15 ++---
.../ReplicationSourceWALReader.java | 4 ++
.../regionserver/TestWALEntryStream.java | 59 +++++++++++++++++++-
3 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/773aff90/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 2097d00..11fd660 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -87,12 +87,11 @@ public class ReplicationSourceShipper extends Thread {
setWorkerState(WorkerState.RUNNING);
// Loop until we close down
while (isActive()) {
- int sleepMultiplier = 1;
// Sleep until replication is enabled again
if (!source.isPeerEnabled()) {
- if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
- sleepMultiplier++;
- }
+ // The peer enabled check is in memory, not expensive, so do not need to increase the
+ // sleep interval as it may cause a long lag when we enable the peer.
+ sleepForRetries("Replication is disabled", 1);
continue;
}
try {
@@ -188,8 +187,8 @@ public class ReplicationSourceShipper extends Thread {
}
break;
} catch (Exception ex) {
- LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
- + org.apache.hadoop.util.StringUtils.stringifyException(ex));
+ LOG.warn("{} threw unknown exception:",
+ source.getReplicationEndpoint().getClass().getName(), ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -292,9 +291,7 @@ public class ReplicationSourceShipper extends Thread {
*/
public boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
- if (LOG.isTraceEnabled()) {
- LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
- }
+ LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
http://git-wip-us.apache.org/repos/asf/hbase/blob/773aff90/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 7ba347f..64fd48d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -126,6 +126,10 @@ class ReplicationSourceWALReader extends Thread {
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
+ if (!source.isPeerEnabled()) {
+ Threads.sleep(sleepForRetries);
+ continue;
+ }
if (!checkQuota()) {
continue;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/773aff90/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 2670756..35e4f82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -30,7 +30,12 @@ import java.io.IOException;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -328,7 +333,7 @@ public class TestWALEntryStream {
}
}
- private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
+ private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
Server mockServer = Mockito.mock(Server.class);
@@ -338,6 +343,12 @@ public class TestWALEntryStream {
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
+ return source;
+ }
+
+ private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
+ ReplicationSource source = mockReplicationSource(recovered, conf);
+ when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
reader.start();
@@ -460,6 +471,52 @@ public class TestWALEntryStream {
assertFalse(entryBatch.isEndOfFile());
}
+ @Test
+ public void testReplicationSourceWALReaderDisabled()
+ throws IOException, InterruptedException, ExecutionException {
+ appendEntriesToLogAndSync(3);
+ // get ending position
+ long position;
+ try (WALEntryStream entryStream =
+ new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ entryStream.next();
+ entryStream.next();
+ entryStream.next();
+ position = entryStream.getPosition();
+ }
+
+ // start up a reader
+ Path walPath = walQueue.peek();
+ ReplicationSource source = mockReplicationSource(false, CONF);
+ AtomicInteger invokeCount = new AtomicInteger(0);
+ AtomicBoolean enabled = new AtomicBoolean(false);
+ when(source.isPeerEnabled()).then(i -> {
+ invokeCount.incrementAndGet();
+ return enabled.get();
+ });
+
+ ReplicationSourceWALReader reader =
+ new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
+ reader.start();
+ Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
+ return reader.take();
+ });
+ // make sure that the isPeerEnabled has been called several times
+ TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
+ // confirm that we can read nothing if the peer is disabled
+ assertFalse(future.isDone());
+ // then enable the peer, we should get the batch
+ enabled.set(true);
+ WALEntryBatch entryBatch = future.get();
+
+ // should've batched up our entries
+ assertNotNull(entryBatch);
+ assertEquals(3, entryBatch.getWalEntries().size());
+ assertEquals(position, entryBatch.getLastWalPosition());
+ assertEquals(walPath, entryBatch.getLastWalPath());
+ assertEquals(3, entryBatch.getNbRowKeys());
+ }
+
private String getRow(WAL.Entry entry) {
Cell cell = entry.getEdit().getCells().get(0);
return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());