You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/07/23 16:44:07 UTC

[hbase] branch branch-2 updated: HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 59e2825  HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504)
59e2825 is described below

commit 59e2825c936f7042f8185ca5dc751efa8d6732c4
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Fri Jul 23 12:32:55 2021 -0400

    HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../regionserver/ReplicationSourceWALReader.java   | 15 ++++---
 .../replication/regionserver/WALEntryStream.java   | 35 +++-------------
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    | 37 +++++++++++++++++
 .../regionserver/TestBasicWALEntryStream.java      | 46 ++++++++++++++++++++++
 4 files changed, 98 insertions(+), 35 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index ca41184..9af91e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -274,11 +275,15 @@ class ReplicationSourceWALReader extends Thread {
     // since we don't add current log to recovered source queue so it is safe to remove.
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
       (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
-      Path head = queue.peek();
+      Path path = queue.peek();
       try {
-        if (fs.getFileStatus(head).getLen() == 0) {
-          // head of the queue is an empty log file
-          LOG.warn("Forcing removal of 0 length log in queue: {}", head);
+        if (!fs.exists(path)) {
+          // There is a chance that wal has moved to oldWALs directory, so look there also.
+          path = AbstractFSWALProvider.findArchivedLog(path, conf);
+          // path is null if it couldn't find archive path.
+        }
+        if (path != null && fs.getFileStatus(path).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: {}", path);
           logQueue.remove(walGroupId);
           currentPosition = 0;
           if (batch != null) {
@@ -289,7 +294,7 @@ class ReplicationSourceWALReader extends Thread {
           return true;
         }
       } catch (IOException ioe) {
-        LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe);
+        LOG.warn("Couldn't get file length information about log " + path, ioe);
       } catch (InterruptedException ie) {
         LOG.trace("Interrupted while adding WAL batch to ship queue");
         Thread.currentThread().interrupt();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 75610ce..956024b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -27,13 +27,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -315,35 +315,10 @@ class WALEntryStream implements Closeable {
     return false;
   }
 
-  private Path getArchivedLog(Path path) throws IOException {
-    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
-
-    // Try found the log in old dir
-    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    Path archivedLogLocation = new Path(oldLogDir, path.getName());
-    if (fs.exists(archivedLogLocation)) {
-      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-      return archivedLogLocation;
-    }
-
-    // Try found the log in the seperate old log dir
-    oldLogDir =
-        new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
-            .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
-    archivedLogLocation = new Path(oldLogDir, path.getName());
-    if (fs.exists(archivedLogLocation)) {
-      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-      return archivedLogLocation;
-    }
-
-    LOG.error("Couldn't locate log: " + path);
-    return path;
-  }
-
   private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
     // If the log was archived, continue reading from there
-    Path archivedLog = getArchivedLog(path);
-    if (!path.equals(archivedLog)) {
+    Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+    if (archivedLog != null) {
       openReader(archivedLog);
     } else {
       throw fnfe;
@@ -407,8 +382,8 @@ class WALEntryStream implements Closeable {
       seek();
     } catch (FileNotFoundException fnfe) {
       // If the log was archived, continue reading from there
-      Path archivedLog = getArchivedLog(currentPath);
-      if (!currentPath.equals(archivedLog)) {
+      Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
+      if (archivedLog != null) {
         openReader(archivedLog);
       } else {
         throw fnfe;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index a7e3ef5..5fbeca3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -479,6 +479,43 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   }
 
   /**
+   * Find the archived WAL file path when the file can no longer be located in the WALs dir.
+   * @param path - active WAL file path
+   * @param conf - configuration
+   * @return the archived path if it exists, null otherwise
+   * @throws IOException if a file system operation fails
+   */
+  public static Path findArchivedLog(Path path, Configuration conf) throws IOException {
+    // If the path contains oldWALs keyword then exit early.
+    if (path.toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) {
+      return null;
+    }
+    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
+    FileSystem fs = path.getFileSystem(conf);
+    // Try finding the log in old dir
+    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    }
+
+    ServerName serverName = getServerNameFromWALDirectoryName(path);
+    // Try finding the log in separate old log dir
+    oldLogDir =
+      new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
+        .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
+    archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    }
+
+    LOG.error("Couldn't locate log: " + path);
+    return null;
+  }
+
+  /**
    * Opens WAL reader with retries and additional exception handling
    * @param path path to WAL file
    * @param conf configuration
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index ad77c9d..b07b5b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -52,12 +52,14 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -716,4 +718,48 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
       assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
     }
   }
+
+  /**
+   * Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
+   * @throws Exception exception
+   */
+  @Test
+  public void testEOFExceptionInOldWALsDirectory() throws Exception {
+    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
+    AbstractFSWAL  abstractWAL = (AbstractFSWAL)log;
+    Path emptyLogFile = abstractWAL.getCurrentFileName();
+    log.rollWriter(true);
+
+    // AsyncFSWAL and FSHLog both move the log from WALs to oldWALs directory asynchronously.
+    // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
+    // oldWALs directory.
+    Waiter.waitFor(CONF, 5000,
+      (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
+    // There will be 2 logs in the queue.
+    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
+
+    // Get the archived dir path for the first wal.
+    Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
+    // Make sure that an archived path was found (null means the wal was not archived).
+    assertNotNull(archivePath);
+    assertTrue(fs.exists(archivePath));
+    fs.truncate(archivePath, 0);
+    // make sure the size of the wal file is 0.
+    assertEquals(0, fs.getFileStatus(archivePath).getLen());
+
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+
+    Configuration localConf = new Configuration(CONF);
+    localConf.setInt("replication.source.maxretriesmultiplier", 1);
+    localConf.setBoolean("replication.source.eof.autorecovery", true);
+    // Start the reader thread.
+    createReader(false, localConf);
+    // Wait for the replication queue size to be 1. This means that we have handled
+    // 0 length wal from oldWALs directory.
+    Waiter.waitFor(localConf, 10000,
+      (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
+  }
 }