Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2021/06/10 18:37:49 UTC

[GitHub] [hbase] sandeepvinayak commented on a change in pull request #3376: HBASE-25992 Polish the ReplicationSourceWALReader code for 2.x after …

sandeepvinayak commented on a change in pull request #3376:
URL: https://github.com/apache/hbase/pull/3376#discussion_r649431795



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -122,65 +122,51 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
+    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+      WALEntryBatch batch = null;
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          batch = null;
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+          if (batch == null) {
+            // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+            continue;
+          }
+          // if we have already switched a file, skip reading and put it directly to the ship queue
+          if (!batch.isEndOfFile()) {
+            readWALEntries(entryStream, batch);
             currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue have no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
           }
-        } catch (WALEntryFilterRetryableException | IOException e) { // stream related
-          if (handleEofException(e, batch)) {
-            sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries "
-              + "or replication filter is recovering", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          // need to propagate the batch even it has no entries since it may carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
+        }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e, batch)) {

Review comment:
       I think we need to reset `sleepMultiplier` to 1 in the else branch of this check, that is, when `handleEofException(e, batch)` returns true. The removed code did this before.
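
       A minimal, stand-alone sketch of the shape I have in mind. It is not the actual reader code: `ReaderBackoffSketch`, `onStreamException`, and the `eofHandled` flag are hypothetical names for illustration, with `eofHandled` standing in for the return value of `handleEofException(e, batch)`. It assumes the backoff should behave as in the removed lines: grow the multiplier up to `maxRetriesMultiplier` on an unhandled failure, and go back to 1 once the EOF has been handled.

```java
// Hypothetical illustration only; mirrors the retry logic visible in the diff,
// not the real ReplicationSourceWALReader implementation.
final class ReaderBackoffSketch {
  private final long sleepForRetries;      // base sleep in milliseconds
  private final int maxRetriesMultiplier;  // upper bound for the multiplier
  private int sleepMultiplier = 1;

  ReaderBackoffSketch(long sleepForRetries, int maxRetriesMultiplier) {
    this.sleepForRetries = sleepForRetries;
    this.maxRetriesMultiplier = maxRetriesMultiplier;
  }

  /**
   * Models the body of the catch block.
   *
   * @param eofHandled stand-in for the result of handleEofException(e, batch)
   * @return how long the caller should sleep, in milliseconds (0 means no sleep)
   */
  long onStreamException(boolean eofHandled) {
    if (!eofHandled) {
      // Unhandled stream failure: back off a little more each time, up to the cap.
      if (sleepMultiplier < maxRetriesMultiplier) {
        sleepMultiplier++;
      }
      return sleepForRetries * sleepMultiplier;
    } else {
      // EOF was handled: the suggested reset, so earlier failures do not
      // inflate the sleep after the next unrelated failure.
      sleepMultiplier = 1;
      return 0L;
    }
  }

  int getSleepMultiplier() {
    return sleepMultiplier;
  }

  public static void main(String[] args) {
    ReaderBackoffSketch backoff = new ReaderBackoffSketch(1000L, 3);
    System.out.println(backoff.onStreamException(false));
    System.out.println(backoff.onStreamException(true));
    System.out.println(backoff.getSleepMultiplier());
  }
}
```

       Without the else branch, the multiplier would only be reset after a batch is successfully put on the queue, so a handled EOF would leave the previous backoff in place.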



