Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/15 13:50:06 UTC

[hbase] branch branch-2 updated: HBASE-27715 Refactoring the long tryAdvanceEntry method in WALEntryStream (#5105)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new ef8a981f220 HBASE-27715 Refactoring the long tryAdvanceEntry method in WALEntryStream (#5105)
ef8a981f220 is described below

commit ef8a981f2200895851942e392b1f752e76f8dad7
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Mar 15 21:47:25 2023 +0800

    HBASE-27715 Refactoring the long tryAdvanceEntry method in WALEntryStream (#5105)
    
    Signed-off-by: Liangjun He <he...@apache.org>
    (cherry picked from commit 1f2e1f5b3a18504b3192f3651397af7f90b04932)
---
 .../replication/regionserver/WALEntryStream.java   | 218 ++++++++++++---------
 1 file changed, 120 insertions(+), 98 deletions(-)
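
For orientation before the diff: the patch splits the old monolithic
tryAdvanceEntry() into two helpers, prepareReader() (open the next WAL file,
or reset an existing reader after an error/EOF) and lastAttempt() (one final
read after a file has been rolled, see HBASE-6758). The outline below is a
hedged sketch of the resulting control flow, not the committed code:
tryAdvanceEntry, prepareReader, lastAttempt and HasNext come from the diff
itself, while ReaderState and the stub helpers are invented here so the
sketch compiles.

    // Simplified sketch of the refactored flow; see the diff below for the real code.
    class TryAdvanceEntrySketch {
      enum HasNext { YES, NO, RETRY, RETRY_IMMEDIATELY }
      enum ReaderState { NORMAL, EOF, ERROR } // stand-in for WALTailingReader.State

      HasNext tryAdvanceEntry() {
        HasNext prepared = prepareReader(); // open next WAL, or reset after error/EOF
        if (prepared != HasNext.YES) {
          return prepared; // NO / RETRY / RETRY_IMMEDIATELY bubble up to the caller
        }
        switch (readNextEntry()) {
          case NORMAL:
            return HasNext.YES;
          case EOF:
            // still being written: poll again later; rolled: one last read (HBASE-6758)
            return isBeingWritten() ? HasNext.RETRY : lastAttempt();
          default: // ERROR
            return HasNext.RETRY;
        }
      }

      // stubs so the sketch compiles; the real logic is in the diff below
      HasNext prepareReader() { return HasNext.YES; }
      ReaderState readNextEntry() { return ReaderState.NORMAL; }
      boolean isBeingWritten() { return false; }
      HasNext lastAttempt() { return HasNext.RETRY_IMMEDIATELY; }
    }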

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index d95d42f2f30..c6268674c5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -206,73 +206,127 @@ class WALEntryStream implements Closeable {
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
       justification = "HDFS-4380")
-  private HasNext tryAdvanceEntry() {
-    if (reader == null) {
-      // try open next WAL file
-      PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
-      Path nextPath = queue.peek();
-      if (nextPath != null) {
-        setCurrentPath(nextPath);
-        // we need to test this prior to create the reader. If not, it is possible that, while
-        // opening the file, the file is still being written so its header is incomplete and we get
-        // a header EOF, but then while we test whether it is still being written, we have already
-        // flushed the data out and we consider it is not being written, and then we just skip over
-        // file, then we will lose the data written after opening...
-        boolean beingWritten =
-          walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+  private HasNext prepareReader() {
+    if (reader != null) {
+      if (state != null && state != WALTailingReader.State.NORMAL) {
+        // reset before reading
+        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
+          currentPositionOfEntry, state.resetCompression());
         try {
-          reader = WALFactory.createTailingReader(fs, nextPath, conf,
-            currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
-        } catch (WALHeaderEOFException e) {
-          if (!eofAutoRecovery) {
-            // if we do not enable EOF auto recovery, just let the upper layer retry
-            // the replication will be stuck usually, and need to be fixed manually
-            return HasNext.RETRY;
-          }
-          // we hit EOF while reading the WAL header, usually this means we can just skip over this
-          // file, but we need to be careful that whether this file is still being written, if so we
-          // should retry instead of skipping.
-          LOG.warn("EOF while trying to open WAL reader for path: {}", nextPath, e);
-          if (beingWritten) {
-            // just retry as the file is still being written, maybe next time we could read
-            // something
-            return HasNext.RETRY;
+          if (currentPositionOfEntry > 0) {
+            reader.resetTo(currentPositionOfEntry, state.resetCompression());
           } else {
-            // the file is not being written so we are safe to just skip over it
-            dequeueCurrentLog();
-            return HasNext.RETRY_IMMEDIATELY;
+            // we will read from the beginning so we should always clear the compression context
+            reader.resetTo(-1, true);
           }
-        } catch (LeaseNotRecoveredException e) {
-          // HBASE-15019 the WAL was not closed due to some hiccup.
-          LOG.warn("Try to recover the WAL lease " + nextPath, e);
-          AbstractFSWALProvider.recoverLease(conf, nextPath);
-          return HasNext.RETRY;
-        } catch (IOException | NullPointerException e) {
-          // For why we need to catch NPE here, see HDFS-4380 for more details
-          LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+        } catch (IOException e) {
+          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+            currentPositionOfEntry, state.resetCompression(), e);
+          // just leave the state as is, and try resetting next time
           return HasNext.RETRY;
         }
       } else {
-        // no more files in queue, this could happen for recovered queue, or for a wal group of a
-        // sync replication peer which has already been transited to DA or S.
-        setCurrentPath(null);
-        return HasNext.NO;
+        return HasNext.YES;
       }
-    } else if (state != null && state != WALTailingReader.State.NORMAL) {
-      // reset before reading
-      try {
-        if (currentPositionOfEntry > 0) {
-          reader.resetTo(currentPositionOfEntry, state.resetCompression());
-        } else {
-          // we will read from the beginning so we should always clear the compression context
-          reader.resetTo(-1, true);
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
-          currentPositionOfEntry, state.resetCompression(), e);
-        // just leave the state as is, and try resetting next time
+    }
+    // try open next WAL file
+    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
+    Path nextPath = queue.peek();
+    if (nextPath == null) {
+      LOG.debug("No more WAL files in queue");
+      // no more files in the queue; this could happen for a recovered queue, or for a wal group
+      // of a sync replication peer which has already been transitioned to DA or S.
+      setCurrentPath(null);
+      return HasNext.NO;
+    }
+    setCurrentPath(nextPath);
+    // we need to test this prior to creating the reader. Otherwise it is possible that, while
+    // opening the file, the file is still being written so its header is incomplete and we get
+    // a header EOF; but by the time we test whether it is still being written, the data may
+    // already have been flushed out, so we would consider it not being written and just skip
+    // over the file, losing the data written after opening...
+    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
+      currentPositionOfEntry, beingWritten);
+    try {
+      reader = WALFactory.createTailingReader(fs, nextPath, conf,
+        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
+      return HasNext.YES;
+    } catch (WALHeaderEOFException e) {
+      if (!eofAutoRecovery) {
+        // if EOF auto recovery is not enabled, just let the upper layer retry;
+        // replication will usually be stuck and need to be fixed manually
         return HasNext.RETRY;
       }
+      // we hit EOF while reading the WAL header; usually this means we can just skip over this
+      // file, but we need to check whether the file is still being written, and if so we
+      // should retry instead of skipping.
+      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
+        currentPositionOfEntry, e);
+      if (beingWritten) {
+        // just retry as the file is still being written; maybe next time we can read
+        // something
+        return HasNext.RETRY;
+      } else {
+        // the file is not being written so we are safe to just skip over it
+        dequeueCurrentLog();
+        return HasNext.RETRY_IMMEDIATELY;
+      }
+    } catch (LeaseNotRecoveredException e) {
+      // HBASE-15019 the WAL was not closed due to some hiccup.
+      LOG.warn("Try to recover the WAL lease " + nextPath, e);
+      AbstractFSWALProvider.recoverLease(conf, nextPath);
+      return HasNext.RETRY;
+    } catch (IOException | NullPointerException e) {
+      // For why we need to catch NPE here, see HDFS-4380 for more details
+      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+      return HasNext.RETRY;
+    }
+  }
+
+  private HasNext lastAttempt() {
+    LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
+      currentPositionOfEntry, state.resetCompression());
+    try {
+      reader.resetTo(currentPositionOfEntry, state.resetCompression());
+    } catch (IOException e) {
+      LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+        currentPositionOfEntry, state.resetCompression(), e);
+      // just leave the state as is; next time we will try to reset it again. A nasty problem,
+      // though, is that we will still reach here eventually and try resetting again to see if
+      // the log has been fully replicated, which is redundant and can be optimized later
+      return HasNext.RETRY;
+    }
+    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
+    state = pair.getFirst();
+    // the file is closed, so it should no longer be reported as being written
+    assert !pair.getSecond();
+    if (!state.eof()) {
+      // we still have something to read after reopen, so return YES. Otherwise something is
+      // wrong and we need to retry
+      return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
+    }
+    // No data available after reopen
+    if (checkAllBytesParsed()) {
+      // move to the next wal file and read
+      dequeueCurrentLog();
+      return HasNext.RETRY_IMMEDIATELY;
+    } else {
+      // see HBASE-15983: if checkAllBytesParsed returns false, we need to try reading from the
+      // beginning again. Here we set the position to 0 and the state to
+      // ERROR_AND_RESET_COMPRESSION, so the next call to tryAdvanceEntry will reset the reader
+      // to the beginning and read.
+      currentPositionOfEntry = 0;
+      currentPositionOfReader = 0;
+      state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
+      return HasNext.RETRY;
+    }
+  }
+
+  private HasNext tryAdvanceEntry() {
+    HasNext prepared = prepareReader();
+    if (prepared != HasNext.YES) {
+      return prepared;
     }
 
     Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
@@ -292,46 +346,16 @@ class WALEntryStream implements Closeable {
         return HasNext.RETRY_IMMEDIATELY;
       case EOF_AND_RESET:
       case EOF_AND_RESET_COMPRESSION:
-        if (!beingWritten) {
-          // no more entries in this log file, and the file is already closed, i.e, rolled
-          // Before dequeuing, we should always get one more attempt at reading.
-          // This is in case more entries came in after we opened the reader, and the log is rolled
-          // while we were reading. See HBASE-6758
-          try {
-            reader.resetTo(currentPositionOfEntry, state.resetCompression());
-          } catch (IOException e) {
-            LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
-              currentPositionOfEntry, state.resetCompression(), e);
-            // just leave the state as is, next time we will try to reset it again, but there is a
-            // nasty problem is that, we will still reach here finally and try reset again to see if
-            // the log has been fully replicated, which is redundant, can be optimized later
-            return HasNext.RETRY;
-          }
-          Pair<WALTailingReader.State, Boolean> p = readNextEntryAndRecordReaderPosition();
-          state = pair.getFirst();
-          // should not be written
-          assert !p.getSecond();
-          if (state.eof()) {
-            if (checkAllBytesParsed()) {
-              // move to the next wal file and read
-              dequeueCurrentLog();
-              return HasNext.RETRY_IMMEDIATELY;
-            } else {
-              // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
-              // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
-              // so when calling tryAdvanceENtry next time we will reset the reader to the beginning
-              // and read.
-              currentPositionOfEntry = 0;
-              currentPositionOfReader = 0;
-              state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
-              return HasNext.RETRY;
-            }
-          }
-        } else {
+        if (beingWritten) {
           // just sleep a bit and retry to see if there are new entries coming since the file is
           // still being written
           return HasNext.RETRY;
         }
+        // no more entries in this log file, and the file is already closed, i.e., rolled.
+        // Before dequeuing, we should always get one more attempt at reading.
+        // This is in case more entries came in after we opened the reader and the log was rolled
+        // while we were reading. See HBASE-6758
+        return lastAttempt();
       case ERROR_AND_RESET:
       case ERROR_AND_RESET_COMPRESSION:
         // we have met an error; just sleep a bit and retry again
@@ -393,10 +417,8 @@ class WALEntryStream implements Closeable {
         return false;
       }
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
-        stat == null ? "N/A" : stat.getLen());
-    }
+    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
+      stat == null ? "N/A" : stat.getLen());
     metrics.incrCompletedWAL();
     return true;
   }
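
As a usage note, the HasNext values returned above form a polling protocol
for the caller. Below is a hedged sketch, not HBase code, of one way a caller
could drive tryAdvanceEntry(); pollUntilEntry and backoffMillis are
hypothetical names invented for this illustration.

    import java.util.function.Supplier;

    public class HasNextCallerSketch {
      enum HasNext { YES, NO, RETRY, RETRY_IMMEDIATELY }

      // Polls until an entry is available (true) or the stream is exhausted (false).
      static boolean pollUntilEntry(Supplier<HasNext> tryAdvanceEntry, long backoffMillis)
          throws InterruptedException {
        while (true) {
          switch (tryAdvanceEntry.get()) {
            case YES:
              return true; // an entry is ready to be consumed
            case NO:
              return false; // queue drained; nothing more to replicate
            case RETRY_IMMEDIATELY:
              break; // state changed (e.g. a log was dequeued); loop again without sleeping
            case RETRY:
              Thread.sleep(backoffMillis); // transient condition; back off before retrying
              break;
          }
        }
      }
    }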