Posted to commits@hbase.apache.org by ap...@apache.org on 2021/02/23 21:20:32 UTC

[hbase] branch branch-1 updated: HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2975)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 19fe18e  HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2975)
19fe18e is described below

commit 19fe18e466a2c4eb72c5b3c9dfbbbbb09e11df46
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Tue Feb 23 13:20:00 2021 -0800

    HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2975)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../regionserver/ReplicationSource.java            |   5 +
 .../regionserver/ReplicationSourceManager.java     |   2 +-
 .../ReplicationSourceWALReaderThread.java          | 208 +++++++++++-----
 .../replication/regionserver/WALEntryStream.java   |  22 +-
 .../hbase/replication/TestReplicationSource.java   | 265 +++++++++++++++++----
 .../regionserver/TestReplicationSourceBase.java    |   1 -
 .../regionserver/TestWALEntryStream.java           |  56 ++++-
 7 files changed, 443 insertions(+), 116 deletions(-)
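
In outline: the WAL reader thread previously handled EOFExceptions only after
exhausting its retries, and could NPE or permanently leave data unreplicated
when a zero-length log sat in a recovered queue. With this change the reader
catches stream errors, lets handleEofException() drop the zero-length log and
ship the in-flight batch using the last read path and position, and only falls
back to sleep-and-retry when the EOF path does not apply. A condensed sketch of
the new loop (illustrative only; readLoop() stands in for the batching code and
is not a real method in the patch):

    while (isReaderRunning()) {
      try {
        entryStream = new WALEntryStream(logQueue, fs, conf, lastReadPosition,
          metrics, walGroupId);
        readLoop(entryStream);            // build batches, ship when full
      } catch (IOException | WALEntryStreamRuntimeException e) {
        if (handleEofException(e, entryStream, batch)) {
          sleepMultiplier = 1;            // empty log dropped, batch shipped
        } else {
          Threads.sleep(sleepForRetries * sleepMultiplier);  // back off, retry
        }
      } finally {
        entryStream.close();
      }
    }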

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a58289e..969e8ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -221,6 +221,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     }
   }
 
+  @InterfaceAudience.Private
+  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+    return logQueue.getQueues();
+  }
+
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
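
The new getQueues() accessor exposes the per-walgroup WAL queues so the tests
added below can assert that the zero-length log was actually removed. A hedged
example of test-side use (walGroupId here is whatever prefix the test logs
share; not code from the patch itself):

    PriorityBlockingQueue<Path> walQueue = source.getQueues().get(walGroupId);
    assertTrue("empty log should have been dropped", walQueue.isEmpty());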
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d9435a3..a8e8e76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -183,7 +183,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * position. It will also clean old logs from the queue.
    * @param log Path to the log currently being replicated from
    *            replication status in zookeeper. It will also delete older entries.
-   * @param id id of the peer cluster
+   * @param id id of the replication queue
    * @param position current location in the log
    * @param queueRecovered indicates if this queue comes from another region server
    * @param holdLogInZK if true then the log is retained in ZK
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index bd155d5..a1d64ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -50,7 +50,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALKey;
 
 /**
- * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
+ * Reads and filters WAL entries, groups the filtered entries into batches,
+ * and puts the batches onto a queue
  *
  */
 @InterfaceAudience.Private
@@ -88,7 +89,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
    * @param manager replication manager
-   * @param replicationQueueInfo
+   * @param replicationQueueInfo replication queue info
    * @param logQueue The WAL queue to read off of
    * @param startPosition position in the first WAL to start reading from
   * @param fs the file system to use
@@ -135,71 +136,128 @@ public class ReplicationSourceWALReaderThread extends Thread {
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
-          }
-          if (!checkQuota()) {
-            continue;
-          }
-          WALEntryBatch batch = new WALEntryBatch(replicationBatchCountCapacity);
-          boolean hasNext;
-          while ((hasNext = entryStream.hasNext()) == true) {
-            Entry entry = entryStream.next();
-            entry = filterEntry(entry);
-            if (entry != null) {
-              WALEdit edit = entry.getEdit();
-              if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry, entrySize);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
-                // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream =
+      new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics, walGroupId);
+    try {
+      while (isReaderRunning()) { // we only loop back here if something fatal happens to the stream
+        try {
+          entryStream = new WALEntryStream(logQueue, fs, conf,
+            lastReadPosition, metrics, walGroupId);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+            batch = new WALEntryBatch(replicationBatchCountCapacity);
+            boolean hasNext = entryStream.hasNext();
+            while (hasNext) {
+              Entry entry = entryStream.next();
+              entry = filterEntry(entry);
+              if (entry != null) {
+                WALEdit edit = entry.getEdit();
+                if (edit != null && !edit.isEmpty()) {
+                  long entrySize = getEntrySizeIncludeBulkLoad(entry);
+                  long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
+                  batch.addEntry(entry, entrySize);
+                  updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+                  boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+                  // Stop if too many entries or too big
+                  if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
                     || batch.getNbEntries() >= replicationBatchCountCapacity) {
-                  break;
+                    break;
+                  }
                 }
               }
+              hasNext = entryStream.hasNext();
             }
-          }
 
-          updateBatch(entryStream, batch, hasNext);
-          if (isShippable(batch)) {
-            sleepMultiplier = 1;
-            entryBatchQueue.put(batch);
-            if (!batch.hasMoreEntries()) {
-              // we're done with queue recovery, shut ourselves down
-              setReaderRunning(false);
+            // If the batch is filled to capacity or the stream has nothing more to read,
+            // try to ship it
+            if (updateBatchAndShippingQueue(entryStream, batch, hasNext, false)) {
+              sleepMultiplier = 1;
             }
+          }
+        } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+          if (handleEofException(e, entryStream, batch)) {
+            sleepMultiplier = 1;
           } else {
-            Thread.sleep(sleepForRetries);
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              LOG.debug("Failed to read stream of replication entries: " + e);
+              sleepMultiplier++;
+            } else {
+              LOG.error("Failed to read stream of replication entries", e);
+            }
+            Threads.sleep(sleepForRetries * sleepMultiplier);
           }
-          resetStream(entryStream);
-        }
-      } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
-        if (sleepMultiplier < maxRetriesMultiplier) {
-          LOG.debug("Failed to read stream of replication entries: " + e);
-          sleepMultiplier++;
-        } else {
-          LOG.error("Failed to read stream of replication entries", e);
-          handleEofException(e);
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while sleeping between WAL reads");
+          Thread.currentThread().interrupt();
+        } finally {
+          entryStream.close();
         }
-        Threads.sleep(sleepForRetries * sleepMultiplier);
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while sleeping between WAL reads");
-        Thread.currentThread().interrupt();
       }
+    } catch (IOException e) {
+      if (sleepMultiplier < maxRetriesMultiplier) {
+        LOG.debug("Failed to read stream of replication entries: " + e);
+        sleepMultiplier++;
+      } else {
+        LOG.error("Failed to read stream of replication entries", e);
+      }
+      Threads.sleep(sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.trace("Interrupted while sleeping between WAL reads");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Update the batch, try to ship it, and return true if it was shipped.
+   * @param entryStream stream of the WALs
+   * @param batch batch of entries to ship
+   * @param hasMoreData whether the stream has yet more data to read
+   * @param isEOFException whether we hit an EOFException before this call. After an
+   *                      EOFException we do not reset the stream, since the entry
+   *                      stream no longer holds correct information.
+   * @return true if the batch was queued for shipping
+   * @throws InterruptedException if interrupted while queueing the batch or sleeping
+   * @throws IOException if resetting the stream fails
+   */
+  private boolean updateBatchAndShippingQueue(WALEntryStream entryStream, WALEntryBatch batch,
+      boolean hasMoreData, boolean isEOFException) throws InterruptedException, IOException {
+    updateBatch(entryStream, batch, hasMoreData, isEOFException);
+    boolean isDataQueued = false;
+    if (isShippable(batch)) {
+      isDataQueued = true;
+      entryBatchQueue.put(batch);
+      if (!batch.hasMoreEntries()) {
+        // we're done with queue recovery, shut ourselves down
+        LOG.debug("Stopping the reader after recovering the queue");
+        setReaderRunning(false);
+      }
+    } else {
+      Thread.sleep(sleepForRetries);
+    }
+
+    if (!isEOFException) {
+      resetStream(entryStream);
     }
+    return isDataQueued;
   }
 
-  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData) {
+  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData,
+      boolean isEOFException) {
     logMessage(batch);
-    batch.updatePosition(entryStream);
+    // On an EOFException, fall back to the last read path and position,
+    // since the stream no longer has the current information.
+    if (isEOFException) {
+      batch.updatePosition(lastReadPath, lastReadPosition);
+    } else {
+      batch.updatePosition(entryStream.getCurrentPath(), entryStream.getPosition());
+    }
     batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData);
   }
 
@@ -229,10 +287,18 @@ public class ReplicationSourceWALReaderThread extends Thread {
     stream.reset(); // reuse stream
   }
 
-  // if we get an EOF due to a zero-length log, and there are other logs in queue
-  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
-  // enabled, then dump the log
-  private void handleEofException(Exception e) {
+  /**
+   * Handles an EOFException from the WAL entry stream. EOFExceptions must be handled
+   * carefully because mishandling them risks permanently leaving data unreplicated.
+   * If the EOFException happens on the last log in a recovered queue, we can safely
+   * stop the reader; if it happens on any other log, the reader must keep running.
+   * @return true only if the exception was handled
+   */
+  private boolean handleEofException(Exception e, WALEntryStream entryStream,
+      WALEntryBatch batch) throws InterruptedException {
     boolean isRecoveredSource = manager.getOldSources().contains(source);
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't
@@ -245,11 +311,22 @@ public class ReplicationSourceWALReaderThread extends Thread {
           lastReadPath = queue.peek();
           logQueue.remove(walGroupId);
           lastReadPosition = 0;
+
+          // If the EOF was on the last log in the recovered queue, the stream
+          // has no more data and we should stop the reader
+          boolean hasMoreData = !queue.isEmpty();
+          // Now that the WAL has been removed from the queue, try shipping the
+          // existing batch of entries; do not reset the stream, since the entry
+          // stream doesn't hold the correct data at this point
+          updateBatchAndShippingQueue(entryStream, batch, hasMoreData, true);
+          return true;
         }
       } catch (IOException ioe) {
         LOG.warn("Couldn't get file length information about log " + queue.peek());
       }
     }
+
+    return false;
   }
 
   public Path getCurrentPath() {
@@ -299,7 +376,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
     return edit.heapSize() + key.estimatedSerializedSizeOf();
   }
 
-  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
+  private void updateBatchStats(WALEntryBatch batch, Entry entry,
+    long entryPosition, long entrySize) {
     WALEdit edit = entry.getEdit();
     if (edit != null && !edit.isEmpty()) {
       batch.incrementHeapSize(entrySize);
@@ -409,7 +487,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
    * Holds a batch of WAL entries to replicate, along with some statistics
    *
    */
-  static class WALEntryBatch {
+  static final class WALEntryBatch {
     private List<Pair<Entry, Long>> walEntriesWithSize;
     // last WAL that was read
     private Path lastWalPath;
@@ -515,9 +593,15 @@ public class ReplicationSourceWALReaderThread extends Thread {
       return walEntriesWithSize.isEmpty();
     }
 
-    public void updatePosition(WALEntryStream entryStream) {
-      lastWalPath = entryStream.getCurrentPath();
-      lastWalPosition = entryStream.getPosition();
+    /**
+     * Update the WAL entry batch with the latest WAL path and position, which the
+     * shipper uses to update the log position in the ZK node
+     * @param currentPath the path of the current WAL
+     * @param currentPosition the position in the WAL
+     */
+    public void updatePosition(Path currentPath, long currentPosition) {
+      lastWalPath = currentPath;
+      lastWalPosition = currentPosition;
     }
 
     public boolean hasMoreEntries() {
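
The crux of the fix is in handleEofException(): on a recovered source with a
zero-length log at the head of the queue, the log is dropped and the in-flight
batch is shipped with the last read path and position rather than resetting the
stream. A condensed sketch of that decision (config checks and error handling
elided; not the literal patch):

    if (isRecoveredSource && fs.getFileStatus(queue.peek()).getLen() == 0) {
      lastReadPath = queue.peek();
      logQueue.remove(walGroupId);      // drop the zero-length log
      lastReadPosition = 0;
      // if that was the last log, the reader can shut itself down
      boolean hasMoreData = !queue.isEmpty();
      updateBatchAndShippingQueue(entryStream, batch, hasMoreData, true);
      return true;                      // handled; caller resets sleepMultiplier
    }
    return false;                       // not the EOF case; caller backs off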
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index a0b09dd..c667881 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -72,25 +72,23 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
    * @param conf {@link Configuration} to use to create {@link Reader} for this stream
    * @param metrics replication metrics
    * @param walGroupId wal prefix
-   * @throws IOException
    */
   public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
-      MetricsSource metrics, String walGroupId)
-      throws IOException {
+      MetricsSource metrics, String walGroupId) {
     this(logQueue, fs, conf, 0, metrics, walGroupId);
   }
 
   /**
    * Create an entry stream over the given queue at the given start position
    * @param logQueue the queue of WAL paths
+   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
    * @param conf the {@link Configuration} to use to create {@link Reader} for this stream
    * @param startPosition the position in the first WAL to start reading at
    * @param metrics the replication metrics
    * @param walGroupId wal prefix
-   * @throws IOException
    */
   public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
-      long startPosition, MetricsSource metrics, String walGroupId) throws IOException {
+      long startPosition, MetricsSource metrics, String walGroupId) {
     this.logQueue = logQueue;
     this.fs = fs;
     this.conf = conf;
@@ -122,7 +120,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
    */
   @Override
   public Entry next() {
-    if (!hasNext()) throw new NoSuchElementException();
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
     Entry save = currentEntry;
     currentEntry = null; // gets reloaded by hasNext()
     return save;
@@ -180,7 +180,7 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
   /**
    * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
    * false)
-   * @throws IOException
+   * @throws IOException if an I/O error occurs while resetting the reader
    */
   public void reset() throws IOException {
     if (reader != null && currentPath != null) {
@@ -306,7 +306,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
     Path nextPath = queue.peek();
     if (nextPath != null) {
       openReader(nextPath);
-      if (reader != null) return true;
+      if (reader != null) {
+        return true;
+      }
     }
     return false;
   }
@@ -349,7 +351,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
       handleFileNotFound(path, fnfe);
     }  catch (RemoteException re) {
       IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) throw ioe;
+      if (!(ioe instanceof FileNotFoundException)) {
+        throw ioe;
+      }
       handleFileNotFound(path, (FileNotFoundException)ioe);
     } catch (LeaseNotRecoveredException lnre) {
       // HBASE-15019 the WAL was not closed due to some hiccup.
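
Since the WALEntryStream constructors only assign fields, they no longer
declare IOException; failures now surface on hasNext()/next() as
WALEntryStreamRuntimeException, or on reset() as IOException. A hedged usage
sketch (process() is a placeholder consumer, and the enclosing method is
assumed to declare throws IOException for close()):

    WALEntryStream stream =
      new WALEntryStream(logQueue, fs, conf, 0L, metrics, walGroupId);
    try {
      while (stream.hasNext()) {    // may throw WALEntryStreamRuntimeException
        process(stream.next());
      }
    } finally {
      stream.close();
    }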
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index ce185f4..b0a2a8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -34,6 +34,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -46,10 +48,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@@ -75,6 +77,7 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSource
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.ByteStringer;
@@ -86,6 +89,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALProvider;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -125,6 +129,7 @@ public class TestReplicationSource {
     if (FS.exists(logDir)) {
       FS.delete(logDir, true);
     }
+    conf.setBoolean("replication.source.eof.autorecovery", true);
   }
 
   @Before
@@ -244,7 +249,6 @@ public class TestReplicationSource {
       }
 
     });
-
   }
 
   private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
@@ -295,12 +299,16 @@ public class TestReplicationSource {
           Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
     }
 
-    ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
-            throws IOException {
+    ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint,
+      boolean isRecovered) throws IOException {
       final ReplicationSource source = new ReplicationSource();
       endpoint.init(context);
       source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
-              "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+        "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+      if (isRecovered) {
+        when(manager.getOldSources())
+          .thenReturn(Lists.<ReplicationSourceInterface>newArrayList(source));
+      }
       return source;
     }
 
@@ -321,48 +329,54 @@ public class TestReplicationSource {
   @Test
   public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws Exception {
     final int numWALEntries = 5;
-    conf.setInt("replication.source.nb.capacity", numWALEntries);
+    int nbCapacity = conf.getInt("replication.source.nb.capacity", 25000);
+    try {
+      conf.setInt("replication.source.nb.capacity", numWALEntries);
 
-    Mocks mocks = new Mocks();
-    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
-      @Override
-      public WALEntryFilter getWALEntryfilter() {
-        return null;
-      }
-    };
-    WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
-    final Path log1 = new Path(logDir, "log.1");
-    final Path log2 = new Path(logDir, "log.2");
+      Mocks mocks = new Mocks();
+      final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+        @Override public WALEntryFilter getWALEntryfilter() {
+          return null;
+        }
+      };
+      WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
+      final Path log1 = new Path(logDir, "log.1");
+      final Path log2 = new Path(logDir, "log.2");
 
-    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
-    WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
+      WALProvider.Writer writer1
+        = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+      WALProvider.Writer writer2
+        = WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration());
 
-    appendEntries(writer1, 3);
-    appendEntries(writer2, 2);
+      appendEntries(writer1, 3);
+      appendEntries(writer2, 2);
 
-    long pos = getPosition(wals, log2, 2);
+      long pos = getPosition(wals, log2, 2);
 
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
-    source.run();
+      final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
+      source.run();
 
-    source.enqueueLog(log1);
-    // log rolled
-    source.enqueueLog(log2);
+      source.enqueueLog(log1);
+      // log rolled
+      source.enqueueLog(log2);
 
-    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-      @Override public boolean evaluate() throws Exception {
-        return endpoint.replicateCount.get() > 0;
-      }
-    });
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() {
+          return endpoint.replicateCount.get() > 0;
+        }
+      });
 
-    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
-    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
-    verify(mocks.manager, times(1))
+      ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+      ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+      verify(mocks.manager, times(1))
         .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), positionCaptor.capture(),
-              anyBoolean(), anyBoolean());
-    assertTrue(endpoint.lastEntries.size() == 5);
-    assertThat(pathCaptor.getValue(), is(log2));
-    assertThat(positionCaptor.getValue(), is(pos));
+          anyBoolean(), anyBoolean());
+      assertTrue(endpoint.lastEntries.size() == 5);
+      assertThat(pathCaptor.getValue(), is(log2));
+      assertThat(positionCaptor.getValue(), is(pos));
+    } finally {
+      conf.setInt("replication.source.nb.capacity", nbCapacity);
+    }
   }
 
   @Test
@@ -405,7 +419,7 @@ public class TestReplicationSource {
     writer.close();
 
     Mocks mocks = new Mocks();
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     source.run();
 
     source.enqueueLog(log);
@@ -423,7 +437,7 @@ public class TestReplicationSource {
     Mocks mocks = new Mocks();
 
     final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "test");
 
     final Path log1 = new Path(logDir, "log.1");
@@ -475,7 +489,7 @@ public class TestReplicationSource {
     final long pos = getPosition(wals, log2, 2);
 
     final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest();
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     source.enqueueLog(log1);
     source.enqueueLog(log2);
     source.run();
@@ -529,7 +543,7 @@ public class TestReplicationSource {
       }
     };
 
-    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
     source.run();
     source.enqueueLog(log1);
 
@@ -556,6 +570,173 @@ public class TestReplicationSource {
     });
   }
 
+  @Test
+  public void testReplicationOnEmptyLogAtTheEndOfQueueWithMultipleLogs() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Append 3 entries in a log
+    final Path log1 = new Path(logDir, logPrefix + ".1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+
+    // Create a 0 length log.
+    Path emptyLog = new Path(logDir, logPrefix + ".2");
+    FSDataOutputStream fsdos = FS.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+    source.run();
+    source.enqueueLog(log1);
+    source.enqueueLog(emptyLog);
+
+    // Wait for source to replicate
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.replicateCount.get() == 1;
+      }
+    });
+
+    // Wait and verify that all entries from the non-empty log get replicated
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.lastEntries.size() == 3;
+      }
+    });
+
+    // Wait and verify that the log queue has been fully drained
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return source.getQueues().get(logPrefix).isEmpty();
+      }
+    });
+  }
+
+  @Test
+  public void testReplicationOnEmptyLogAtTheEndOfQueueWithSingleLog() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Create a 0 length log.
+    Path emptyLog = new Path(logDir, logPrefix + ".1");
+    FSDataOutputStream fsdos = FS.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+    source.run();
+    source.enqueueLog(emptyLog);
+
+    // Wait and verify that no entry got replicated
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.lastEntries == null;
+      }
+    });
+
+    // Wait and verify that the log queue is empty
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return source.getQueues().get(logPrefix).isEmpty();
+      }
+    });
+  }
+
+  @Test
+  public void testReplicationOnEmptyLogBetweenTheNonEmptyLogsInLogQueue() throws Exception {
+    final String logPrefix = "logPrefix";
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+      Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    // Append 3 entries in a log
+    final Path log1 = new Path(logDir, logPrefix + ".11");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+
+    // Create a 0 length log.
+    Path emptyLog = new Path(logDir, logPrefix + ".12");
+    FSDataOutputStream fsdos = FS.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, FS.getFileStatus(emptyLog).getLen());
+
+    // Append 5 entries in a log
+    final Path log3 = new Path(logDir, logPrefix + ".13");
+    WALProvider.Writer writer3 = WALFactory.createWALWriter(FS, log3, TEST_UTIL.getConfiguration());
+    appendEntries(writer3, 5);
+
+    // Append 10 entries in a log
+    final Path log4 = new Path(logDir, logPrefix + ".14");
+    WALProvider.Writer writer4 = WALFactory.createWALWriter(FS, log4, TEST_UTIL.getConfiguration());
+    appendEntries(writer4, 10);
+
+    // Replication end point with no filter
+    final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+
+    final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, true);
+    source.run();
+    source.enqueueLog(log1);
+    source.enqueueLog(emptyLog);
+    source.enqueueLog(log3);
+    source.enqueueLog(log4);
+
+    // Wait for source to replicate
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.replicateCount.get() == 2;
+      }
+    });
+
+    // Wait and verify the last replicated entries
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return endpoint.lastEntries.size() == 15;
+      }
+    });
+
+    // Wait and verify that only one log remains in the queue
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return source.getQueues().get(logPrefix).size() == 1;
+      }
+    });
+  }
+
   /**
    * Tests that recovered queues are preserved on a regionserver shutdown.
    * See HBASE-18192
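
Note that the new tests depend on EOF auto-recovery being switched on (done in
setUpBeforeClass() above); without it, handleEofException() never dumps the
zero-length log:

    conf.setBoolean("replication.source.eof.autorecovery", true);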
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
index e94985e..ab4d19d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
@@ -76,7 +76,6 @@ public abstract class TestReplicationSourceBase {
   protected static DummyServer server;
 
   @BeforeClass public static void setUpBeforeClass() throws Exception {
-
     conf = HBaseConfiguration.create();
     conf.set("replication.replicationsource.implementation",
       ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index adf427b..0b01c5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -741,9 +742,45 @@ public class TestWALEntryStream {
         new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
         localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
     reader.run();
+    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+  }
+
+  @Test
+  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
+    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, getMockMetrics());
+    // Create a 0 length log.
+    Path emptyLog = new Path("log.2");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
+
+    final Path log1 = new Path("log.1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+    localLogQueue.enqueueLog(log1, fakeWalGroupId);
+
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    // Make it look like the source is a recovered source.
+    when(mockSourceManager.getOldSources())
+      .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    // Create a reader thread.
+    ReplicationSourceWALReaderThread reader =
+      new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
+        localLogQueue, 0, fs, conf, getDummyFilter(), getMockMetrics(), source, fakeWalGroupId);
+    assertEquals("Initial log queue size is not correct",
+      2, localLogQueue.getQueueSize(fakeWalGroupId));
+    reader.run();
+
     // ReplicationSourceWALReaderThread#handleEofException method will
     // remove the empty log from the logQueue.
-    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
   }
 
   private PriorityBlockingQueue<Path> getQueue() {
@@ -757,4 +794,21 @@ public class TestWALEntryStream {
     doNothing().when(source).setOldestWalAge(Mockito.anyInt());
     return source;
   }
+
+  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
+    for (int i = 0; i < numEntries; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b, b, b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
+        HConstants.DEFAULT_CLUSTER_ID);
+      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+      key.setScopes(scopes);
+      writer.append(new WAL.Entry(key, edit));
+      writer.sync(false);
+    }
+    writer.close();
+  }
 }