You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/09/02 16:17:26 UTC

[hbase] branch master updated: HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new ec747bc  HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)
ec747bc is described below

commit ec747bcb290d6f698643b5705705ff43efba7c16
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Thu Sep 2 12:16:54 2021 -0400

    HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/mapreduce/WALInputFormat.java     |  5 +--
 .../hbase/mapreduce/TestWALRecordReader.java       | 30 ++++++++++++++----
 .../regionserver/ReplicationSource.java            | 10 ++++--
 .../regionserver/ReplicationSourceWALReader.java   |  2 +-
 .../replication/regionserver/WALEntryStream.java   |  2 ++
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    | 37 ++--------------------
 6 files changed, 39 insertions(+), 47 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 14bfec7..ae9dcb8 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -217,8 +217,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
         }
         return res;
       } catch (IOException e) {
-        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
-        if (logFile != archivedLog) {
+        Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
+        // archivedLog can be null if unable to locate in archiveDir.
+        if (archivedLog != null) {
           openReader(archivedLog);
           // Try call again in recursion
           return nextKeyValue();
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index 5e1c097..c8ff904 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import java.io.IOException;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -92,6 +95,11 @@ public class TestWALRecordReader {
     return "TestWALRecordReader";
   }
 
+  private static String getServerName() {
+    ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
+    return serverName.toString();
+  }
+
   @Before
   public void setUp() throws Exception {
     fs.delete(hbaseDir, true);
@@ -282,7 +290,6 @@ public class TestWALRecordReader {
     LOG.debug("log="+logDir+" file="+ split.getLogFileName());
 
     testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
-
   }
 
   protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
@@ -335,13 +342,16 @@ public class TestWALRecordReader {
     // Move log file to archive directory
     // While WAL record reader is open
     WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
-
     Path logFile = new Path(split_.getLogFileName());
-    Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
-    boolean result = fs.rename(logFile, archivedLog);
-    assertTrue(result);
-    result = fs.exists(archivedLog);
-    assertTrue(result);
+    Path archivedLogDir = getWALArchiveDir(conf);
+    Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
+    assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());
+
+    assertTrue(fs.rename(logFile, archivedLogLocation));
+    assertTrue(fs.exists(archivedLogDir));
+    assertFalse(fs.exists(logFile));
+    // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
+    // TODO: the archivedLogLocation to read next key value.
     assertTrue(reader.nextKeyValue());
     cell = reader.getCurrentValue().getCells().get(0);
     if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
@@ -353,4 +363,10 @@ public class TestWALRecordReader {
     }
     reader.close();
   }
+
+  private Path getWALArchiveDir(Configuration conf) throws IOException {
+    Path rootDir = CommonFSUtils.getWALRootDir(conf);
+    String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
+    return new Path(rootDir, archiveDir);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d1268fa..11222a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -396,8 +396,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
     try {
       fileSize = fs.getContentSummary(currentPath).getLength();
     } catch (FileNotFoundException e) {
-      currentPath = getArchivedLogPath(currentPath, conf);
-      fileSize = fs.getContentSummary(currentPath).getLength();
+      Path archivedLogPath = findArchivedLog(currentPath, conf);
+      // archivedLogPath can be null if unable to locate in archiveDir.
+      if (archivedLogPath == null) {
+        throw new FileNotFoundException("Couldn't find path: " + currentPath);
+      }
+      fileSize = fs.getContentSummary(archivedLogPath).getLength();
     }
     return fileSize;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 9af91e5..1109044 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -280,7 +280,7 @@ class ReplicationSourceWALReader extends Thread {
         if (!fs.exists(path)) {
           // There is a chance that wal has moved to oldWALs directory, so look there also.
           path = AbstractFSWALProvider.findArchivedLog(path, conf);
-          // path is null if it couldn't find archive path.
+          // path can be null if unable to locate in archiveDir.
         }
         if (path != null && fs.getFileStatus(path).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: {}", path);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index f04819d..488355c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -319,6 +319,7 @@ class WALEntryStream implements Closeable {
   private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
     // If the log was archived, continue reading from there
     Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+    // archivedLog can be null if unable to locate in archiveDir.
     if (archivedLog != null) {
       openReader(archivedLog);
     } else {
@@ -384,6 +385,7 @@ class WALEntryStream implements Closeable {
     } catch (FileNotFoundException fnfe) {
       // If the log was archived, continue reading from there
       Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
+      // archivedLog can be null if unable to locate in archiveDir.
       if (archivedLog != null) {
         openReader(archivedLog);
       } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index ffe5c42..75605e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -470,36 +469,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   }
 
   /**
-   * Get the archived WAL file path
-   * @param path - active WAL file path
-   * @param conf - configuration
-   * @return archived path if exists, path - otherwise
-   * @throws IOException exception
-   */
-  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
-    Path rootDir = CommonFSUtils.getWALRootDir(conf);
-    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
-      ServerName serverName = getServerNameFromWALDirectoryName(path);
-      if (serverName == null) {
-        LOG.error("Couldn't locate log: " + path);
-        return path;
-      }
-      oldLogDir = new Path(oldLogDir, serverName.getServerName());
-    }
-    Path archivedLogLocation = new Path(oldLogDir, path.getName());
-    final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
-
-    if (fs.exists(archivedLogLocation)) {
-      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-      return archivedLogLocation;
-    } else {
-      LOG.error("Couldn't locate log: " + path);
-      return path;
-    }
-  }
-
-  /**
    * Find the archived WAL file path if it is not able to locate in WALs dir.
    * @param path - active WAL file path
    * @param conf - configuration
@@ -531,7 +500,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
       LOG.info("Log " + path + " was moved to " + archivedLogLocation);
       return archivedLogLocation;
     }
-
     LOG.error("Couldn't locate log: " + path);
     return null;
   }
@@ -557,8 +525,9 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
         return reader;
       } catch (FileNotFoundException fnfe) {
         // If the log was archived, continue reading from there
-        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
-        if (!Objects.equals(path, archivedLog)) {
+        Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+        // archivedLog can be null if unable to locate in archiveDir.
+        if (archivedLog != null) {
           return openReader(archivedLog, conf);
         } else {
           throw fnfe;