You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/15 13:47:38 UTC
[hbase] branch master updated: HBASE-27715 Refactoring the long tryAdvanceEntry method in WALEntryStream (#5105)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 1f2e1f5b3a1 HBASE-27715 Refactoring the long tryAdvanceEntry method in WALEntryStream (#5105)
1f2e1f5b3a1 is described below
commit 1f2e1f5b3a18504b3192f3651397af7f90b04932
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Mar 15 21:47:25 2023 +0800
HBASE-27715 Refactoring the long tryAdvanceEntry method in WALEntryStream (#5105)
Signed-off-by: Liangjun He <he...@apache.org>
---
.../replication/regionserver/WALEntryStream.java | 218 ++++++++++++---------
1 file changed, 120 insertions(+), 98 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index d95d42f2f30..c6268674c5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -206,73 +206,127 @@ class WALEntryStream implements Closeable {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
justification = "HDFS-4380")
- private HasNext tryAdvanceEntry() {
- if (reader == null) {
- // try open next WAL file
- PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
- Path nextPath = queue.peek();
- if (nextPath != null) {
- setCurrentPath(nextPath);
- // we need to test this prior to create the reader. If not, it is possible that, while
- // opening the file, the file is still being written so its header is incomplete and we get
- // a header EOF, but then while we test whether it is still being written, we have already
- // flushed the data out and we consider it is not being written, and then we just skip over
- // file, then we will lose the data written after opening...
- boolean beingWritten =
- walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+ private HasNext prepareReader() {
+ if (reader != null) {
+ if (state != null && state != WALTailingReader.State.NORMAL) {
+ // reset before reading
+ LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
+ currentPositionOfEntry, state.resetCompression());
try {
- reader = WALFactory.createTailingReader(fs, nextPath, conf,
- currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
- } catch (WALHeaderEOFException e) {
- if (!eofAutoRecovery) {
- // if we do not enable EOF auto recovery, just let the upper layer retry
- // the replication will be stuck usually, and need to be fixed manually
- return HasNext.RETRY;
- }
- // we hit EOF while reading the WAL header, usually this means we can just skip over this
- // file, but we need to be careful that whether this file is still being written, if so we
- // should retry instead of skipping.
- LOG.warn("EOF while trying to open WAL reader for path: {}", nextPath, e);
- if (beingWritten) {
- // just retry as the file is still being written, maybe next time we could read
- // something
- return HasNext.RETRY;
+ if (currentPositionOfEntry > 0) {
+ reader.resetTo(currentPositionOfEntry, state.resetCompression());
} else {
- // the file is not being written so we are safe to just skip over it
- dequeueCurrentLog();
- return HasNext.RETRY_IMMEDIATELY;
+ // we will read from the beginning so we should always clear the compression context
+ reader.resetTo(-1, true);
}
- } catch (LeaseNotRecoveredException e) {
- // HBASE-15019 the WAL was not closed due to some hiccup.
- LOG.warn("Try to recover the WAL lease " + nextPath, e);
- AbstractFSWALProvider.recoverLease(conf, nextPath);
- return HasNext.RETRY;
- } catch (IOException | NullPointerException e) {
- // For why we need to catch NPE here, see HDFS-4380 for more details
- LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+ } catch (IOException e) {
+ LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+ currentPositionOfEntry, state.resetCompression(), e);
+ // just leave the state as is, and try resetting next time
return HasNext.RETRY;
}
} else {
- // no more files in queue, this could happen for recovered queue, or for a wal group of a
- // sync replication peer which has already been transited to DA or S.
- setCurrentPath(null);
- return HasNext.NO;
+ return HasNext.YES;
}
- } else if (state != null && state != WALTailingReader.State.NORMAL) {
- // reset before reading
- try {
- if (currentPositionOfEntry > 0) {
- reader.resetTo(currentPositionOfEntry, state.resetCompression());
- } else {
- // we will read from the beginning so we should always clear the compression context
- reader.resetTo(-1, true);
- }
- } catch (IOException e) {
- LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
- currentPositionOfEntry, state.resetCompression(), e);
- // just leave the state as is, and try resetting next time
+ }
+ // try open next WAL file
+ PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
+ Path nextPath = queue.peek();
+ if (nextPath == null) {
+ LOG.debug("No more WAL files in queue");
+ // no more files in queue, this could happen for recovered queue, or for a wal group of a
+ // sync replication peer which has already been transited to DA or S.
+ setCurrentPath(null);
+ return HasNext.NO;
+ }
+ setCurrentPath(nextPath);
+ // we need to test this prior to create the reader. If not, it is possible that, while
+ // opening the file, the file is still being written so its header is incomplete and we get
+ // a header EOF, but then while we test whether it is still being written, we have already
+ // flushed the data out and we consider it is not being written, and then we just skip over
+ // file, then we will lose the data written after opening...
+ boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+ LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
+ currentPositionOfEntry, beingWritten);
+ try {
+ reader = WALFactory.createTailingReader(fs, nextPath, conf,
+ currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
+ return HasNext.YES;
+ } catch (WALHeaderEOFException e) {
+ if (!eofAutoRecovery) {
+ // if we do not enable EOF auto recovery, just let the upper layer retry
+ // the replication will be stuck usually, and need to be fixed manually
return HasNext.RETRY;
}
+ // we hit EOF while reading the WAL header, usually this means we can just skip over this
+ // file, but we need to be careful that whether this file is still being written, if so we
+ // should retry instead of skipping.
+ LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
+ currentPositionOfEntry, e);
+ if (beingWritten) {
+ // just retry as the file is still being written, maybe next time we could read
+ // something
+ return HasNext.RETRY;
+ } else {
+ // the file is not being written so we are safe to just skip over it
+ dequeueCurrentLog();
+ return HasNext.RETRY_IMMEDIATELY;
+ }
+ } catch (LeaseNotRecoveredException e) {
+ // HBASE-15019 the WAL was not closed due to some hiccup.
+ LOG.warn("Try to recover the WAL lease " + nextPath, e);
+ AbstractFSWALProvider.recoverLease(conf, nextPath);
+ return HasNext.RETRY;
+ } catch (IOException | NullPointerException e) {
+ // For why we need to catch NPE here, see HDFS-4380 for more details
+ LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+ return HasNext.RETRY;
+ }
+ }
+
+ private HasNext lastAttempt() {
+ LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
+ currentPositionOfEntry, state.resetCompression());
+ try {
+ reader.resetTo(currentPositionOfEntry, state.resetCompression());
+ } catch (IOException e) {
+ LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+ currentPositionOfEntry, state.resetCompression(), e);
+ // just leave the state as is, next time we will try to reset it again. One nasty problem
+ // is that we will still reach here eventually and try to reset again to see if the log
+ // has been fully replicated, which is redundant and can be optimized later
+ return HasNext.RETRY;
+ }
+ Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
+ state = pair.getFirst();
+ // should not be written
+ assert !pair.getSecond();
+ if (!state.eof()) {
+ // we still have something to read after reopen, so return YES. Or there is something wrong
+ // and we need to retry
+ return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
+ }
+ // No data available after reopen
+ if (checkAllBytesParsed()) {
+ // move to the next wal file and read
+ dequeueCurrentLog();
+ return HasNext.RETRY_IMMEDIATELY;
+ } else {
+ // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
+ // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
+ // so when calling tryAdvanceEntry next time we will reset the reader to the beginning
+ // and read.
+ currentPositionOfEntry = 0;
+ currentPositionOfReader = 0;
+ state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
+ return HasNext.RETRY;
+ }
+ }
+
+ private HasNext tryAdvanceEntry() {
+ HasNext prepared = prepareReader();
+ if (prepared != HasNext.YES) {
+ return prepared;
}
Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
@@ -292,46 +346,16 @@ class WALEntryStream implements Closeable {
return HasNext.RETRY_IMMEDIATELY;
case EOF_AND_RESET:
case EOF_AND_RESET_COMPRESSION:
- if (!beingWritten) {
- // no more entries in this log file, and the file is already closed, i.e, rolled
- // Before dequeuing, we should always get one more attempt at reading.
- // This is in case more entries came in after we opened the reader, and the log is rolled
- // while we were reading. See HBASE-6758
- try {
- reader.resetTo(currentPositionOfEntry, state.resetCompression());
- } catch (IOException e) {
- LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
- currentPositionOfEntry, state.resetCompression(), e);
- // just leave the state as is, next time we will try to reset it again, but there is a
- // nasty problem is that, we will still reach here finally and try reset again to see if
- // the log has been fully replicated, which is redundant, can be optimized later
- return HasNext.RETRY;
- }
- Pair<WALTailingReader.State, Boolean> p = readNextEntryAndRecordReaderPosition();
- state = pair.getFirst();
- // should not be written
- assert !p.getSecond();
- if (state.eof()) {
- if (checkAllBytesParsed()) {
- // move to the next wal file and read
- dequeueCurrentLog();
- return HasNext.RETRY_IMMEDIATELY;
- } else {
- // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
- // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
- // so when calling tryAdvanceENtry next time we will reset the reader to the beginning
- // and read.
- currentPositionOfEntry = 0;
- currentPositionOfReader = 0;
- state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
- return HasNext.RETRY;
- }
- }
- } else {
+ if (beingWritten) {
// just sleep a bit and retry to see if there are new entries coming since the file is
// still being written
return HasNext.RETRY;
}
+ // no more entries in this log file, and the file is already closed, i.e, rolled
+ // Before dequeuing, we should always get one more attempt at reading.
+ // This is in case more entries came in after we opened the reader, and the log is rolled
+ // while we were reading. See HBASE-6758
+ return lastAttempt();
case ERROR_AND_RESET:
case ERROR_AND_RESET_COMPRESSION:
// we have meet an error, just sleep a bit and retry again
@@ -393,10 +417,8 @@ class WALEntryStream implements Closeable {
return false;
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
- stat == null ? "N/A" : stat.getLen());
- }
+ LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
+ stat == null ? "N/A" : stat.getLen());
metrics.incrCompletedWAL();
return true;
}