You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/09/02 16:17:26 UTC
[hbase] branch master updated: HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new ec747bc HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)
ec747bc is described below
commit ec747bcb290d6f698643b5705705ff43efba7c16
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Thu Sep 2 12:16:54 2021 -0400
HBASE-26106 AbstractFSWALProvider#getArchivedLogPath doesn't look for wal file in all oldWALs directory. (#3636)
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hadoop/hbase/mapreduce/WALInputFormat.java | 5 +--
.../hbase/mapreduce/TestWALRecordReader.java | 30 ++++++++++++++----
.../regionserver/ReplicationSource.java | 10 ++++--
.../regionserver/ReplicationSourceWALReader.java | 2 +-
.../replication/regionserver/WALEntryStream.java | 2 ++
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 37 ++--------------------
6 files changed, 39 insertions(+), 47 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 14bfec7..ae9dcb8 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -217,8 +217,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
}
return res;
} catch (IOException e) {
- Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
- if (logFile != archivedLog) {
+ Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
+ // archivedLog can be null if unable to locate in archiveDir.
+ if (archivedLog != null) {
openReader(archivedLog);
// Try call again in recursion
return nextKeyValue();
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index 5e1c097..c8ff904 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -92,6 +95,11 @@ public class TestWALRecordReader {
return "TestWALRecordReader";
}
+ private static String getServerName() {
+ ServerName serverName = ServerName.valueOf("TestWALRecordReader", 1, 1);
+ return serverName.toString();
+ }
+
@Before
public void setUp() throws Exception {
fs.delete(hbaseDir, true);
@@ -282,7 +290,6 @@ public class TestWALRecordReader {
LOG.debug("log="+logDir+" file="+ split.getLogFileName());
testSplitWithMovingWAL(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
-
}
protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
@@ -335,13 +342,16 @@ public class TestWALRecordReader {
// Move log file to archive directory
// While WAL record reader is open
WALInputFormat.WALSplit split_ = (WALInputFormat.WALSplit) split;
-
Path logFile = new Path(split_.getLogFileName());
- Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
- boolean result = fs.rename(logFile, archivedLog);
- assertTrue(result);
- result = fs.exists(archivedLog);
- assertTrue(result);
+ Path archivedLogDir = getWALArchiveDir(conf);
+ Path archivedLogLocation = new Path(archivedLogDir, logFile.getName());
+ assertNotEquals(split_.getLogFileName(), archivedLogLocation.toString());
+
+ assertTrue(fs.rename(logFile, archivedLogLocation));
+ assertTrue(fs.exists(archivedLogDir));
+ assertFalse(fs.exists(logFile));
+ // TODO: This is not behaving as expected. WALInputFormat#WALKeyRecordReader doesn't open
+ // TODO: the archivedLogLocation to read next key value.
assertTrue(reader.nextKeyValue());
cell = reader.getCurrentValue().getCells().get(0);
if (!Bytes.equals(col2, 0, col2.length, cell.getQualifierArray(), cell.getQualifierOffset(),
@@ -353,4 +363,10 @@ public class TestWALRecordReader {
}
reader.close();
}
+
+ private Path getWALArchiveDir(Configuration conf) throws IOException {
+ Path rootDir = CommonFSUtils.getWALRootDir(conf);
+ String archiveDir = AbstractFSWALProvider.getWALArchiveDirectoryName(conf, getServerName());
+ return new Path(rootDir, archiveDir);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d1268fa..11222a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -396,8 +396,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
try {
fileSize = fs.getContentSummary(currentPath).getLength();
} catch (FileNotFoundException e) {
- currentPath = getArchivedLogPath(currentPath, conf);
- fileSize = fs.getContentSummary(currentPath).getLength();
+ Path archivedLogPath = findArchivedLog(currentPath, conf);
+ // archivedLogPath can be null if unable to locate in archiveDir.
+ if (archivedLogPath == null) {
+ throw new FileNotFoundException("Couldn't find path: " + currentPath);
+ }
+ fileSize = fs.getContentSummary(archivedLogPath).getLength();
}
return fileSize;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 9af91e5..1109044 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -280,7 +280,7 @@ class ReplicationSourceWALReader extends Thread {
if (!fs.exists(path)) {
// There is a chance that wal has moved to oldWALs directory, so look there also.
path = AbstractFSWALProvider.findArchivedLog(path, conf);
- // path is null if it couldn't find archive path.
+ // path can be null if unable to locate in archiveDir.
}
if (path != null && fs.getFileStatus(path).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: {}", path);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index f04819d..488355c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -319,6 +319,7 @@ class WALEntryStream implements Closeable {
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
// If the log was archived, continue reading from there
Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+ // archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
openReader(archivedLog);
} else {
@@ -384,6 +385,7 @@ class WALEntryStream implements Closeable {
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
+ // archivedLog can be null if unable to locate in archiveDir.
if (archivedLog != null) {
openReader(archivedLog);
} else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index ffe5c42..75605e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -470,36 +469,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
/**
- * Get the archived WAL file path
- * @param path - active WAL file path
- * @param conf - configuration
- * @return archived path if exists, path - otherwise
- * @throws IOException exception
- */
- public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
- Path rootDir = CommonFSUtils.getWALRootDir(conf);
- Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
- ServerName serverName = getServerNameFromWALDirectoryName(path);
- if (serverName == null) {
- LOG.error("Couldn't locate log: " + path);
- return path;
- }
- oldLogDir = new Path(oldLogDir, serverName.getServerName());
- }
- Path archivedLogLocation = new Path(oldLogDir, path.getName());
- final FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
-
- if (fs.exists(archivedLogLocation)) {
- LOG.info("Log " + path + " was moved to " + archivedLogLocation);
- return archivedLogLocation;
- } else {
- LOG.error("Couldn't locate log: " + path);
- return path;
- }
- }
-
- /**
* Find the archived WAL file path if it is not able to locate in WALs dir.
* @param path - active WAL file path
* @param conf - configuration
@@ -531,7 +500,6 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}
-
LOG.error("Couldn't locate log: " + path);
return null;
}
@@ -557,8 +525,9 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
return reader;
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
- Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
- if (!Objects.equals(path, archivedLog)) {
+ Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+ // archivedLog can be null if unable to locate in archiveDir.
+ if (archivedLog != null) {
return openReader(archivedLog, conf);
} else {
throw fnfe;