Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2021/02/25 19:02:00 UTC

[GitHub] [hbase] apurtell commented on a change in pull request #2987: HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF

apurtell commented on a change in pull request #2987:
URL: https://github.com/apache/hbase/pull/2987#discussion_r583087218



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -245,22 +270,37 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
   }
 
   /**
-   * if we get an EOF due to a zero-length log, and there are other logs in queue
-   * (highly likely we've closed the current log), and autorecovery is
-   * enabled, then dump the log
+   * This is to handle the EOFException from the WAL entry stream. EOFException should
+   * be handled carefully because there are chances of data loss because of never replicating
+   * the data. Thus we should always try to ship existing batch of entries here.
+   * If there was only one log in the queue before EOF, we ship the empty batch here
+   * and since reader is still active, in the next iteration of reader we will
+   * stop the reader.
+   * If there was more than one log in the queue before EOF, we ship the existing batch
+   * and reset the wal patch and position to the log with EOF, so shipper can remove
+   * logs from replication queue
    * @return true only the IOE can be handled
    */
-  private boolean handleEofException(IOException e) {
+  private boolean handleEofException(IOException e, WALEntryBatch batch)
+      throws InterruptedException {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source
     // since we don't add current log to recovered source queue so it is safe to remove.
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
+      && (source.isRecovered() || queue.size() > 1)
+      && this.eofAutoRecovery) {
       try {
         if (fs.getFileStatus(queue.peek()).getLen() == 0) {

Review comment:
       There is a potential race here. queue.peek() is called twice, and the two calls can return different results if the head of the queue changes in between. (There's no guarantee that it won't, right?) A local variable should be assigned just before this line and reused, e.g.
   
       Path head = queue.peek();
       if (fs.getFileStatus(head).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: {}", head);
           ...
           batch.setLastWalPath(head);
           ...
       }
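
       For illustration only, here is a minimal standalone sketch of the same peek-once pattern outside HBase. The class name PeekOnceSketch, the zeroLengthHead helper, and the LENGTHS map (a stand-in for fs.getFileStatus(path).getLen()) are all hypothetical and not part of this PR or of any HBase/Hadoop API. The point is only that the head is read once and the same reference is used for both the length check and whatever follows.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

public class PeekOnceSketch {
  // Hypothetical stand-in for fs.getFileStatus(path).getLen(); not a real Hadoop call.
  static final Map<String, Long> LENGTHS = new ConcurrentHashMap<>();

  /**
   * Returns the current head of the queue if it is a zero-length log, otherwise null.
   * The head is captured once, so the element that was checked is the element returned,
   * even if another thread changes the queue concurrently.
   */
  static String zeroLengthHead(PriorityBlockingQueue<String> queue) {
    String head = queue.peek(); // single read of the head
    if (head == null) {
      return null; // queue was empty
    }
    Long len = LENGTHS.get(head);
    if (len != null && len == 0L) {
      return head; // same reference that was checked above
    }
    return null;
  }

  public static void main(String[] args) {
    LENGTHS.put("wal.1", 0L);
    LENGTHS.put("wal.2", 128L);
    PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
    queue.add("wal.1");
    queue.add("wal.2");
    System.out.println("zero-length head: " + zeroLengthHead(queue));
  }
}
```

       Note that the sketch also guards against peek() returning null on an empty queue, which a second, unguarded peek() would not.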

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -123,44 +122,64 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, conf, currentPosition,
-              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
-              source.getSourceMetrics(), walGroupId)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
-          }
-          if (!checkQuota()) {
-            continue;
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream = null;
+    try {
+      // we only loop back here if something fatal happened to our stream
+      while (isReaderRunning()) {
+        try {
+          entryStream =
+            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
+              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+
+            batch = createBatch(entryStream);
+            batch = readWALEntries(entryStream, batch);
+            currentPosition = entryStream.getPosition();
+            if (batch == null) {
+              // either the queue have no WAL to read
+              // or got no new entries (didn't advance position in WAL)
+              handleEmptyWALEntryBatch();
+              entryStream.reset(); // reuse stream
+            } else {
+              addBatchToShippingQueue(batch);
+            }
           }
-          WALEntryBatch batch = readWALEntries(entryStream);
-          currentPosition = entryStream.getPosition();
-          if (batch != null) {
-            // need to propagate the batch even it has no entries since it may carry the last
-            // sequence id information for serial replication.
-            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
-            entryBatchQueue.put(batch);
+        } catch (IOException e) { // stream related
+          if (handleEofException(e, batch)) {
             sleepMultiplier = 1;
-          } else { // got no entries and didn't advance position in WAL
-            handleEmptyWALEntryBatch();
-            entryStream.reset(); // reuse stream
+          } else {
+            LOG.warn("Failed to read stream of replication entries", e);
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              sleepMultiplier++;
+            }
+            Threads.sleep(sleepForRetries * sleepMultiplier);
           }
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while sleeping between WAL reads");
+          Thread.currentThread().interrupt();
+        } finally {
+          entryStream.close();

Review comment:
       See earlier branch-1 review as to why the try blocks have been restructured here. lgtm



