You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2021/01/29 12:26:23 UTC

[hbase] branch branch-2 updated: HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2908)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 2c72225  HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2908)
2c72225 is described below

commit 2c72225d13d2e82a9e8ab467c36c051cd5dfc243
Author: shahrs87 <sh...@gmail.com>
AuthorDate: Fri Jan 29 04:17:30 2021 -0800

    HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2908)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
    Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../regionserver/ReplicationSourceWALReader.java   |  4 ++-
 .../regionserver/TestWALEntryStream.java           | 30 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 1ff0a8b..05b34a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -246,8 +246,10 @@ class ReplicationSourceWALReader extends Thread {
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
   private void handleEofException(IOException e) {
+    // Dump the log even if the logQueue size is 1 when the source is a recovered source:
+    // we don't add the current log to a recovered source's queue, so it is safe to remove.
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      logQueue.size() > 1 && this.eofAutoRecovery) {
+      (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 63e7a8b..1db9c17 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -652,4 +653,33 @@ public class TestWALEntryStream {
       assertFalse(entryStream.hasNext());
     }
   }
+
+  /*
+    Test removal of 0 length log from logQueue if the source is a recovered source and
+    size of logQueue is only 1.
+   */
+  @Test
+  public void testEOFExceptionForRecoveredQueue() throws Exception {
+    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
+    // Create a 0 length log.
+    Path emptyLog = new Path("emptyLog");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    queue.add(emptyLog);
+
+    Configuration conf = new Configuration(CONF);
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread with source as recovered source.
+    ReplicationSource source = mockReplicationSource(true, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
+    reader.run();
+    // ReplicationSourceWALReader#handleEofException will
+    // remove the empty log from logQueue.
+    assertEquals(0, queue.size());
+  }
 }