You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/09/02 19:15:49 UTC

[hbase] 02/02: HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 820ba21b2abafdaaf3eac96c75f177f2f628c125
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Thu Sep 2 12:16:54 2021 -0400

    HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
    
    Conflicts:
    	hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
---
 .../hadoop/hbase/mapreduce/WALInputFormat.java     |  5 +-
 .../hbase/mapreduce/TestWALRecordReader.java       | 58 ++++++++++++++++++++++
 .../regionserver/ReplicationSource.java            | 10 ++--
 .../regionserver/ReplicationSourceWALReader.java   |  2 +-
 .../replication/regionserver/WALEntryStream.java   |  2 +
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    | 37 ++------------
 6 files changed, 74 insertions(+), 40 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index b410fc2..ffc202a 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -217,8 +217,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
         }
         return res;
       } catch (IOException e) {
-        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
-        if (logFile != archivedLog) {
+        Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
+        // archivedLog can be null if unable to locate in archiveDir.
+        if (archivedLog != null) {
           openReader(archivedLog);
           // Try call again in recursion
           return nextKeyValue();
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index fb1b168..41f8f35 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import java.io.IOException;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -43,6 +46,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -91,6 +95,11 @@ public class TestWALRecordReader {
     return "TestWALRecordReader";
   }
 
+  private static String getServerName() {
+    ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
+    return serverName.toString();
+  }
+
   @Before
   public void setUp() throws Exception {
     fs.delete(hbaseDir, true);
@@ -272,4 +281,53 @@ public class TestWALRecordReader {
     assertFalse(reader.nextKeyValue());
     reader.close();
   }
+
+  /**
+   * Read one edit from the split and match it against col1, then move the WAL to the archive
+   * directory while the reader is still open and match the next edit against col2.
+   */
+  private void testSplitWithMovingWAL(InputSplit split, byte[] col1, byte[] col2) throws Exception {
+    WALRecordReader<WALKey> reader = getReader();
+    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
+
+    assertTrue(reader.nextKeyValue());
+    Cell cell = reader.getCurrentValue().getCells().get(0);
+    if (!Bytes.equals(col1, 0, col1.length, cell.getQualifierArray(), cell.getQualifierOffset(),
+      cell.getQualifierLength())) {
+      assertTrue(
+        "expected [" + Bytes.toString(col1) + "], actual [" + Bytes.toString(
+          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
+        false);
+    }
+    // Move the log file to the archive directory
+    // while the WAL record reader is still open.
+    WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
+    Path logFile = new Path(split_.getLogFileName());
+    Path archivedLogDir = getWALArchiveDir(conf);
+    Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
+    assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());
+
+    assertTrue(fs.rename(logFile, archivedLogLocation));
+    assertTrue(fs.exists(archivedLogDir));
+    assertFalse(fs.exists(logFile));
+    // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
+    // the archivedLogLocation to read the next key value.
+    assertTrue(reader.nextKeyValue());
+    cell = reader.getCurrentValue().getCells().get(0);
+    if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
+      cell.getQualifierLength())) {
+      assertTrue(
+        "expected [" + Bytes.toString(col2) + "], actual [" + Bytes.toString(
+          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "]",
+        false);
+    }
+    reader.close();
+  }
+
+  private Path getWALArchiveDir(Configuration conf) throws IOException {
+    Path rootDir = CommonFSUtils.getWALRootDir(conf);
+    String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
+    return new Path(rootDir, archiveDir);
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 9f8b336..92f1979 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -400,8 +400,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
     try {
       fileSize = fs.getContentSummary(currentPath).getLength();
     } catch (FileNotFoundException e) {
-      currentPath = getArchivedLogPath(currentPath, conf);
-      fileSize = fs.getContentSummary(currentPath).getLength();
+      Path archivedLogPath = findArchivedLog(currentPath, conf);
+      // archivedLogPath can be null if unable to locate in archiveDir.
+      if (archivedLogPath == null) {
+        throw new FileNotFoundException("Couldn't find path: " + currentPath);
+      }
+      fileSize = fs.getContentSummary(archivedLogPath).getLength();
     }
     return fileSize;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 9af91e5..1109044 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -280,7 +280,7 @@ class ReplicationSourceWALReader extends Thread {
         if (!fs.exists(path)) {
           // There is a chance that wal has moved to oldWALs directory, so look there also.
           path = AbstractFSWALProvider.findArchivedLog(path, conf);
-          // path is null if it couldn't find archive path.
+          // path can be null if unable to locate in archiveDir.
         }
         if (path != null && fs.getFileStatus(path).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: {}", path);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 956024b..441bc1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -318,6 +318,7 @@ class WALEntryStream implements Closeable {
   private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
     // If the log was archived, continue reading from there
     Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+    // archivedLog can be null if unable to locate in archiveDir.
     if (archivedLog != null) {
       openReader(archivedLog);
     } else {
@@ -383,6 +384,7 @@ class WALEntryStream implements Closeable {
     } catch (FileNotFoundException fnfe) {
       // If the log was archived, continue reading from there
       Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
+      // archivedLog can be null if unable to locate in archiveDir.
       if (archivedLog != null) {
         openReader(archivedLog);
       } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 5fbeca3..989210b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -449,36 +448,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   }
 
   /**
-   * Get the archived WAL file path
-   * @param path - active WAL file path
-   * @param conf - configuration
-   * @return archived path if exists, path - otherwise
-   * @throws IOException exception
-   */
-  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
-    Path rootDir = CommonFSUtils.getWALRootDir(conf);
-    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
-      ServerName serverName = getServerNameFromWALDirectoryName(path);
-      if (serverName == null) {
-        LOG.error("Couldn't locate log: " + path);
-        return path;
-      }
-      oldLogDir = new Path(oldLogDir, serverName.getServerName());
-    }
-    Path archivedLogLocation = new Path(oldLogDir, path.getName());
-    final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
-
-    if (fs.exists(archivedLogLocation)) {
-      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-      return archivedLogLocation;
-    } else {
-      LOG.error("Couldn't locate log: " + path);
-      return path;
-    }
-  }
-
-  /**
    * Find the archived WAL file path if it is not able to locate in WALs dir.
    * @param path - active WAL file path
    * @param conf - configuration
@@ -510,7 +479,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
       LOG.info("Log " + path + " was moved to " + archivedLogLocation);
       return archivedLogLocation;
     }
-
     LOG.error("Couldn't locate log: " + path);
     return null;
   }
@@ -536,8 +504,9 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
         return reader;
       } catch (FileNotFoundException fnfe) {
         // If the log was archived, continue reading from there
-        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
-        if (!Objects.equals(path, archivedLog)) {
+        Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+        // archivedLog can be null if unable to locate in archiveDir.
+        if (archivedLog != null) {
           return openReader(archivedLog, conf);
         } else {
           throw fnfe;