You are viewing a plain-text version of this content. The canonical link for it (a hyperlink labelled "here" on the original page) is not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/07/23 16:44:07 UTC
[hbase] branch branch-2 updated: HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 59e2825 HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504)
59e2825 is described below
commit 59e2825c936f7042f8185ca5dc751efa8d6732c4
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Fri Jul 23 12:32:55 2021 -0400
HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504)
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../regionserver/ReplicationSourceWALReader.java | 15 ++++---
.../replication/regionserver/WALEntryStream.java | 35 +++-------------
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 37 +++++++++++++++++
.../regionserver/TestBasicWALEntryStream.java | 46 ++++++++++++++++++++++
4 files changed, 98 insertions(+), 35 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index ca41184..9af91e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -274,11 +275,15 @@ class ReplicationSourceWALReader extends Thread {
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
- Path head = queue.peek();
+ Path path = queue.peek();
try {
- if (fs.getFileStatus(head).getLen() == 0) {
- // head of the queue is an empty log file
- LOG.warn("Forcing removal of 0 length log in queue: {}", head);
+ if (!fs.exists(path)) {
+ // There is a chance that wal has moved to oldWALs directory, so look there also.
+ path = AbstractFSWALProvider.findArchivedLog(path, conf);
+ // path is null if it couldn't find archive path.
+ }
+ if (path != null && fs.getFileStatus(path).getLen() == 0) {
+ LOG.warn("Forcing removal of 0 length log in queue: {}", path);
logQueue.remove(walGroupId);
currentPosition = 0;
if (batch != null) {
@@ -289,7 +294,7 @@ class ReplicationSourceWALReader extends Thread {
return true;
}
} catch (IOException ioe) {
- LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe);
+ LOG.warn("Couldn't get file length information about log " + path, ioe);
} catch (InterruptedException ie) {
LOG.trace("Interrupted while adding WAL batch to ship queue");
Thread.currentThread().interrupt();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 75610ce..956024b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -27,13 +27,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -315,35 +315,10 @@ class WALEntryStream implements Closeable {
return false;
}
- private Path getArchivedLog(Path path) throws IOException {
- Path walRootDir = CommonFSUtils.getWALRootDir(conf);
-
- // Try found the log in old dir
- Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- Path archivedLogLocation = new Path(oldLogDir, path.getName());
- if (fs.exists(archivedLogLocation)) {
- LOG.info("Log " + path + " was moved to " + archivedLogLocation);
- return archivedLogLocation;
- }
-
- // Try found the log in the seperate old log dir
- oldLogDir =
- new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
- .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
- archivedLogLocation = new Path(oldLogDir, path.getName());
- if (fs.exists(archivedLogLocation)) {
- LOG.info("Log " + path + " was moved to " + archivedLogLocation);
- return archivedLogLocation;
- }
-
- LOG.error("Couldn't locate log: " + path);
- return path;
- }
-
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
// If the log was archived, continue reading from there
- Path archivedLog = getArchivedLog(path);
- if (!path.equals(archivedLog)) {
+ Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+ if (archivedLog != null) {
openReader(archivedLog);
} else {
throw fnfe;
@@ -407,8 +382,8 @@ class WALEntryStream implements Closeable {
seek();
} catch (FileNotFoundException fnfe) {
// If the log was archived, continue reading from there
- Path archivedLog = getArchivedLog(currentPath);
- if (!currentPath.equals(archivedLog)) {
+ Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
+ if (archivedLog != null) {
openReader(archivedLog);
} else {
throw fnfe;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index a7e3ef5..5fbeca3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -479,6 +479,43 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
/**
+ * Find the archived WAL file path if it is not able to locate in WALs dir.
+ * @param path - active WAL file path
+ * @param conf - configuration
+ * @return archived path if exists, null - otherwise
+ * @throws IOException exception
+ */
+ public static Path findArchivedLog(Path path, Configuration conf) throws IOException {
+ // If the path contains oldWALs keyword then exit early.
+ if (path.toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) {
+ return null;
+ }
+ Path walRootDir = CommonFSUtils.getWALRootDir(conf);
+ FileSystem fs = path.getFileSystem(conf);
+ // Try finding the log in old dir
+ Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ Path archivedLogLocation = new Path(oldLogDir, path.getName());
+ if (fs.exists(archivedLogLocation)) {
+ LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+ return archivedLogLocation;
+ }
+
+ ServerName serverName = getServerNameFromWALDirectoryName(path);
+ // Try finding the log in separate old log dir
+ oldLogDir =
+ new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
+ .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
+ archivedLogLocation = new Path(oldLogDir, path.getName());
+ if (fs.exists(archivedLogLocation)) {
+ LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+ return archivedLogLocation;
+ }
+
+ LOG.error("Couldn't locate log: " + path);
+ return null;
+ }
+
+ /**
* Opens WAL reader with retries and additional exception handling
* @param path path to WAL file
* @param conf configuration
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index ad77c9d..b07b5b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -52,12 +52,14 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -716,4 +718,48 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
}
}
+
+ /**
+ * Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
+ * @throws Exception exception
+ */
+ @Test
+ public void testEOFExceptionInOldWALsDirectory() throws Exception {
+ assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
+ AbstractFSWAL abstractWAL = (AbstractFSWAL)log;
+ Path emptyLogFile = abstractWAL.getCurrentFileName();
+ log.rollWriter(true);
+
+ // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously.
+ // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
+ // oldWALs directory.
+ Waiter.waitFor(CONF, 5000,
+ (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
+ // There will 2 logs in the queue.
+ assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
+
+ // Get the archived dir path for the first wal.
+ Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
+ // Make sure that the wal path is not the same as archived Dir path.
+ assertNotNull(archivePath);
+ assertTrue(fs.exists(archivePath));
+ fs.truncate(archivePath, 0);
+ // make sure the size of the wal file is 0.
+ assertEquals(0, fs.getFileStatus(archivePath).getLen());
+
+ ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ when(source.isPeerEnabled()).thenReturn(true);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+
+ Configuration localConf = new Configuration(CONF);
+ localConf.setInt("replication.source.maxretriesmultiplier", 1);
+ localConf.setBoolean("replication.source.eof.autorecovery", true);
+ // Start the reader thread.
+ createReader(false, localConf);
+ // Wait for the replication queue size to be 1. This means that we have handled
+ // 0 length wal from oldWALs directory.
+ Waiter.waitFor(localConf, 10000,
+ (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
+ }
}