Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2021/02/23 00:11:03 UTC

[GitHub] [hbase] xcangCRM commented on a change in pull request #2975: HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF

xcangCRM commented on a change in pull request #2975:
URL: https://github.com/apache/hbase/pull/2975#discussion_r580678618



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
##########
@@ -542,6 +556,173 @@ public WALEntryFilter getWALEntryfilter() {
     });
   }
 
+  @Test
+  public void testReplicationOnEmptyLogAtTheEndOfQueueWithMultipleLogs() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Append 3 entries in a log
+    final Path log1 = new Path(logDir, logPrefix + ".1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+
+    // Create a 0 length log.
+    Path emptyLog = new Path(logDir, logPrefix + ".2");
+    FSDataOutputStream fsdos = FS.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+    source.run();
+    source.enqueueLog(log1);
+    source.enqueueLog(emptyLog);
+
+    // Wait for source to replicate
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.replicateCount.get() == 1;
+      }
+    });
+
+    // Wait and verify that all the entries get replicated for the non-empty logs
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.lastEntries.size() == 3;
+      }
+    });
+
+    // Wait and verify that the log queue has been fully drained
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return source.getQueues().get(logPrefix).isEmpty();
+      }
+    });
+  }
+
+  @Test
+  public void testReplicationOnEmptyLogAtTheEndOfQueueWithSingleLog() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Create a 0 length log.
+    Path emptyLog = new Path(logDir, logPrefix + ".1");
+    FSDataOutputStream fsdos = FS.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+    source.run();
+    source.enqueueLog(emptyLog);
+
+    // Wait and verify that no entry got replicated
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.lastEntries == null;
+      }
+    });
+
+    // Wait and verify that the log queue is empty
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return source.getQueues().get(logPrefix).isEmpty();
+      }
+    });
+  }
+
+  @Test
+  public void testReplicationOnEmptyLogBetweenTheNonEmptyLogsInLogQueue() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Append 3 entries in a log
+    final Path log1 = new Path(logDir, logPrefix + ".11");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+
+    // Create a 0 length log.
+    Path emptyLog = new Path(logDir, logPrefix + ".12");
+    FSDataOutputStream fsdos = FS.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+    // Append 5 entries in a log
+    final Path log3 = new Path(logDir, logPrefix + ".13");
+    WALProvider.Writer writer3 = WALFactory.createWALWriter(FS, log3, TEST_UTIL.getConfiguration());
+    appendEntries(writer3, 5);
+
+    // Append 10 entries in a log
+    final Path log4 = new Path(logDir, logPrefix + ".14");
+    WALProvider.Writer writer4 = WALFactory.createWALWriter(FS, log4, TEST_UTIL.getConfiguration());
+    appendEntries(writer4, 10);
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+    source.run();
+    source.enqueueLog(log1);
+    source.enqueueLog(emptyLog);
+    source.enqueueLog(log3);
+    source.enqueueLog(log4);
+
+    // Wait for source to replicate
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.replicateCount.get() == 2;
+      }
+    });
+
+    // Wait and verify the last replicated entries

Review comment:
       Great test.
   Nit: lines 711 to 722 could be extracted into a helper method and reused, since the same block repeats across the three tests above; see the sketch below.
   The only parameter you would need to provide is the expected number of replicated entries.
   Just a suggestion, if you think it makes sense.
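   A sketch of what that helper could look like, reusing the Waiter pattern already in these tests (the method name and signature are just illustrative):

       private void waitForReplicationAndQueueDrain(final ReplicationEndpointForTest endpoint,
           final ReplicationSource source, final String logPrefix,
           final int expectedEntries) throws Exception {
         // Wait and verify that all the entries get replicated
         Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
           @Override public boolean evaluate() {
             return endpoint.lastEntries.size() == expectedEntries;
           }
         });
         // Wait and verify that the log queue has been fully drained
         Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
           @Override public boolean evaluate() {
             return source.getQueues().get(logPrefix).isEmpty();
           }
         });
       }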
   

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
##########
@@ -133,71 +134,126 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
-          }
-          if (!checkQuota()) {
-            continue;
-          }
-          WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
-          boolean hasNext;
-          while ((hasNext = entryStream.hasNext()) == true) {
-            Entry entry = entryStream.next();
-            entry = filterEntry(entry);
-            if (entry != null) {
-              WALEdit edit = entry.getEdit();
-              if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry, entrySize);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
-                // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream =
+      new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics);
+    try {
+      while (isReaderRunning()) { // we only loop back here if something fatal happens to stream
+        try {
+          entryStream = new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+            batch = new WALEntryBatch(replicationBatchCountCapacity);
+            boolean hasNext = entryStream.hasNext();
+            while (hasNext) {
+              Entry entry = entryStream.next();
+              entry = filterEntry(entry);
+              if (entry != null) {
+                WALEdit edit = entry.getEdit();
+                if (edit != null && !edit.isEmpty()) {
+                  long entrySize = getEntrySizeIncludeBulkLoad(entry);
+                  long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
+                  batch.addEntry(entry, entrySize);
+                  updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+                  boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+                  // Stop if too many entries or too big
+                  if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
                     || batch.getNbEntries() >= replicationBatchCountCapacity) {
-                  break;
+                    break;
+                  }
                 }
               }
+              hasNext = entryStream.hasNext();
             }
-          }
 
-          updateBatch(entryStream, batch, hasNext);
-          if (isShippable(batch)) {
-            sleepMultiplier = 1;
-            entryBatchQueue.put(batch);
-            if (!batch.hasMoreEntries()) {
-              // we're done with queue recovery, shut ourselves down
-              setReaderRunning(false);
+            // If the batch is filled to max capacity or the stream has nothing more to read,
+            // try to ship it
+            if (updateBatchAndAddInShippingQueue(entryStream, batch, hasNext, false)) {
+              sleepMultiplier = 1;
             }
+          }
+        } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+          if (handleEofException(e, entryStream, batch)) {
+            sleepMultiplier = 1;
           } else {
-            Thread.sleep(sleepForRetries);
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              LOG.debug("Failed to read stream of replication entries: " + e);
+              sleepMultiplier++;
+            } else {
+              LOG.error("Failed to read stream of replication entries", e);
+            }
+            Threads.sleep(sleepForRetries * sleepMultiplier);
           }
-          resetStream(entryStream);
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while sleeping between WAL reads");
+          Thread.currentThread().interrupt();
+        } finally {
+          entryStream.close();

Review comment:
       Would we be able to put entryStream into a try-with-resources block above?
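   For reference, a minimal sketch of that shape, assuming the EOF handling stays inside the resource scope so the catch block can still see entryStream (readBatches is a hypothetical extraction of the inner read/ship loop):

       while (isReaderRunning()) { // we only loop back here if something fatal happens to the stream
         try (WALEntryStream entryStream =
             new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
           try {
             readBatches(entryStream); // hypothetical extraction of the inner loop
           } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
             // caught inside the resource scope, so handleEofException can use the stream
             if (!handleEofException(e, entryStream, batch)) {
               Threads.sleep(sleepForRetries * sleepMultiplier);
             }
           }
         } // entryStream.close() runs here automatically, replacing the explicit finally
       }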

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
##########
@@ -133,71 +134,126 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
-          }
-          if (!checkQuota()) {
-            continue;
-          }
-          WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
-          boolean hasNext;
-          while ((hasNext = entryStream.hasNext()) == true) {
-            Entry entry = entryStream.next();
-            entry = filterEntry(entry);
-            if (entry != null) {
-              WALEdit edit = entry.getEdit();
-              if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry, entrySize);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
-                // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream =
+      new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics);
+    try {
+      while (isReaderRunning()) { // we only loop back here if something fatal happens to stream
+        try {
+          entryStream = new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+            batch = new WALEntryBatch(replicationBatchCountCapacity);
+            boolean hasNext = entryStream.hasNext();
+            while (hasNext) {
+              Entry entry = entryStream.next();
+              entry = filterEntry(entry);
+              if (entry != null) {
+                WALEdit edit = entry.getEdit();
+                if (edit != null && !edit.isEmpty()) {
+                  long entrySize = getEntrySizeIncludeBulkLoad(entry);
+                  long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
+                  batch.addEntry(entry, entrySize);
+                  updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+                  boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+                  // Stop if too many entries or too big
+                  if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
                     || batch.getNbEntries() >= replicationBatchCountCapacity) {
-                  break;
+                    break;
+                  }
                 }
               }
+              hasNext = entryStream.hasNext();
             }
-          }
 
-          updateBatch(entryStream, batch, hasNext);
-          if (isShippable(batch)) {
-            sleepMultiplier = 1;
-            entryBatchQueue.put(batch);
-            if (!batch.hasMoreEntries()) {
-              // we're done with queue recovery, shut ourselves down
-              setReaderRunning(false);
+            // If the batch is filled to max capacity or the stream has nothing more to read,
+            // try to ship it
+            if (updateBatchAndAddInShippingQueue(entryStream, batch, hasNext, false)) {
+              sleepMultiplier = 1;
             }
+          }
+        } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+          if (handleEofException(e, entryStream, batch)) {
+            sleepMultiplier = 1;
           } else {
-            Thread.sleep(sleepForRetries);
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              LOG.debug("Failed to read stream of replication entries: " + e);
+              sleepMultiplier++;
+            } else {
+              LOG.error("Failed to read stream of replication entries", e);
+            }
+            Threads.sleep(sleepForRetries * sleepMultiplier);
           }
-          resetStream(entryStream);
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while sleeping between WAL reads");
+          Thread.currentThread().interrupt();
+        } finally {
+          entryStream.close();
         }
-      } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
-        if (sleepMultiplier < maxRetriesMultiplier) {
-          LOG.debug("Failed to read stream of replication entries: " + e);
-          sleepMultiplier++;
-        } else {
-          LOG.error("Failed to read stream of replication entries", e);
-          handleEofException(e);
-        }
-        Threads.sleep(sleepForRetries * sleepMultiplier);
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while sleeping between WAL reads");
-        Thread.currentThread().interrupt();
       }
+    } catch (IOException e) {
+      if (sleepMultiplier < maxRetriesMultiplier) {
+        LOG.debug("Failed to read stream of replication entries: " + e);
+        sleepMultiplier++;
+      } else {
+        LOG.error("Failed to read stream of replication entries", e);
+      }
+      Threads.sleep(sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.trace("Interrupted while sleeping between WAL reads");
+      Thread.currentThread().interrupt();
     }
   }
 
-  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) {
+  /**
+   * Update the batch, try to ship it, and return true if it was shipped.
+   * @param entryStream stream of the WALs
+   * @param batch batch of entries to ship
+   * @param hasMoreData whether the stream has yet more data to read
+   * @param isEOFException whether we hit an EOF exception before this. For an EOF exception,
+   *                      we do not want to reset the stream, since the entry stream doesn't
+   *                      have the correct position information.
+   * @return true if the batch was shipped successfully
+   * @throws InterruptedException if interrupted while queueing the batch
+   * @throws IOException on an error from the stream
+   */
+  private boolean updateBatchAndAddInShippingQueue(WALEntryStream entryStream, WALEntryBatch batch,
+      boolean hasMoreData, boolean isEOFException) throws InterruptedException, IOException {
+    updateBatch(entryStream, batch, hasMoreData, isEOFException);
+    boolean isDataQueued = false;
+    if (isShippable(batch)) {
+      isDataQueued = true;
+      entryBatchQueue.put(batch);
+      if (!batch.hasMoreEntries()) {
+        // we're done with queue recovery, shut ourselves down

Review comment:
       Worth adding a debug log here?
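   For example (the log message wording is only illustrative):

       if (!batch.hasMoreEntries()) {
         // we're done with queue recovery, shut ourselves down
         LOG.debug("Finished recovering the queue, stopping the WAL reader thread");
         setReaderRunning(false);
       }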

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
##########
@@ -726,8 +727,62 @@ public void testEOFExceptionForRecoveredQueue() throws Exception {
         queue, 0, fs, conf, getDummyFilter(),
         new MetricsSource("1"), (ReplicationSource) source);
     reader.run();
+    assertEquals(0, queue.size());
+  }
+
+  @Test
+  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
+    PriorityBlockingQueue<Path> queue =
+      new PriorityBlockingQueue<>(5, new ReplicationSource.LogsComparator());
+
+    // Create a 0 length log.
+    Path emptyLog = new Path("log.2");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    queue.add(emptyLog);
+
+    final Path log1 = new Path("log.1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+    queue.add(log1);
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    // Make it look like the source is from recovered source.
+    when(mockSourceManager.getOldSources())
+      .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread.
+    ReplicationSourceWALReaderThread reader =
+      new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
+        queue, 0, fs, conf, getDummyFilter(),
+        new MetricsSource("1"), (ReplicationSource) source);
+    reader.run();
+

Review comment:
       Do you need to wait here before asserting? How does the queue get drained? I might be missing something here. Thanks.
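   If reader.run() can return before the queue is observably drained, an explicit wait like the Waiter pattern used in TestReplicationSource would make the assertion deterministic (sketch):

       Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
         @Override public boolean evaluate() {
           return queue.isEmpty();
         }
       });
       assertEquals(0, queue.size());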

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
##########
@@ -133,71 +134,126 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
-          }
-          if (!checkQuota()) {
-            continue;
-          }
-          WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
-          boolean hasNext;
-          while ((hasNext = entryStream.hasNext()) == true) {
-            Entry entry = entryStream.next();
-            entry = filterEntry(entry);
-            if (entry != null) {
-              WALEdit edit = entry.getEdit();
-              if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry, entrySize);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
-                // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream =
+      new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics);
+    try {
+      while (isReaderRunning()) { // we only loop back here if something fatal happens to stream
+        try {
+          entryStream = new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+            batch = new WALEntryBatch(replicationBatchCountCapacity);
+            boolean hasNext = entryStream.hasNext();
+            while (hasNext) {
+              Entry entry = entryStream.next();
+              entry = filterEntry(entry);
+              if (entry != null) {
+                WALEdit edit = entry.getEdit();
+                if (edit != null && !edit.isEmpty()) {
+                  long entrySize = getEntrySizeIncludeBulkLoad(entry);
+                  long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
+                  batch.addEntry(entry, entrySize);
+                  updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+                  boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+                  // Stop if too many entries or too big
+                  if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
                     || batch.getNbEntries() >= replicationBatchCountCapacity) {
-                  break;
+                    break;
+                  }
                 }
               }
+              hasNext = entryStream.hasNext();
             }
-          }
 
-          updateBatch(entryStream, batch, hasNext);
-          if (isShippable(batch)) {
-            sleepMultiplier = 1;
-            entryBatchQueue.put(batch);
-            if (!batch.hasMoreEntries()) {
-              // we're done with queue recovery, shut ourselves down
-              setReaderRunning(false);
+            // If the batch is filled to max capacity or the stream has nothing more to read,
+            // try to ship it
+            if (updateBatchAndAddInShippingQueue(entryStream, batch, hasNext, false)) {
+              sleepMultiplier = 1;
             }
+          }
+        } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+          if (handleEofException(e, entryStream, batch)) {
+            sleepMultiplier = 1;
           } else {
-            Thread.sleep(sleepForRetries);
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              LOG.debug("Failed to read stream of replication entries: " + e);
+              sleepMultiplier++;
+            } else {
+              LOG.error("Failed to read stream of replication entries", e);
+            }
+            Threads.sleep(sleepForRetries * sleepMultiplier);
           }
-          resetStream(entryStream);
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while sleeping between WAL reads");
+          Thread.currentThread().interrupt();
+        } finally {
+          entryStream.close();
         }
-      } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
-        if (sleepMultiplier < maxRetriesMultiplier) {
-          LOG.debug("Failed to read stream of replication entries: " + e);
-          sleepMultiplier++;
-        } else {
-          LOG.error("Failed to read stream of replication entries", e);
-          handleEofException(e);
-        }
-        Threads.sleep(sleepForRetries * sleepMultiplier);
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while sleeping between WAL reads");
-        Thread.currentThread().interrupt();
       }
+    } catch (IOException e) {
+      if (sleepMultiplier < maxRetriesMultiplier) {
+        LOG.debug("Failed to read stream of replication entries: " + e);
+        sleepMultiplier++;
+      } else {
+        LOG.error("Failed to read stream of replication entries", e);
+      }
+      Threads.sleep(sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.trace("Interrupted while sleeping between WAL reads");
+      Thread.currentThread().interrupt();
     }
   }
 
-  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) {
+  /**
+   * Update the batch, try to ship it, and return true if it was shipped.
+   * @param entryStream stream of the WALs
+   * @param batch batch of entries to ship
+   * @param hasMoreData whether the stream has yet more data to read
+   * @param isEOFException whether we hit an EOF exception before this. For an EOF exception,
+   *                      we do not want to reset the stream, since the entry stream doesn't
+   *                      have the correct position information.
+   * @return true if the batch was shipped successfully
+   * @throws InterruptedException if interrupted while queueing the batch
+   * @throws IOException on an error from the stream
+   */
+  private boolean updateBatchAndAddInShippingQueue(WALEntryStream entryStream, WALEntryBatch batch,

Review comment:
       Nit: the name is a bit long; how about updateBatchAndShippingQueue?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
##########
@@ -133,71 +134,126 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
-          }
-          if (!checkQuota()) {
-            continue;
-          }
-          WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
-          boolean hasNext;
-          while ((hasNext = entryStream.hasNext()) == true) {
-            Entry entry = entryStream.next();
-            entry = filterEntry(entry);
-            if (entry != null) {
-              WALEdit edit = entry.getEdit();
-              if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry, entrySize);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
-                // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream =
+      new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics);
+    try {
+      while (isReaderRunning()) { // we only loop back here if something fatal happens to stream
+        try {
+          entryStream = new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+            batch = new WALEntryBatch(replicationBatchCountCapacity);
+            boolean hasNext = entryStream.hasNext();
+            while (hasNext) {
+              Entry entry = entryStream.next();
+              entry = filterEntry(entry);
+              if (entry != null) {
+                WALEdit edit = entry.getEdit();
+                if (edit != null && !edit.isEmpty()) {
+                  long entrySize = getEntrySizeIncludeBulkLoad(entry);
+                  long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
+                  batch.addEntry(entry, entrySize);
+                  updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+                  boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+                  // Stop if too many entries or too big
+                  if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
                     || batch.getNbEntries() >= replicationBatchCountCapacity) {
-                  break;
+                    break;
+                  }
                 }
               }
+              hasNext = entryStream.hasNext();
             }
-          }
 
-          updateBatch(entryStream, batch, hasNext);
-          if (isShippable(batch)) {
-            sleepMultiplier = 1;
-            entryBatchQueue.put(batch);
-            if (!batch.hasMoreEntries()) {
-              // we're done with queue recovery, shut ourselves down
-              setReaderRunning(false);
+            // If the batch is filled to max capacity or the stream has nothing more to read,
+            // try to ship it
+            if (updateBatchAndAddInShippingQueue(entryStream, batch, hasNext, false)) {
+              sleepMultiplier = 1;
             }
+          }
+        } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+          if (handleEofException(e, entryStream, batch)) {
+            sleepMultiplier = 1;
           } else {
-            Thread.sleep(sleepForRetries);
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              LOG.debug("Failed to read stream of replication entries: " + e);

Review comment:
       Do you think it would be valuable to log the failure reason at INFO level even when we are going to retry, in case the exception on the final attempt differs from the earlier ones? See the sketch below.
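   For example (a sketch; the level choice and wording are suggestions):

       if (sleepMultiplier < maxRetriesMultiplier) {
         LOG.info("Failed to read stream of replication entries, will retry: " + e);
         sleepMultiplier++;
       } else {
         LOG.error("Failed to read stream of replication entries", e);
       }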

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
##########
@@ -511,9 +587,9 @@ public boolean isEmpty() {
       return walEntriesWithSize.isEmpty();
     }
 
-    public void updatePosition(WALEntryStream entryStream) {
-      lastWalPath = entryStream.getCurrentPath();
-      lastWalPosition = entryStream.getPosition();
+    public void updatePosition(Path currentPath, long currentPosition) {

Review comment:
       Can you add Javadoc to this method to make clear what is expected for these parameters, since it is a public method? Something like the sketch below, perhaps.
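   A possible wording (just a suggestion):

       /**
        * Records where reading stopped for this batch.
        * @param currentPath the WAL file the last entry was read from
        * @param currentPosition the byte offset in that file just past the last entry read
        */
       public void updatePosition(Path currentPath, long currentPosition) {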




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org