You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2021/01/29 12:26:23 UTC
[hbase] branch branch-2 updated: HBASE-25536 Remove 0 length wal
file from logQueue if it belongs to old sources (#2908)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 2c72225 HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2908)
2c72225 is described below
commit 2c72225d13d2e82a9e8ab467c36c051cd5dfc243
Author: shahrs87 <sh...@gmail.com>
AuthorDate: Fri Jan 29 04:17:30 2021 -0800
HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2908)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../regionserver/ReplicationSourceWALReader.java | 4 ++-
.../regionserver/TestWALEntryStream.java | 30 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 1ff0a8b..05b34a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -246,8 +246,10 @@ class ReplicationSourceWALReader extends Thread {
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log
private void handleEofException(IOException e) {
+ // Dump the log even if logQueue size is 1 if the source is from recovered Source
+ // since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
- logQueue.size() > 1 && this.eofAutoRecovery) {
+ (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
try {
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 63e7a8b..1db9c17 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -652,4 +653,33 @@ public class TestWALEntryStream {
assertFalse(entryStream.hasNext());
}
}
+
+ /*
+ Verifies that a 0 length log is removed from logQueue when the source is a recovered
+ source, even though it is the only entry in the queue (logQueue size is 1).
+ */
+ @Test
+ public void testEOFExceptionForRecoveredQueue() throws Exception {
+ PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
+ // Create a 0 length log: create the file and close it without writing anything.
+ Path emptyLog = new Path("emptyLog");
+ FSDataOutputStream fsdos = fs.create(emptyLog);
+ fsdos.close();
+ assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+ queue.add(emptyLog);
+
+ Configuration conf = new Configuration(CONF);
+ // Override the max retries multiplier to fail fast.
+ conf.setInt("replication.source.maxretriesmultiplier", 1);
+ conf.setBoolean("replication.source.eof.autorecovery", true);
+ // Create a reader with a mocked source; the `true` argument presumably marks it as recovered.
+ ReplicationSource source = mockReplicationSource(true, conf);
+ when(source.isPeerEnabled()).thenReturn(true);
+ ReplicationSourceWALReader reader =
+ new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
+ reader.run();
+ // run() is called directly, so the reader executes on this thread; its
+ // ReplicationSourceWALReader#handleEofException is expected to remove the empty log from logQueue.
+ assertEquals(0, queue.size());
+ }
}