You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/10 15:03:55 UTC

[hbase] branch branch-2 updated: HBASE-27632 Refactor WAL.Reader implementation so we can better support WAL splitting and replication (#5055)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new cec05562639 HBASE-27632 Refactor WAL.Reader implementation so we can better support WAL splitting and replication (#5055)
cec05562639 is described below

commit cec05562639c3e01d588a7e864f6d476b66a65f1
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Mar 10 21:54:20 2023 +0800

    HBASE-27632 Refactor WAL.Reader implementation so we can better support WAL splitting and replication (#5055)
    
    Signed-off-by: GeorryHuang <hu...@apache.org>
    (cherry picked from commit e48c4485db8d9255510ee1cc9cf465e14de637d7)
    
    Conflicts:
            hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
            hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
            hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
            hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
            hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
            hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
            hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
            hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
---
 .../hbase/IntegrationTestIngestWithEncryption.java |   4 -
 .../hadoop/hbase/mapreduce/WALInputFormat.java     |  56 +-
 .../hadoop/hbase/master/region/MasterRegion.java   |   3 +
 .../store/region/WALProcedurePrettyPrinter.java    |   3 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  33 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   5 +-
 .../wal/AbstractProtobufLogWriter.java             |  11 +-
 .../wal/AbstractProtobufWALReader.java             | 564 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/wal/Compressor.java  |  10 +-
 .../hbase/regionserver/wal/ProtobufLogReader.java  | 529 -------------------
 .../regionserver/wal/ProtobufWALStreamReader.java  | 136 +++++
 .../regionserver/wal/ProtobufWALTailingReader.java | 331 ++++++++++++
 .../hadoop/hbase/regionserver/wal/ReaderBase.java  | 185 -------
 .../regionserver/wal/SecureProtobufLogReader.java  | 149 ------
 .../regionserver/wal/WALHeaderEOFException.java    |  44 ++
 .../regionserver/ReplicationSourceWALReader.java   | 179 +++----
 .../SerialReplicationSourceWALReader.java          |  33 +-
 .../replication/regionserver/WALEntryStream.java   | 464 +++++++++--------
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    |  75 +--
 .../wal/AbstractRecoveredEditsOutputSink.java      |   3 +-
 .../main/java/org/apache/hadoop/hbase/wal/WAL.java |  15 -
 .../org/apache/hadoop/hbase/wal/WALFactory.java    | 111 ++--
 .../apache/hadoop/hbase/wal/WALPrettyPrinter.java  |  14 +-
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   |  26 +-
 .../apache/hadoop/hbase/wal/WALStreamReader.java   |  64 +++
 .../apache/hadoop/hbase/wal/WALTailingReader.java  | 148 ++++++
 .../TestSequenceIdMonotonicallyIncreasing.java     |   7 +-
 .../hadoop/hbase/master/AbstractTestDLS.java       |  16 -
 .../hadoop/hbase/regionserver/TestHRegion.java     |  14 +-
 .../regionserver/TestHRegionReplayEvents.java      |   8 +-
 .../hbase/regionserver/TestRecoveredEdits.java     |   3 +-
 .../TestWALMonotonicallyIncreasingSeqId.java       |   9 +-
 .../regionserver/wal/AbstractTestProtobufLog.java  |  69 +--
 .../regionserver/wal/AbstractTestWALReplay.java    |  14 +-
 ...der.java => FaultyProtobufWALStreamReader.java} |  20 +-
 .../hbase/regionserver/wal/TestDurability.java     |  10 +-
 .../hbase/regionserver/wal/TestLogRolling.java     |   8 +-
 .../regionserver/wal/TestSecureAsyncWALReplay.java |   3 -
 .../regionserver/wal/TestSecureWALReplay.java      |   3 -
 .../replication/SerialReplicationTestBase.java     |  16 +-
 .../TestReplicationEmptyWALRecovery.java           |   7 +-
 .../hbase/replication/TestSerialReplication.java   |  12 +-
 .../regionserver/TestBasicWALEntryStream.java      | 213 ++++----
 .../TestBasicWALEntryStreamAsyncFSWAL.java         |   3 +
 .../TestBasicWALEntryStreamFSHLog.java             |   3 +
 .../TestRaceWhenCreatingReplicationSource.java     |   8 +-
 .../regionserver/TestReplicationSource.java        |   4 +-
 .../TestWALEntryStreamDifferentCounts.java         |  11 +-
 .../regionserver/WALEntryStreamTestBase.java       |  42 +-
 .../hadoop/hbase/wal/CompressedWALTestBase.java    |   2 +-
 .../hadoop/hbase/wal/NoEOFWALStreamReader.java     |  95 ++++
 .../hadoop/hbase/wal/TestParsePartialWALFile.java  | 213 ++++++++
 .../org/apache/hadoop/hbase/wal/TestSecureWAL.java |  33 +-
 .../apache/hadoop/hbase/wal/TestWALFactory.java    | 302 +++++------
 .../hbase/wal/TestWALOpenAfterDNRollingStart.java  |   4 +-
 .../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java | 225 --------
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  |  66 +--
 .../hadoop/hbase/wal/WALPerformanceEvaluation.java |   5 +-
 58 files changed, 2548 insertions(+), 2095 deletions(-)

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
index ef2dcc56afb..de69b5fb841 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
@@ -26,11 +26,9 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.EncryptionTest;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Before;
@@ -54,8 +52,6 @@ public class IntegrationTestIngestWithEncryption extends IntegrationTestIngest {
       conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
       conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
       conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
-      conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
-        Reader.class);
       conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
         Writer.class);
       conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 60c3e46249e..7362f585d31 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -32,12 +32,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -135,7 +137,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
    * HLogInputFormat.
    */
   static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
-    private Reader reader = null;
+    private WALStreamReader reader = null;
     // visible until we can remove the deprecated HLogInputFormat
     Entry currentEntry = new Entry();
     private long startTime;
@@ -144,6 +146,47 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
     private Path logFile;
     private long currentPos;
 
+    /**
+     * Opens a stream reader for {@code path}, retrying when the WAL lease still has to be
+     * recovered or when the open hits the NPE of HDFS-4380.
+     * @throws IOException if all attempts failed or the wait between attempts was interrupted
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
+        justification = "HDFS-4380")
+    private WALStreamReader openReader(Path path, long startPosition) throws IOException {
+      final long retryInterval = 2000; // 2 sec
+      final int maxAttempts = 30;
+      Exception ee = null;
+      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+        try {
+          return WALFactory.createStreamReader(path.getFileSystem(conf), path, conf,
+            startPosition);
+        } catch (LeaseNotRecoveredException lnre) {
+          // HBASE-15019 the WAL was not closed due to some hiccup.
+          LOG.warn("Try to recover the WAL lease " + path, lnre);
+          AbstractFSWALProvider.recoverLease(conf, path);
+          ee = lnre;
+        } catch (NullPointerException npe) {
+          // Workaround for race condition in HDFS-4380
+          // which throws a NPE if we open a file before any data node has the most recent block
+          // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+          LOG.warn("Got NPE opening reader, will retry.");
+          ee = npe;
+        }
+        if (attempt == maxAttempts) {
+          break; // out of attempts, no point in sleeping before giving up
+        }
+        try {
+          Thread.sleep(retryInterval);
+        } catch (InterruptedException e) {
+          // stop retrying: with the flag set every later sleep would fail immediately
+          Thread.currentThread().interrupt();
+          throw (IOException) new java.io.InterruptedIOException("Interrupted").initCause(e);
+        }
+      }
+      throw new IOException("Could not open reader", ee);
+    }
+
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
       throws IOException, InterruptedException {
@@ -158,8 +201,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
 
     private void openReader(Path path) throws IOException {
       closeReader();
-      reader = AbstractFSWALProvider.openReader(path, conf);
-      seek();
+      reader = openReader(path, currentPos > 0 ? currentPos : -1);
       setCurrentPath(path);
     }
 
@@ -174,12 +216,6 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
       }
     }
 
-    private void seek() throws IOException {
-      if (currentPos != 0) {
-        reader.seek(currentPos);
-      }
-    }
-
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
       if (reader == null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
index 1f21e375ce9..2c0c98d86f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
@@ -273,6 +273,9 @@ public final class MasterRegion {
     WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
     conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
       replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
+    // we do not do WAL splitting here so it is possible to have uncleanly closed WAL files, so we
+    // need to ignore EOFException.
+    conf.setBoolean(HRegion.RECOVERED_EDITS_IGNORE_EOF, true);
     return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
index 3070d2732b7..b76680d0fdb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -88,7 +89,7 @@ public class WALProcedurePrettyPrinter extends AbstractHBaseTool {
   protected int doWork() throws Exception {
     Path path = new Path(file);
     FileSystem fs = path.getFileSystem(conf);
-    try (WAL.Reader reader = WALFactory.createReader(fs, path, conf)) {
+    try (WALStreamReader reader = WALFactory.createStreamReader(fs, path, conf)) {
       for (;;) {
         WAL.Entry entry = reader.next();
         if (entry == null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 7edc43f297a..61bd79b0cae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -183,6 +183,7 @@ import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -263,6 +264,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final String SPECIAL_RECOVERED_EDITS_DIR =
     "hbase.hregion.special.recovered.edits.dir";
 
+  /**
+   * Mainly used for the master local region, where we replay the WAL files directly without
+   * splitting them first. Such WAL files may not have been closed cleanly, so hitting EOF is
+   * expected there and should not be considered a critical problem.
+   */
+  public static final String RECOVERED_EDITS_IGNORE_EOF =
+    "hbase.hregion.recovered.edits.ignore.eof";
+
   /**
    * Whether to use {@link MetaCellComparator} even if we are not meta region. Used when creating
    * master local region.
@@ -5286,9 +5295,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     MonitoredTask status = TaskMonitor.get().createStatus(msg);
 
     status.setStatus("Opening recovered edits");
-    WAL.Reader reader = null;
-    try {
-      reader = WALFactory.createReader(fs, edits, conf);
+    try (WALStreamReader reader = WALFactory.createStreamReader(fs, edits, conf)) {
       long currentEditSeqId = -1;
       long currentReplaySeqId = -1;
       long firstSeqIdInLog = -1;
@@ -5442,12 +5449,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
         }
       } catch (EOFException eof) {
-        Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
-        msg = "EnLongAddered EOF. Most likely due to Master failure during "
-          + "wal splitting, so we have this data in another edit. Continuing, but renaming " + edits
-          + " as " + p + " for region " + this;
-        LOG.warn(msg, eof);
-        status.abort(msg);
+        if (!conf.getBoolean(RECOVERED_EDITS_IGNORE_EOF, false)) {
+          Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
+          msg = "EnLongAddered EOF. Most likely due to Master failure during "
+            + "wal splitting, so we have this data in another edit. Continuing, but renaming "
+            + edits + " as " + p + " for region " + this;
+          LOG.warn(msg, eof);
+          status.abort(msg);
+        } else {
+          LOG.warn("EOF while replaying recover edits and config '{}' is true so "
+            + "we will ignore it and continue", RECOVERED_EDITS_IGNORE_EOF, eof);
+        }
       } catch (IOException ioe) {
         // If the IOE resulted from bad file format,
         // then this problem is idempotent and retrying won't help
@@ -5474,9 +5486,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return currentEditSeqId;
     } finally {
       status.cleanup();
-      if (reader != null) {
-        reader.close();
-      }
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 18b8f7990c8..24e8b9b26a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -105,8 +105,9 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
  * (smaller) than the most-recent flush.
  * <p>
- * To read an WAL, call
- * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}. *
+ * To read a WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for a one-way read,
+ * or call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
+ * replication, where we may want to tail the active WAL file.
  * <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL
  * is now a lame duck; any more appends or syncs will fail also with the same original exception. If
  * we have made successful appends to the WAL and we then are unable to sync them, our current
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 8437fef3bc2..d89ef6f2110 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
-import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.WAL_TRAILER_WARN_SIZE;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -185,8 +187,7 @@ public abstract class AbstractProtobufLogWriter {
         headerBuilder.setValueCompressionAlgorithm(
           CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
       }
-      length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
-        buildWALHeader(conf, headerBuilder)));
+      length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildWALHeader(conf, headerBuilder)));
 
       initAfterHeader(doCompress);
 
@@ -257,7 +258,7 @@ public abstract class AbstractProtobufLogWriter {
         LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize
           + " > " + this.trailerWarnSize);
       }
-      length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC));
+      length.set(writeWALTrailerAndMagic(trailer, PB_WAL_COMPLETE_MAGIC));
       this.trailerWritten = true;
     } catch (IOException ioe) {
       LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufWALReader.java
new file mode 100644
index 00000000000..3af49107d2c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufWALReader.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.security.Key;
+import java.security.KeyException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Cipher;
+import org.apache.hadoop.hbase.io.crypto.Decryptor;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EncryptionTest;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+
+/**
+ * Base class for protobuf based WAL readers.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractProtobufWALReader
+  implements AbstractFSWALProvider.Initializer, Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufWALReader.class);
+
+  // public for WALFactory until we move everything to o.a.h.h.wal
+  public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
+
+  // public for TestWALSplit
+  public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
+
+  /**
+   * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the
+   * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer.
+   */
+  static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
+  static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
+
+  private static final List<String> WRITER_CLS_NAMES =
+    ImmutableList.of(ProtobufLogWriter.class.getSimpleName(),
+      AsyncProtobufLogWriter.class.getSimpleName(), SecureProtobufLogWriter.class.getSimpleName(),
+      SecureAsyncProtobufLogWriter.class.getSimpleName());
+
+  protected Configuration conf;
+
+  protected FileSystem fs;
+
+  protected Path path;
+
+  protected long fileLength;
+
+  protected FSDataInputStream inputStream;
+
+  protected CompressionContext compressionCtx;
+  protected boolean hasCompression = false;
+  protected boolean hasTagCompression = false;
+  protected boolean hasValueCompression = false;
+  protected Compression.Algorithm valueCompressionType;
+
+  protected Codec.Decoder cellDecoder;
+  protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
+
+  protected long walEditsStopOffset;
+  protected boolean trailerPresent;
+  protected WALTrailer trailer;
+  // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
+  // than this size, it is written/read respectively, with a WARN message in the log.
+  protected int trailerWarnSize;
+
+  // cell codec classname
+  protected String codecClsName;
+
+  protected Decryptor decryptor;
+
+  /**
+   * Get or create the input stream used by cell decoder.
+   * <p/>
+   * For implementing replication, we may need to limit the bytes we can read, so here we provide a
+   * method so sub classes can wrap the original input stream.
+   */
+  protected abstract InputStream getCellCodecInputStream(FSDataInputStream stream);
+
+  /**
+   * Skip to the given position.
+   */
+  protected abstract void skipTo(long position) throws IOException;
+
+  @Override
+  public void init(FileSystem fs, Path path, Configuration conf, long startPosition)
+    throws IOException {
+    this.conf = conf;
+    this.path = path;
+    this.fs = fs;
+    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
+
+    Pair<FSDataInputStream, FileStatus> pair = open();
+    FSDataInputStream stream = pair.getFirst();
+    FileStatus stat = pair.getSecond();
+    boolean initSucceeded = false;
+    try {
+      // read the header
+      WALProtos.WALHeader header = readHeader(stream);
+      // initialize metadata and fields
+      initDecryptor(header);
+      initCompression(header);
+      initWALCellCodec(header, getCellCodecInputStream(stream));
+
+      // read trailer if available
+      readTrailer(stream, stat);
+
+      // Assigning the field only here is intentional: the methods above must not use the
+      // inputStream field. The tailing reader needs to wrap the input stream when creating the
+      // cell decoder, so an accidental direct use of the stored stream there could cause trouble.
+      // A method that needs a stream gets it passed in, like readHeader and readTrailer.
+      this.inputStream = stream;
+
+      // seek to the given position if it is not -1
+      if (startPosition >= 0 && startPosition != inputStream.getPos()) {
+        if (compressionCtx != null) {
+          // skip to the position, as we need to construct the compression dictionary
+          skipTo(startPosition);
+        } else {
+          // just seek to the position
+          stream.seek(startPosition);
+        }
+      }
+      initSucceeded = true;
+    } finally {
+      if (!initSucceeded) {
+        // init already failed: swallow any close() error so it cannot mask the original one
+        Closeables.close(stream, true);
+        inputStream = null;
+      }
+    }
+  }
+
+  private Pair<FSDataInputStream, FileStatus> openArchivedWAL() throws IOException {
+    Path archivedWAL = AbstractFSWALProvider.findArchivedLog(path, conf);
+    if (archivedWAL != null) {
+      // try open from oldWAL dir
+      return Pair.newPair(fs.open(archivedWAL), fs.getFileStatus(archivedWAL));
+    } else {
+      return null;
+    }
+  }
+
+  protected final Pair<FSDataInputStream, FileStatus> open() throws IOException {
+    try {
+      return Pair.newPair(fs.open(path), fs.getFileStatus(path));
+    } catch (FileNotFoundException e) {
+      Pair<FSDataInputStream, FileStatus> pair = openArchivedWAL();
+      if (pair != null) {
+        return pair;
+      } else {
+        throw e;
+      }
+    } catch (RemoteException re) {
+      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
+      if (!(ioe instanceof FileNotFoundException)) {
+        throw ioe;
+      }
+      Pair<FSDataInputStream, FileStatus> pair = openArchivedWAL();
+      if (pair != null) {
+        return pair;
+      } else {
+        throw ioe;
+      }
+    }
+  }
+
+  protected final WALProtos.WALHeader readHeader(FSDataInputStream stream) throws IOException {
+    byte[] magic = new byte[PB_WAL_MAGIC.length];
+    try {
+      stream.readFully(magic);
+    } catch (EOFException e) {
+      throw new WALHeaderEOFException("EOF while reading PB WAL magic", e);
+    }
+    if (!Arrays.equals(PB_WAL_MAGIC, magic)) {
+      throw new IOException("Invalid PB WAL magic " + Bytes.toStringBinary(magic) + ", expected "
+        + Bytes.toStringBinary(PB_WAL_MAGIC));
+    }
+    WALProtos.WALHeader header;
+    try {
+      header = ProtobufUtil.parseDelimitedFrom(stream, WALProtos.WALHeader.parser());
+    } catch (InvalidProtocolBufferException e) {
+      if (ProtobufUtil.isEOF(e)) {
+        throw new WALHeaderEOFException("EOF while reading PB header", e);
+      } else {
+        throw e;
+      }
+    } catch (EOFException e) {
+      throw new WALHeaderEOFException("EOF while reading PB header", e);
+    }
+    if (header == null) {
+      throw new WALHeaderEOFException("EOF while reading PB header");
+    }
+    if (header.hasWriterClsName() && !getWriterClsNames().contains(header.getWriterClsName())) {
+      throw new IOException("Got unknown writer class: " + header.getWriterClsName());
+    }
+    return header;
+  }
+
+  /**
+   * Sets up {@code decryptor} when the header carries a wrapped encryption key; does nothing
+   * otherwise.
+   * <p>
+   * The wrapped key is unwrapped by trying, in order: the configured WAL key name (if any), the
+   * master key name (defaulting to the current user's short name), and finally the alternate
+   * master key name (if configured).
+   * @param header the WAL header that may contain the wrapped key
+   * @throws IOException if no key name can unwrap the key, or the key's cipher is not available
+   */
+  private void initDecryptor(WALProtos.WALHeader header) throws IOException {
+    if (!header.hasEncryptionKey()) {
+      return;
+    }
+    EncryptionTest.testKeyProvider(conf);
+    EncryptionTest.testCipherProvider(conf);
+
+    byte[] wrappedKey = header.getEncryptionKey().toByteArray();
+    Key key = null;
+
+    // Attempt 1: the dedicated WAL key, when one is configured. A failure here is not fatal.
+    String walKeyName = conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY);
+    if (walKeyName != null) {
+      try {
+        key = EncryptionUtil.unwrapWALKey(conf, walKeyName, wrappedKey);
+      } catch (KeyException e) {
+        LOG.debug("Unable to unwrap key with WAL key '{}'", walKeyName, e);
+      }
+    }
+
+    if (key == null) {
+      // Attempt 2: the cluster master key.
+      String masterKeyName =
+        conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName());
+      try {
+        key = EncryptionUtil.unwrapWALKey(conf, masterKeyName, wrappedKey);
+      } catch (KeyException e) {
+        LOG.debug("Unable to unwrap key with current master key '{}'", masterKeyName, e);
+        // Attempt 3: the alternate master key; without one the master key failure is final.
+        String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
+        if (alternateKeyName == null) {
+          throw new IOException(e);
+        }
+        try {
+          key = EncryptionUtil.unwrapWALKey(conf, alternateKeyName, wrappedKey);
+        } catch (KeyException ex) {
+          throw new IOException(ex);
+        }
+      }
+    }
+
+    // The cipher is chosen by the algorithm recorded in the unwrapped key.
+    Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
+    if (cipher == null) {
+      throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
+    }
+
+    decryptor = cipher.getDecryptor();
+    decryptor.setKey(key);
+
+    LOG.debug("Initialized secure protobuf WAL: cipher={}", cipher.getName());
+  }
+
+  /**
+   * Reads the compression flags from the header and, when compression is enabled, builds the
+   * {@link CompressionContext} used to decode the WAL.
+   * <p>
+   * When the header does not enable compression only {@code hasCompression} is updated and no
+   * context is created.
+   * @param header the WAL header describing the compression settings
+   * @throws IOException if the value compression algorithm index is out of range, or the
+   *                     compression context cannot be created
+   */
+  private void initCompression(WALProtos.WALHeader header) throws IOException {
+    this.hasCompression = header.hasHasCompression() && header.getHasCompression();
+    if (!hasCompression) {
+      return;
+    }
+    this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
+    this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression();
+    if (header.hasValueCompressionAlgorithm()) {
+      // The header stores the algorithm as an index into Compression.Algorithm.values().
+      int algorithmIndex = header.getValueCompressionAlgorithm();
+      try {
+        this.valueCompressionType = Compression.Algorithm.values()[algorithmIndex];
+      } catch (ArrayIndexOutOfBoundsException e) {
+        throw new IOException("Invalid compression type", e);
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+        "Initializing compression context for {}: isRecoveredEdits={}"
+          + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}",
+        path, CommonFSUtils.isRecoveredEdits(path), hasTagCompression, hasValueCompression,
+        valueCompressionType);
+    }
+    try {
+      boolean recoveredEdits = CommonFSUtils.isRecoveredEdits(path);
+      compressionCtx = new CompressionContext(LRUDictionary.class, recoveredEdits,
+        hasTagCompression, hasValueCompression, valueCompressionType);
+    } catch (Exception e) {
+      throw new IOException("Failed to initialize CompressionContext", e);
+    }
+  }
+
+  /**
+   * Creates the cell codec for this WAL by delegating to
+   * {@code WALCellCodec.create(conf, cellCodecClsName, compressionContext)}.
+   * @param conf               configuration passed through to the codec factory
+   * @param cellCodecClsName   codec class name taken from the WAL header; callers in this class
+   *                           pass {@code null} when the header does not name one
+   * @param compressionContext compression context passed through to the codec factory
+   */
+  private WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
+    CompressionContext compressionContext) throws IOException {
+    return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
+  }
+
+  protected final void initWALCellCodec(WALProtos.WALHeader header, InputStream inputStream)
+    throws IOException {
+    String cellCodecClsName = header.hasCellCodecClsName() ? header.getCellCodecClsName() : null;
+    if (decryptor != null && SecureWALCellCodec.class.getName().equals(cellCodecClsName)) {
+      WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, decryptor);
+      this.cellDecoder = codec.getDecoder(inputStream);
+      // We do not support compression with WAL encryption
+      this.compressionCtx = null;
+      this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
+      this.hasCompression = false;
+      this.hasTagCompression = false;
+      this.hasValueCompression = false;
+    } else {
+      WALCellCodec codec = getCodec(conf, cellCodecClsName, compressionCtx);
+      this.cellDecoder = codec.getDecoder(inputStream);
+      if (this.hasCompression) {
+        this.byteStringUncompressor = codec.getByteStringUncompressor();
+      } else {
+        this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
+      }
+    }
+    this.codecClsName = cellCodecClsName;
+  }
+
+  /**
+   * Records the file length and looks for a trailer at the end of the file, then restores the
+   * stream to the position it had on entry.
+   * <p>
+   * {@code walEditsStopOffset} starts out as the file length and is moved back to the start of
+   * the trailer when one is found.
+   * @param stream the stream to probe; its position is unchanged when this method returns
+   * @param stat   status of the WAL file, used for its length
+   */
+  protected final void readTrailer(FSDataInputStream stream, FileStatus stat) throws IOException {
+    long length = stat.getLen();
+    this.fileLength = length;
+    this.walEditsStopOffset = length;
+    long positionOnEntry = stream.getPos();
+    // walEditsStopOffset is reset by setTrailerIfPresent if a trailer is available
+    trailerPresent = setTrailerIfPresent(stream);
+    boolean moved = stream.getPos() != positionOnEntry;
+    if (moved) {
+      // probing for the trailer moved the stream, so go back to where we started
+      stream.seek(positionOnEntry);
+    }
+  }
+
+  /**
+   * Checks whether the WAL ends with a complete trailer and, if so, loads it.
+   * <p>
+   * A finished WAL ends with
+   * {@code <Trailer><TrailerSize (int)><PB_WAL_COMPLETE_MAGIC>}. This method seeks to
+   * {@code fileLength - PB_WAL_COMPLETE_MAGIC.length - Bytes.SIZEOF_INT}, reads the trailer size,
+   * and verifies that the last {@code PB_WAL_COMPLETE_MAGIC.length} bytes are the magic. When they
+   * are, it parses the trailer and sets {@code walEditsStopOffset} to the position where the
+   * trailer starts.
+   * <p>
+   * The trailer is ignored when:
+   * <ul>
+   * <li>fileLength is 0 or not correct (when file is under recovery, etc.),</li>
+   * <li>the trailer size is negative,</li>
+   * <li>the trailer size is larger than the number of bytes that precede the size field,</li>
+   * <li>any {@link IOException} is raised while probing; it is logged and swallowed.</li>
+   * </ul>
+   * When the trailer size exceeds {@code trailerWarnSize} the trailer is still read, after a WARN
+   * message.
+   * <p>
+   * The stream position is not restored; callers are expected to seek back themselves.
+   * @param stream the stream to probe
+   * @return true if a valid trailer is present
+   */
+  private boolean setTrailerIfPresent(FSDataInputStream stream) throws IOException {
+    try {
+      long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
+      if (trailerSizeOffset <= 0) {
+        // no trailer possible.
+        return false;
+      }
+      stream.seek(trailerSizeOffset);
+      // read the int as trailer size.
+      int trailerSize = stream.readInt();
+      byte[] magic = new byte[PB_WAL_COMPLETE_MAGIC.length];
+      stream.readFully(magic);
+      if (!Arrays.equals(magic, PB_WAL_COMPLETE_MAGIC)) {
+        LOG.trace("No trailer found.");
+        return false;
+      }
+      if (trailerSize < 0) {
+        LOG.warn("Invalid trailer Size {}, ignoring the trailer", trailerSize);
+        return false;
+      }
+      long positionOfTrailer = trailerSizeOffset - trailerSize;
+      if (positionOfTrailer < 0) {
+        // A corrupt size would otherwise only be caught by the seek below failing; reject it
+        // explicitly, before seeking or allocating a buffer of that size.
+        LOG.warn("Trailer size {} is larger than the {} bytes that precede it, ignoring the trailer",
+          trailerSize, trailerSizeOffset);
+        return false;
+      }
+      if (trailerSize > this.trailerWarnSize) {
+        // continue reading after warning the user.
+        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
+          + "{} > {}", trailerSize, this.trailerWarnSize);
+      }
+      // seek to the position where trailer starts.
+      stream.seek(positionOfTrailer);
+      // read the trailer.
+      byte[] trailerBytes = new byte[trailerSize];
+      stream.readFully(trailerBytes);
+      trailer = WALTrailer.parseFrom(trailerBytes);
+      this.walEditsStopOffset = positionOfTrailer;
+      return true;
+    } catch (IOException ioe) {
+      LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
+    }
+    return false;
+  }
+
+  protected final boolean reachWALEditsStopOffset(long pos) {
+    if (trailerPresent && pos > 0 && pos == walEditsStopOffset) {
+      LOG.trace("Reached end of expected edits area at offset {}", pos);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Returns names of the accepted writer classes.
+   * <p>
+   * {@code readHeader} rejects a WAL whose header names a writer class that is not contained in
+   * the returned list. This implementation returns the shared {@code WRITER_CLS_NAMES} list
+   * itself, not a copy.
+   */
+  public List<String> getWriterClsNames() {
+    return WRITER_CLS_NAMES;
+  }
+
+  /**
+   * Returns the cell codec classname.
+   * <p>
+   * The value is recorded by {@code initWALCellCodec} from the WAL header and is {@code null}
+   * when the header does not name a cell codec class.
+   */
+  public String getCodecClsName() {
+    return codecClsName;
+  }
+
+  /**
+   * Returns the current position of the underlying input stream.
+   * @return the stream position, or {@code -1} when there is no open input stream
+   */
+  public long getPosition() throws IOException {
+    if (inputStream == null) {
+      return -1;
+    }
+    return inputStream.getPos();
+  }
+
+  /**
+   * Returns the size in bytes of the whole footer (trailer, trailer size field and completion
+   * magic), measured as the distance from {@code walEditsStopOffset} to the end of the file.
+   * <p>
+   * A warning is logged when that distance differs from the size computed from the parsed
+   * trailer; the measured distance is returned either way.
+   * @return the footer size, or {@code -1} when no trailer is present
+   */
+  public long trailerSize() {
+    if (!trailerPresent) {
+      return -1L;
+    }
+    // measured: everything between the end of the edits and the end of the file
+    final long expectedSize = fileLength - walEditsStopOffset;
+    // computed: sizeof PB_WAL_COMPLETE_MAGIC + sizeof trailerSize + trailer
+    final long calculatedSize =
+      (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
+    if (calculatedSize != expectedSize) {
+      LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we "
+        + "calculate it as being {}", expectedSize, calculatedSize);
+    }
+    return expectedSize;
+  }
+
+  /**
+   * Returns the current position as a string for use in log and error messages, without ever
+   * throwing.
+   * @return the position in decimal, or {@code "<unknown>"} when the position is negative or
+   *         cannot be obtained
+   */
+  protected final String getPositionQuietly() {
+    final long position;
+    try {
+      position = getPosition();
+    } catch (Exception e) {
+      LOG.warn("failed to get position, ignoring", e);
+      return "<unknown>";
+    }
+    if (position < 0) {
+      return "<unknown>";
+    }
+    return Long.toString(position);
+  }
+
+  protected final IOException extractHiddenEof(Exception ex) {
+    // There are two problems we are dealing with here. Hadoop stream throws generic exception
+    // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
+    IOException ioEx = null;
+    if (ex instanceof EOFException) {
+      return (EOFException) ex;
+    } else if (ex instanceof IOException) {
+      ioEx = (IOException) ex;
+    } else if (
+      ex instanceof RuntimeException && ex.getCause() != null
+        && ex.getCause() instanceof IOException
+    ) {
+      ioEx = (IOException) ex.getCause();
+    }
+    if ((ioEx != null) && (ioEx.getMessage() != null)) {
+      if (ioEx.getMessage().contains("EOF")) {
+        return ioEx;
+      }
+      return null;
+    }
+    return null;
+  }
+
+  /**
+   * Determines whether the bytes starting at {@code startPosition} are a (possibly partial)
+   * WALTrailer. The size and the magic are the last things written to a WAL file, so they may be
+   * missing if the writer stopped part-way; in that case the reader was opened as if there were no
+   * trailer. When reading later reaches the trailer bytes, they are decoded as a WALKey and that
+   * fails, but the error can vary because the data being parsed is really a WALTrailer.
+   * <p>
+   * This method seeks the input stream to {@code startPosition} and reads from there; it does not
+   * restore the stream position.
+   * @param startPosition the offset at which the failed WALKey read started
+   * @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done
+   */
+  protected final boolean isWALTrailer(long startPosition) throws IOException {
+    // We have nothing in the WALTrailer PB message now, so on disk an (empty) trailer is just an
+    // int length of zero followed by the magic.
+    int fullTrailerLength = Bytes.SIZEOF_INT + PB_WAL_COMPLETE_MAGIC.length;
+    if (fileLength - startPosition >= fullTrailerLength) {
+      // There are still at least fullTrailerLength bytes before EOF, so this is not a partial
+      // trailer. Equality is rejected too: a complete, valid trailer would have been read while
+      // opening the reader, so we should not get here for one.
+      return false;
+    }
+    // Expected bytes: SIZEOF_INT zero bytes (the length) and then the magic.
+    byte[] expected = new byte[fullTrailerLength];
+    System.arraycopy(PB_WAL_COMPLETE_MAGIC, 0, expected, Bytes.SIZEOF_INT,
+      PB_WAL_COMPLETE_MAGIC.length);
+    inputStream.seek(startPosition);
+    for (byte expectedByte : expected) {
+      int actual = inputStream.read();
+      if (actual == -1) {
+        // reached EOF and every byte read so far matched, so we assume this is a partial trailer
+        return true;
+      }
+      if (actual != (expectedByte & 0xFF)) {
+        // a non-zero length byte or a byte that does not match the magic: not a trailer
+        return false;
+      }
+    }
+    // in fact we should not reach here, as this means the trailer bytes are all matched and
+    // complete, then we should not call this method...
+    return true;
+  }
+
+  /**
+   * Closes the underlying input stream, if any, ignoring errors, and forgets it. Calling this
+   * again afterwards does nothing.
+   */
+  @Override
+  public void close() {
+    if (inputStream == null) {
+      return;
+    }
+    IOUtils.closeQuietly(inputStream);
+    inputStream = null;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index bed31530d87..aebc8e07725 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -69,21 +70,22 @@ public class Compressor {
     FileSystem inFS = input.getFileSystem(conf);
     FileSystem outFS = output.getFileSystem(conf);
 
-    WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf);
+    WALStreamReader in = WALFactory.createStreamReader(inFS, input, conf);
     WALProvider.Writer out = null;
 
     try {
-      if (!(in instanceof ReaderBase)) {
+      if (!(in instanceof AbstractProtobufWALReader)) {
         System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
         return;
       }
-      boolean compress = ((ReaderBase) in).hasCompression();
+      boolean compress = ((AbstractProtobufWALReader) in).hasCompression;
       conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
       out = WALFactory.createWALWriter(outFS, output, conf);
 
       WAL.Entry e = null;
-      while ((e = in.next()) != null)
+      while ((e = in.next()) != null) {
         out.append(e);
+      }
     } finally {
       in.close();
       if (out != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
deleted file mode 100644
index a7ca1827845..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader.Builder;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
-
-/**
- * A Protobuf based WAL has the following structure:
- * <p>
- * &lt;PB_WAL_MAGIC&gt;&lt;WALHeader&gt;&lt;WALEdits&gt;...&lt;WALEdits&gt;&lt;Trailer&gt;
- * &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
- * </p>
- * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
- * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure which is
- * appended at the end of the WAL. This is empty for now; it can contain some meta information such
- * as Region level stats, etc in future.
- */
-@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
-  HBaseInterfaceAudience.CONFIG })
-public class ProtobufLogReader extends ReaderBase {
-  private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogReader.class);
-  // public for WALFactory until we move everything to o.a.h.h.wal
-  @InterfaceAudience.Private
-  public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
-  // public for TestWALSplit
-  @InterfaceAudience.Private
-  public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
-  /**
-   * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the
-   * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer.
-   */
-  static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
-  static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
-
-  protected FSDataInputStream inputStream;
-  protected Codec.Decoder cellDecoder;
-  protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
-  protected boolean hasCompression = false;
-  protected boolean hasTagCompression = false;
-  protected boolean hasValueCompression = false;
-  protected Compression.Algorithm valueCompressionType = null;
-  // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
-  // entry in the wal, the inputstream's position is equal to walEditsStopOffset.
-  private long walEditsStopOffset;
-  private boolean trailerPresent;
-  protected WALTrailer trailer;
-  // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
-  // than this size, it is written/read respectively, with a WARN message in the log.
-  protected int trailerWarnSize;
-  private static List<String> writerClsNames = new ArrayList<>();
-  static {
-    writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
-    writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
-  }
-
-  // cell codec classname
-  private String codecClsName = null;
-
-  // a flag indicate that whether we need to reset compression context when seeking back
-  private boolean resetCompression;
-
-  @InterfaceAudience.Private
-  public long trailerSize() {
-    if (trailerPresent) {
-      // sizeof PB_WAL_COMPLETE_MAGIC + sizof trailerSize + trailer
-      final long calculatedSize =
-        (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
-      final long expectedSize = fileLength - walEditsStopOffset;
-      if (expectedSize != calculatedSize) {
-        LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we "
-          + "calculate it as being {}", expectedSize, calculatedSize);
-      }
-      return expectedSize;
-    } else {
-      return -1L;
-    }
-  }
-
-  enum WALHdrResult {
-    EOF, // stream is at EOF when method starts
-    SUCCESS,
-    UNKNOWN_WRITER_CLS // name of writer class isn't recognized
-  }
-
-  // context for WALHdr carrying information such as Cell Codec classname
-  static class WALHdrContext {
-    WALHdrResult result;
-    String cellCodecClsName;
-
-    WALHdrContext(WALHdrResult result, String cellCodecClsName) {
-      this.result = result;
-      this.cellCodecClsName = cellCodecClsName;
-    }
-
-    WALHdrResult getResult() {
-      return result;
-    }
-
-    String getCellCodecClsName() {
-      return cellCodecClsName;
-    }
-  }
-
-  public ProtobufLogReader() {
-    super();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (this.inputStream != null) {
-      this.inputStream.close();
-      this.inputStream = null;
-    }
-  }
-
-  @Override
-  public long getPosition() throws IOException {
-    return inputStream.getPos();
-  }
-
-  @Override
-  public void reset() throws IOException {
-    String clsName = initInternal(null, false);
-    if (resetCompression) {
-      resetCompression();
-    }
-    initAfterCompression(clsName); // We need a new decoder (at least).
-  }
-
-  @Override
-  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
-    throws IOException {
-    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
-    super.init(fs, path, conf, stream);
-  }
-
-  @Override
-  protected String initReader(FSDataInputStream stream) throws IOException {
-    return initInternal(stream, true);
-  }
-
-  /*
-   * Returns names of the accepted writer classes
-   */
-  public List<String> getWriterClsNames() {
-    return writerClsNames;
-  }
-
-  /*
-   * Returns the cell codec classname
-   */
-  public String getCodecClsName() {
-    return codecClsName;
-  }
-
-  protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream) throws IOException {
-    boolean res = builder.mergeDelimitedFrom(stream);
-    if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
-    if (builder.hasWriterClsName() && !getWriterClsNames().contains(builder.getWriterClsName())) {
-      return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
-    }
-    String clsName = null;
-    if (builder.hasCellCodecClsName()) {
-      clsName = builder.getCellCodecClsName();
-    }
-    return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
-  }
-
-  private String initInternal(FSDataInputStream stream, boolean isFirst) throws IOException {
-    close();
-    if (!isFirst) {
-      // Re-compute the file length.
-      this.fileLength = fs.getFileStatus(path).getLen();
-    }
-    long expectedPos = PB_WAL_MAGIC.length;
-    if (stream == null) {
-      stream = fs.open(path);
-      stream.seek(expectedPos);
-    }
-    if (stream.getPos() != expectedPos) {
-      throw new IOException("The stream is at invalid position: " + stream.getPos());
-    }
-    // Initialize metadata or, when we reset, just skip the header.
-    WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
-    WALHdrContext hdrCtxt = readHeader(builder, stream);
-    WALHdrResult walHdrRes = hdrCtxt.getResult();
-    if (walHdrRes == WALHdrResult.EOF) {
-      throw new EOFException("Couldn't read WAL PB header");
-    }
-    if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
-      throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
-    }
-    if (isFirst) {
-      WALProtos.WALHeader header = builder.build();
-      this.hasCompression = header.hasHasCompression() && header.getHasCompression();
-      this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
-      this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression();
-      if (header.hasValueCompressionAlgorithm()) {
-        try {
-          this.valueCompressionType =
-            Compression.Algorithm.values()[header.getValueCompressionAlgorithm()];
-        } catch (ArrayIndexOutOfBoundsException e) {
-          throw new IOException("Invalid compression type", e);
-        }
-      }
-    }
-    this.inputStream = stream;
-    this.walEditsStopOffset = this.fileLength;
-    long currentPosition = stream.getPos();
-    trailerPresent = setTrailerIfPresent();
-    this.seekOnFs(currentPosition);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
-        + ", fileLength: " + this.fileLength + ", " + "trailerPresent: "
-        + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false")
-        + ", currentPosition: " + currentPosition);
-    }
-
-    codecClsName = hdrCtxt.getCellCodecClsName();
-
-    return hdrCtxt.getCellCodecClsName();
-  }
-
-  /**
-   * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
-   * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
-   * the trailer, and checks whether the trailer is present at the end or not by comparing the last
-   * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
-   * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
-   * before the trailer.
-   * <ul>
-   * The trailer is ignored in case:
-   * <li>fileLength is 0 or not correct (when file is under recovery, etc).
-   * <li>the trailer size is negative.
-   * </ul>
-   * <p>
-   * In case the trailer size > this.trailerMaxSize, it is read after a WARN message.
-   * @return true if a valid trailer is present
-   */
-  private boolean setTrailerIfPresent() {
-    try {
-      long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
-      if (trailerSizeOffset <= 0) return false;// no trailer possible.
-      this.seekOnFs(trailerSizeOffset);
-      // read the int as trailer size.
-      int trailerSize = this.inputStream.readInt();
-      ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
-      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
-      if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
-        LOG.trace("No trailer found.");
-        return false;
-      }
-      if (trailerSize < 0) {
-        LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
-        return false;
-      } else if (trailerSize > this.trailerWarnSize) {
-        // continue reading after warning the user.
-        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
-          + trailerSize + " > " + this.trailerWarnSize);
-      }
-      // seek to the position where trailer starts.
-      long positionOfTrailer = trailerSizeOffset - trailerSize;
-      this.seekOnFs(positionOfTrailer);
-      // read the trailer.
-      buf = ByteBuffer.allocate(trailerSize);// for trailer.
-      this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
-      trailer = WALTrailer.parseFrom(buf.array());
-      this.walEditsStopOffset = positionOfTrailer;
-      return true;
-    } catch (IOException ioe) {
-      LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
-    }
-    return false;
-  }
-
-  protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
-    CompressionContext compressionContext) throws IOException {
-    return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
-  }
-
-  @Override
-  protected void initAfterCompression() throws IOException {
-    initAfterCompression(null);
-  }
-
-  @Override
-  protected void initAfterCompression(String cellCodecClsName) throws IOException {
-    WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
-    this.cellDecoder = codec.getDecoder(this.inputStream);
-    if (this.hasCompression) {
-      this.byteStringUncompressor = codec.getByteStringUncompressor();
-    } else {
-      this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
-    }
-  }
-
-  @Override
-  protected boolean hasCompression() {
-    return this.hasCompression;
-  }
-
-  @Override
-  protected boolean hasTagCompression() {
-    return this.hasTagCompression;
-  }
-
-  @Override
-  protected boolean hasValueCompression() {
-    return this.hasValueCompression;
-  }
-
-  @Override
-  protected Compression.Algorithm getValueCompressionAlgorithm() {
-    return this.valueCompressionType;
-  }
-
-  @Override
-  protected boolean readNext(Entry entry) throws IOException {
-    resetCompression = false;
-    // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
-    long originalPosition = this.inputStream.getPos();
-    if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
-      LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
-      return false;
-    }
-    boolean resetPosition = false;
-    try {
-      WALKey walKey;
-      try {
-        walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
-      } catch (InvalidProtocolBufferException e) {
-        if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
-          // only rethrow EOF if it indicates an EOF, or we have reached the partial WALTrailer
-          resetPosition = true;
-          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
-            + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
-        } else {
-          throw e;
-        }
-      } catch (EOFException e) {
-        // append more detailed information
-        throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
-          + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
-      }
-      entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
-      if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
-        LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
-          this.inputStream.getPos());
-        return true;
-      }
-      // Starting from here, we will start to read cells, which will change the content in
-      // compression dictionary, so if we fail in the below operations, when resetting, we also need
-      // to clear the compression context, and read from the beginning to reconstruct the
-      // compression dictionary, instead of seeking to the position directly.
-      // This is very useful for saving the extra effort for reconstructing the compression
-      // dictionary, as DFSInputStream implement the available method, so in most cases we will
-      // not reach here if there are not enough data.
-      resetCompression = true;
-      int expectedCells = walKey.getFollowingKvCount();
-      long posBefore = this.inputStream.getPos();
-      try {
-        int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
-        if (expectedCells != actualCells) {
-          resetPosition = true;
-          throw new EOFException("Only read " + actualCells); // other info added in catch
-        }
-      } catch (Exception ex) {
-        String posAfterStr = "<unknown>";
-        try {
-          posAfterStr = this.inputStream.getPos() + "";
-        } catch (Throwable t) {
-          LOG.trace("Error getting pos for error message - ignoring", t);
-        }
-        String message = " while reading " + expectedCells + " WAL KVs; started reading at "
-          + posBefore + " and read up to " + posAfterStr;
-        IOException realEofEx = extractHiddenEof(ex);
-        throw (EOFException) new EOFException("EOF " + message)
-          .initCause(realEofEx != null ? realEofEx : ex);
-      }
-      if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
-        LOG.error(
-          "Read WALTrailer while reading WALEdits. wal: " + this.path + ", inputStream.getPos(): "
-            + this.inputStream.getPos() + ", walEditsStopOffset: " + this.walEditsStopOffset);
-        throw new EOFException("Read WALTrailer while reading WALEdits");
-      }
-    } catch (EOFException eof) {
-      // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
-      if (originalPosition < 0) {
-        LOG.debug(
-          "Encountered a malformed edit, but can't seek back to last good position "
-            + "because originalPosition is negative. last offset={}",
-          this.inputStream.getPos(), eof);
-        throw eof;
-      }
-      // If stuck at the same place and we got an exception, lets go back at the beginning.
-      if (inputStream.getPos() == originalPosition) {
-        if (resetPosition) {
-          LOG.debug("Encountered a malformed edit, seeking to the beginning of the WAL since "
-            + "current position and original position match at {}", originalPosition);
-          seekOnFs(0);
-        } else {
-          LOG.debug("EOF at position {}", originalPosition);
-        }
-      } else {
-        // Else restore our position to original location in hope that next time through we will
-        // read successfully.
-        LOG.debug("Encountered a malformed edit, seeking back to last good position in file, "
-          + "from {} to {}", inputStream.getPos(), originalPosition, eof);
-        seekOnFs(originalPosition);
-      }
-      return false;
-    }
-    return true;
-  }
-
-  private IOException extractHiddenEof(Exception ex) {
-    // There are two problems we are dealing with here. Hadoop stream throws generic exception
-    // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
-    IOException ioEx = null;
-    if (ex instanceof EOFException) {
-      return (EOFException) ex;
-    } else if (ex instanceof IOException) {
-      ioEx = (IOException) ex;
-    } else if (
-      ex instanceof RuntimeException && ex.getCause() != null
-        && ex.getCause() instanceof IOException
-    ) {
-      ioEx = (IOException) ex.getCause();
-    }
-    if ((ioEx != null) && (ioEx.getMessage() != null)) {
-      if (ioEx.getMessage().contains("EOF")) return ioEx;
-      return null;
-    }
-    return null;
-  }
-
-  /**
-   * This is used to determine whether we have already reached the WALTrailer. As the size and magic
-   * are at the end of the WAL file, it is possible that these two options are missing while
-   * writing, so we will consider there is no trailer. And when we actually reach the WALTrailer, we
-   * will try to decode it as WALKey and we will fail but the error could vary as it is parsing
-   * WALTrailer actually.
-   * @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done
-   */
-  private boolean isWALTrailer(long startPosition) throws IOException {
-    // We have nothing in the WALTrailer PB message now so its size is just a int length size and a
-    // magic at the end
-    int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
-    if (fileLength - startPosition >= trailerSize) {
-      // We still have more than trailerSize bytes before reaching the EOF so this is not a trailer.
-      // We also test for == here because if this is a valid trailer, we can read it while opening
-      // the reader so we should not reach here
-      return false;
-    }
-    inputStream.seek(startPosition);
-    for (int i = 0; i < 4; i++) {
-      int r = inputStream.read();
-      if (r == -1) {
-        // we have reached EOF while reading the length, and all bytes read are 0, so we assume this
-        // is a partial trailer
-        return true;
-      }
-      if (r != 0) {
-        // the length is not 0, should not be a trailer
-        return false;
-      }
-    }
-    for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
-      int r = inputStream.read();
-      if (r == -1) {
-        // we have reached EOF while reading the magic, and all bytes read are matched, so we assume
-        // this is a partial trailer
-        return true;
-      }
-      if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
-        // does not match magic, should not be a trailer
-        return false;
-      }
-    }
-    // in fact we should not reach here, as this means the trailer bytes are all matched and
-    // complete, then we should not call this method...
-    return true;
-  }
-
-  @Override
-  protected void seekOnFs(long pos) throws IOException {
-    this.inputStream.seek(pos);
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALStreamReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALStreamReader.java
new file mode 100644
index 00000000000..626921401aa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALStreamReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * A one way stream reader for reading protobuf based WAL file.
+ */
+@InterfaceAudience.Private
+public class ProtobufWALStreamReader extends AbstractProtobufWALReader
+  implements WALStreamReader, AbstractFSWALProvider.Initializer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALStreamReader.class);
+
+  @Override
+  public Entry next(Entry reuse) throws IOException {
+    long originalPosition = getPosition();
+    if (reachWALEditsStopOffset(originalPosition)) {
+      return null;
+    }
+    WALProtos.WALKey walKey;
+    try {
+      // for one way stream reader, we do not care about what is the exact position where we hit the
+      // EOF or IOE, so just use the helper method to parse WALKey, in tailing reader, we will try
+      // to read the varint size by ourselves
+      walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALProtos.WALKey.parser());
+    } catch (InvalidProtocolBufferException e) {
+      if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
+        // InvalidProtocolBufferException.truncatedMessage, should throw EOF
+        // or we have started to read the partial WALTrailer
+        throw (EOFException) new EOFException("EOF while reading WALKey, originalPosition="
+          + originalPosition + ", currentPosition=" + inputStream.getPos()).initCause(e);
+      } else {
+        // For all other type of IPBEs, it means the WAL key is broken, throw IOException out to let
+        // the upper layer know, unless we have already reached the partial WALTrailer
+        throw (IOException) new IOException("Error while reading WALKey, originalPosition="
+          + originalPosition + ", currentPosition=" + inputStream.getPos()).initCause(e);
+      }
+    }
+    Entry entry = reuse;
+    if (entry == null) {
+      entry = new Entry();
+    }
+    entry.getKey().readFieldsFromPb(walKey, byteStringUncompressor);
+    if (!walKey.hasFollowingKvCount() || walKey.getFollowingKvCount() == 0) {
+      LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
+        inputStream.getPos());
+      return entry;
+    }
+    int expectedCells = walKey.getFollowingKvCount();
+    long posBefore = getPosition();
+    int actualCells;
+    try {
+      actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
+    } catch (Exception e) {
+      String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+        + posBefore + " and read up to " + getPositionQuietly();
+      IOException realEofEx = extractHiddenEof(e);
+      if (realEofEx != null) {
+        throw (EOFException) new EOFException("EOF " + message).initCause(realEofEx);
+      } else {
+        // do not throw EOFException as it could be other type of errors, throwing EOF will cause
+        // the upper layer to consider the file has been fully read and cause data loss.
+        throw new IOException("Error " + message, e);
+      }
+    }
+    if (expectedCells != actualCells) {
+      throw new EOFException("Only read " + actualCells + " cells, expected " + expectedCells
+        + "; started reading at " + posBefore + " and read up to " + getPositionQuietly());
+    }
+    long posAfter = this.inputStream.getPos();
+    if (trailerPresent && posAfter > this.walEditsStopOffset) {
+      LOG.error("Read WALTrailer while reading WALEdits. wal: {}, inputStream.getPos(): {},"
+        + " walEditsStopOffset: {}", path, posAfter, walEditsStopOffset);
+      throw new EOFException("Read WALTrailer while reading WALEdits; started reading at "
+        + posBefore + " and read up to " + posAfter);
+    }
+    return entry;
+  }
+
+  @Override
+  protected InputStream getCellCodecInputStream(FSDataInputStream stream) {
+    // just return the original input stream
+    return stream;
+  }
+
+  @Override
+  protected void skipTo(long position) throws IOException {
+    Entry entry = new Entry();
+    for (;;) {
+      entry = next(entry);
+      if (entry == null) {
+        throw new EOFException("Can not skip to the given position " + position
+          + " as we have already reached the end of file");
+      }
+      long pos = inputStream.getPos();
+      if (pos > position) {
+        throw new IOException("Can not skip to the given position " + position + ", stopped at "
+          + pos + " which is already beyond the given position, malformed WAL?");
+      }
+      if (pos == position) {
+        return;
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java
new file mode 100644
index 00000000000..62091acdd1d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.io.DelegatingInputStream;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALTailingReader;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
+import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * A WAL reader for replication. It supports reset, so it can be used to tail a WAL file that is
+ * currently being written.
+ */
+@InterfaceAudience.Private
+public class ProtobufWALTailingReader extends AbstractProtobufWALReader
+  implements WALTailingReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALTailingReader.class);
+
+  private DelegatingInputStream delegatingInput;
+
+  private static final class ReadWALKeyResult {
+    final State state;
+    final Entry entry;
+    final int followingKvCount;
+
+    public ReadWALKeyResult(State state, Entry entry, int followingKvCount) {
+      this.state = state;
+      this.entry = entry;
+      this.followingKvCount = followingKvCount;
+    }
+  }
+
+  private static final ReadWALKeyResult KEY_ERROR_AND_RESET =
+    new ReadWALKeyResult(State.ERROR_AND_RESET, null, 0);
+
+  private static final ReadWALKeyResult KEY_EOF_AND_RESET =
+    new ReadWALKeyResult(State.EOF_AND_RESET, null, 0);
+
+  private IOException unwrapIPBE(IOException e) {
+    if (e instanceof InvalidProtocolBufferException) {
+      return ((InvalidProtocolBufferException) e).unwrapIOException();
+    } else {
+      return e;
+    }
+  }
+
+  private ReadWALKeyResult readWALKey(long originalPosition) {
+    int firstByte;
+    try {
+      firstByte = delegatingInput.read();
+    } catch (IOException e) {
+      LOG.warn("Failed to read wal key length first byte", e);
+      return KEY_ERROR_AND_RESET;
+    }
+    if (firstByte == -1) {
+      return KEY_EOF_AND_RESET;
+    }
+    int size;
+    try {
+      size = CodedInputStream.readRawVarint32(firstByte, delegatingInput);
+    } catch (IOException e) {
+      // if we are reading a partial WALTrailer, the size will just be 0 so we will not get an
+      // exception here, so do not need to check whether it is a partial WALTrailer.
+      if (
+        e instanceof InvalidProtocolBufferException
+          && ProtobufUtil.isEOF((InvalidProtocolBufferException) e)
+      ) {
+        LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}",
+          originalPosition, getPositionQuietly(), e.toString());
+        return KEY_EOF_AND_RESET;
+      } else {
+        LOG.warn("Failed to read wal key length", e);
+        return KEY_ERROR_AND_RESET;
+      }
+    }
+    if (size < 0) {
+      LOG.warn("Negative pb message size read: {}, malformed WAL file?", size);
+      return KEY_ERROR_AND_RESET;
+    }
+    int available;
+    try {
+      available = delegatingInput.available();
+    } catch (IOException e) {
+      LOG.warn("Failed to get available bytes", e);
+      return KEY_ERROR_AND_RESET;
+    }
+    if (available > 0 && available < size) {
+      LOG.info("Available stream not enough for edit, available={}, entry size={} at offset={}",
+        available, size, getPositionQuietly());
+      return KEY_EOF_AND_RESET;
+    }
+    WALProtos.WALKey walKey;
+    try {
+      if (available > 0) {
+        walKey = WALProtos.WALKey.parseFrom(ByteStreams.limit(delegatingInput, size));
+      } else {
+        byte[] content = new byte[size];
+        ByteStreams.readFully(delegatingInput, content);
+        walKey = WALProtos.WALKey.parseFrom(content);
+      }
+    } catch (IOException e) {
+      e = unwrapIPBE(e);
+      if (
+        e instanceof EOFException || (e instanceof InvalidProtocolBufferException
+          && ProtobufUtil.isEOF((InvalidProtocolBufferException) e))
+      ) {
+        LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}",
+          originalPosition, getPositionQuietly(), e.toString());
+        return KEY_EOF_AND_RESET;
+      } else {
+        boolean isWALTrailer;
+        try {
+          isWALTrailer = isWALTrailer(originalPosition);
+        } catch (IOException ioe) {
+          ioe.addSuppressed(e);
+          LOG.warn("Error while testing whether this is a partial WAL trailer, originalPosition={},"
+            + " currentPosition={}", originalPosition, getPositionQuietly(), ioe);
+          return KEY_ERROR_AND_RESET;
+        }
+        if (isWALTrailer) {
+          LOG.info("Reached partial WAL Trailer(EOF) while reading WALKey, originalPosition={},"
+            + " currentPosition={}", originalPosition, getPositionQuietly(), e);
+          return KEY_EOF_AND_RESET;
+        } else {
+          // for all other type of IPBEs or IOEs, it means the WAL key is broken
+          LOG.warn("Error while reading WALKey, originalPosition={}, currentPosition={}",
+            originalPosition, getPositionQuietly(), e);
+          return KEY_ERROR_AND_RESET;
+        }
+      }
+    }
+    Entry entry = new Entry();
+    try {
+      entry.getKey().readFieldsFromPb(walKey, byteStringUncompressor);
+    } catch (IOException e) {
+      LOG.warn("Failed to read wal key fields from pb message", e);
+      return KEY_ERROR_AND_RESET;
+    }
+    return new ReadWALKeyResult(State.NORMAL, entry,
+      walKey.hasFollowingKvCount() ? walKey.getFollowingKvCount() : 0);
+  }
+
+  private Result editEof() {
+    return hasCompression
+      ? State.EOF_AND_RESET_COMPRESSION.getResult()
+      : State.EOF_AND_RESET.getResult();
+  }
+
+  private Result editError() {
+    return hasCompression
+      ? State.ERROR_AND_RESET_COMPRESSION.getResult()
+      : State.ERROR_AND_RESET.getResult();
+  }
+
+  private Result readWALEdit(Entry entry, int followingKvCount) {
+    long posBefore;
+    try {
+      posBefore = inputStream.getPos();
+    } catch (IOException e) {
+      LOG.warn("failed to get position", e);
+      return State.ERROR_AND_RESET.getResult();
+    }
+    if (followingKvCount == 0) {
+      LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
+        posBefore);
+      return new Result(State.NORMAL, entry, posBefore);
+    }
+    int actualCells;
+    try {
+      actualCells = entry.getEdit().readFromCells(cellDecoder, followingKvCount);
+    } catch (Exception e) {
+      String message = " while reading " + followingKvCount + " WAL KVs; started reading at "
+        + posBefore + " and read up to " + getPositionQuietly();
+      IOException realEofEx = extractHiddenEof(e);
+      if (realEofEx != null) {
+        LOG.warn("EOF " + message, realEofEx);
+        return editEof();
+      } else {
+        LOG.warn("Error " + message, e);
+        return editError();
+      }
+    }
+    if (actualCells != followingKvCount) {
+      LOG.warn("Only read {} cells, expected {}; started reading at {} and read up to {}",
+        actualCells, followingKvCount, posBefore, getPositionQuietly());
+      return editEof();
+    }
+    long posAfter;
+    try {
+      posAfter = inputStream.getPos();
+    } catch (IOException e) {
+      LOG.warn("failed to get position", e);
+      return editError();
+    }
+    if (trailerPresent && posAfter > this.walEditsStopOffset) {
+      LOG.error("Read WALTrailer while reading WALEdits. wal: {}, inputStream.getPos(): {},"
+        + " walEditsStopOffset: {}", path, posAfter, walEditsStopOffset);
+      return editEof();
+    }
+    return new Result(State.NORMAL, entry, posAfter);
+  }
+
+  @Override
+  public Result next(long limit) {
+    long originalPosition;
+    try {
+      originalPosition = inputStream.getPos();
+    } catch (IOException e) {
+      LOG.warn("failed to get position", e);
+      return State.EOF_AND_RESET.getResult();
+    }
+    if (reachWALEditsStopOffset(originalPosition)) {
+      return State.EOF_WITH_TRAILER.getResult();
+    }
+    if (limit < 0) {
+      // should be closed WAL file, set to no limit, i.e, just use the original inputStream
+      delegatingInput.setDelegate(inputStream);
+    } else if (limit <= originalPosition) {
+      // no data available, just return EOF
+      return State.EOF_AND_RESET.getResult();
+    } else {
+      // calculate the remaining bytes we can read and set
+      delegatingInput.setDelegate(ByteStreams.limit(inputStream, limit - originalPosition));
+    }
+    ReadWALKeyResult readKeyResult = readWALKey(originalPosition);
+    if (readKeyResult.state != State.NORMAL) {
+      return readKeyResult.state.getResult();
+    }
+    return readWALEdit(readKeyResult.entry, readKeyResult.followingKvCount);
+  }
+
+  private void skipHeader(FSDataInputStream stream) throws IOException {
+    stream.seek(PB_WAL_MAGIC.length);
+    int headerLength = StreamUtils.readRawVarint32(stream);
+    stream.seek(stream.getPos() + headerLength);
+  }
+
+  @Override
+  public void resetTo(long position, boolean resetCompression) throws IOException {
+    close();
+    Pair<FSDataInputStream, FileStatus> pair = open();
+    boolean resetSucceed = false;
+    try {
+      // own the new stream before retrying the trailer so close() in finally can release it
+      inputStream = pair.getFirst();
+      if (!trailerPresent) {
+        readTrailer(pair.getFirst(), pair.getSecond());
+      }
+      delegatingInput.setDelegate(inputStream);
+      if (position < 0) {
+        // read from the beginning
+        if (compressionCtx != null) {
+          compressionCtx.clear();
+        }
+        skipHeader(inputStream);
+      } else if (resetCompression && compressionCtx != null) {
+        // clear compressCtx and skip to the expected position, to fill up the dictionary
+        compressionCtx.clear();
+        skipHeader(inputStream);
+        if (position != inputStream.getPos()) {
+          skipTo(position);
+        }
+      } else {
+        // just seek to the expected position
+        inputStream.seek(position);
+      }
+      resetSucceed = true;
+    } finally {
+      if (!resetSucceed) {
+        // close the input stream to avoid resource leak
+        close();
+      }
+    }
+  }
+
+  @Override
+  protected InputStream getCellCodecInputStream(FSDataInputStream stream) {
+    delegatingInput = new DelegatingInputStream(stream);
+    return delegatingInput;
+  }
+
+  @Override
+  protected void skipTo(long position) throws IOException {
+    for (;;) {
+      Result result = next(-1);
+      if (result.getState() != State.NORMAL) {
+        throw new IOException("Can not skip to the given position " + position + ", stopped at "
+          + result.getEntryEndPos() + " (state=" + result.getState() + ") before reaching it");
+      }
+      if (result.getEntryEndPos() == position) {
+        return;
+      }
+      if (result.getEntryEndPos() > position) {
+        throw new IOException("Can not skip to the given position " + position + ", stopped at "
+          + result.getEntryEndPos() + " which is beyond the given position, malformed WAL?");
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
deleted file mode 100644
index 5caceeac09b..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.util.LRUDictionary;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
-public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
-  private static final Logger LOG = LoggerFactory.getLogger(ReaderBase.class);
-  protected Configuration conf;
-  protected FileSystem fs;
-  protected Path path;
-  protected long edit = 0;
-  protected long fileLength;
-  /**
-   * Compression context to use reading. Can be null if no compression.
-   */
-  protected CompressionContext compressionContext = null;
-  private boolean emptyCompressionContext = true;
-
-  /**
-   * Default constructor.
-   */
-  public ReaderBase() {
-  }
-
-  @Override
-  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
-    throws IOException {
-    this.conf = conf;
-    this.path = path;
-    this.fs = fs;
-    this.fileLength = this.fs.getFileStatus(path).getLen();
-    String cellCodecClsName = initReader(stream);
-
-    boolean compression = hasCompression();
-    if (compression) {
-      // If compression is enabled, new dictionaries are created here.
-      try {
-        if (compressionContext == null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-              "Initializing compression context for {}: isRecoveredEdits={}"
-                + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}",
-              path, CommonFSUtils.isRecoveredEdits(path), hasTagCompression(),
-              hasValueCompression(), getValueCompressionAlgorithm());
-          }
-          compressionContext =
-            new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
-              hasTagCompression(), hasValueCompression(), getValueCompressionAlgorithm());
-        } else {
-          compressionContext.clear();
-        }
-      } catch (Exception e) {
-        throw new IOException("Failed to initialize CompressionContext", e);
-      }
-    }
-    initAfterCompression(cellCodecClsName);
-  }
-
-  @Override
-  public Entry next() throws IOException {
-    return next(null);
-  }
-
-  @Override
-  public Entry next(Entry reuse) throws IOException {
-    Entry e = reuse;
-    if (e == null) {
-      e = new Entry();
-    }
-
-    boolean hasEntry = false;
-    try {
-      hasEntry = readNext(e);
-    } catch (IllegalArgumentException iae) {
-      TableName tableName = e.getKey().getTableName();
-      if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
-        // It is old ROOT table edit, ignore it
-        LOG.info("Got an old ROOT edit, ignoring ");
-        return next(e);
-      } else throw iae;
-    }
-    edit++;
-    if (compressionContext != null && emptyCompressionContext) {
-      emptyCompressionContext = false;
-    }
-    return hasEntry ? e : null;
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    if (compressionContext != null && emptyCompressionContext) {
-      while (next() != null) {
-        if (getPosition() == pos) {
-          emptyCompressionContext = false;
-          break;
-        }
-      }
-    }
-    seekOnFs(pos);
-  }
-
-  /**
-   * Clear the {@link ReaderBase#compressionContext}, and also set {@link #emptyCompressionContext}
-   * to true, so when seeking, we will try to skip to the position and reconstruct the dictionary.
-   */
-  protected final void resetCompression() {
-    if (compressionContext != null) {
-      compressionContext.clear();
-      emptyCompressionContext = true;
-    }
-  }
-
-  /**
-   * Initializes the log reader with a particular stream (may be null). Reader assumes ownership of
-   * the stream if not null and may use it. Called once.
-   * @return the class name of cell Codec, null if such information is not available
-   */
-  protected abstract String initReader(FSDataInputStream stream) throws IOException;
-
-  /**
-   * Initializes the compression after the shared stuff has been initialized. Called once.
-   */
-  protected abstract void initAfterCompression() throws IOException;
-
-  /**
-   * Initializes the compression after the shared stuff has been initialized. Called once.
-   * @param cellCodecClsName class name of cell Codec
-   */
-  protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;
-
-  /** Returns Whether compression is enabled for this log. */
-  protected abstract boolean hasCompression();
-
-  /** Returns Whether tag compression is enabled for this log. */
-  protected abstract boolean hasTagCompression();
-
-  /** Returns Whether value compression is enabled for this log. */
-  protected abstract boolean hasValueCompression();
-
-  /** Returns Value compression algorithm for this log. */
-  protected abstract Compression.Algorithm getValueCompressionAlgorithm();
-
-  /**
-   * Read next entry.
-   * @param e The entry to read into.
-   * @return Whether there was anything to read.
-   */
-  protected abstract boolean readNext(Entry e) throws IOException;
-
-  /**
-   * Performs a filesystem-level seek to a certain position in an underlying file.
-   */
-  protected abstract void seekOnFs(long pos) throws IOException;
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
deleted file mode 100644
index 863739c72f2..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import java.security.Key;
-import java.security.KeyException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.crypto.Cipher;
-import org.apache.hadoop.hbase.io.crypto.Decryptor;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.EncryptionTest;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
-
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class SecureProtobufLogReader extends ProtobufLogReader {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SecureProtobufLogReader.class);
-
-  private Decryptor decryptor = null;
-  private static List<String> writerClsNames = new ArrayList<>();
-  static {
-    writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
-    writerClsNames.add(SecureProtobufLogWriter.class.getSimpleName());
-    writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
-    writerClsNames.add(SecureAsyncProtobufLogWriter.class.getSimpleName());
-  }
-
-  @Override
-  public List<String> getWriterClsNames() {
-    return writerClsNames;
-  }
-
-  @Override
-  protected WALHdrContext readHeader(WALHeader.Builder builder, FSDataInputStream stream)
-    throws IOException {
-    WALHdrContext hdrCtxt = super.readHeader(builder, stream);
-    WALHdrResult result = hdrCtxt.getResult();
-    // We need to unconditionally handle the case where the WAL has a key in
-    // the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is
-    // no longer set in the site configuration.
-    if (result == WALHdrResult.SUCCESS && builder.hasEncryptionKey()) {
-      // Serialized header data has been merged into the builder from the
-      // stream.
-
-      EncryptionTest.testKeyProvider(conf);
-      EncryptionTest.testCipherProvider(conf);
-
-      // Retrieve a usable key
-
-      byte[] keyBytes = builder.getEncryptionKey().toByteArray();
-      Key key = null;
-      String walKeyName = conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY);
-      // First try the WAL key, if one is configured
-      if (walKeyName != null) {
-        try {
-          key = EncryptionUtil.unwrapWALKey(conf, walKeyName, keyBytes);
-        } catch (KeyException e) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Unable to unwrap key with WAL key '" + walKeyName + "'");
-          }
-          key = null;
-        }
-      }
-      if (key == null) {
-        String masterKeyName =
-          conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName());
-        try {
-          // Then, try the cluster master key
-          key = EncryptionUtil.unwrapWALKey(conf, masterKeyName, keyBytes);
-        } catch (KeyException e) {
-          // If the current master key fails to unwrap, try the alternate, if
-          // one is configured
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
-          }
-          String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
-          if (alternateKeyName != null) {
-            try {
-              key = EncryptionUtil.unwrapWALKey(conf, alternateKeyName, keyBytes);
-            } catch (KeyException ex) {
-              throw new IOException(ex);
-            }
-          } else {
-            throw new IOException(e);
-          }
-        }
-      }
-
-      // Use the algorithm the key wants
-
-      Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
-      if (cipher == null) {
-        throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
-      }
-
-      // Set up the decryptor for this WAL
-
-      decryptor = cipher.getDecryptor();
-      decryptor.setKey(key);
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName());
-      }
-    }
-
-    return hdrCtxt;
-  }
-
-  @Override
-  protected void initAfterCompression(String cellCodecClsName) throws IOException {
-    if (decryptor != null && cellCodecClsName.equals(SecureWALCellCodec.class.getName())) {
-      WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, decryptor);
-      this.cellDecoder = codec.getDecoder(this.inputStream);
-      // We do not support compression with WAL encryption
-      this.compressionContext = null;
-      this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
-      this.hasCompression = false;
-    } else {
-      super.initAfterCompression(cellCodecClsName);
-    }
-  }
-
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALHeaderEOFException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALHeaderEOFException.java
new file mode 100644
index 00000000000..d88e89ab7b3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALHeaderEOFException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.EOFException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A special EOFException to indicate that the EOF happens when we read the header of a WAL file.
+ * <p/>
+ * This usually means the WAL file just contains nothing and we are safe to skip over it.
+ */
+@InterfaceAudience.Private
+public class WALHeaderEOFException extends EOFException {
+
+  private static final long serialVersionUID = -4544368452826740759L;
+
+  public WALHeaderEOFException() {
+  }
+
+  public WALHeaderEOFException(String s) {
+    super(s);
+  }
+
+  public WALHeaderEOFException(String s, Throwable cause) {
+    super(s);
+    initCause(cause);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index d6351ea0eab..4e1d76a9764 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -17,13 +17,11 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +33,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -75,7 +72,6 @@ class ReplicationSourceWALReader extends Thread {
   private long currentPosition;
   private final long sleepForRetries;
   private final int maxRetriesMultiplier;
-  private final boolean eofAutoRecovery;
 
   // Indicates whether this particular worker is running
   private boolean isReaderRunning = true;
@@ -115,7 +111,6 @@ class ReplicationSourceWALReader extends Thread {
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
     // 5 minutes @ 1 sec per
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
-    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
     this.walGroupId = walGroupId;
     LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : "
@@ -124,14 +119,30 @@ class ReplicationSourceWALReader extends Thread {
       + ", replicationBatchQueueCapacity=" + batchCount);
   }
 
+  private void replicationDone() throws InterruptedException {
+    // we're done with current queue, either this is a recovered queue, or it is the special
+    // group for a sync replication peer and the peer has been transited to DA or S state.
+    LOG.debug("Stopping the replication source wal reader");
+    setReaderRunning(false);
+    // shuts down shipper thread immediately
+    entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
+  }
+
+  protected final int sleep(int sleepMultiplier) {
+    if (sleepMultiplier < maxRetriesMultiplier) {
+      sleepMultiplier++;
+    }
+    Threads.sleep(sleepForRetries * sleepMultiplier);
+    return sleepMultiplier;
+  }
+
   @Override
   public void run() {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
       WALEntryBatch batch = null;
-      try (WALEntryStream entryStream =
-        new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
-          source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId)) {
+      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
+        source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           batch = null;
           if (!source.isPeerEnabled()) {
@@ -141,34 +152,47 @@ class ReplicationSourceWALReader extends Thread {
           if (!checkQuota()) {
             continue;
           }
-          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
-          if (batch == null) {
-            // got no entries and didn't advance position in WAL
-            handleEmptyWALEntryBatch();
-            entryStream.reset(); // reuse stream
+          Path currentPath = entryStream.getCurrentPath();
+          WALEntryStream.HasNext hasNext = entryStream.hasNext();
+          if (hasNext == WALEntryStream.HasNext.NO) {
+            replicationDone();
+            return;
+          }
+          // first, check if we have switched a file, if so, we need to manually add an EOF entry
+          // batch to the queue
+          if (currentPath != null && switched(entryStream, currentPath)) {
+            entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
+            continue;
+          }
+          if (hasNext == WALEntryStream.HasNext.RETRY) {
+            // sleep and retry
+            sleepMultiplier = sleep(sleepMultiplier);
             continue;
           }
-          // if we have already switched a file, skip reading and put it directly to the ship queue
-          if (!batch.isEndOfFile()) {
-            readWALEntries(entryStream, batch);
-            currentPosition = entryStream.getPosition();
+          if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
+            // retry immediately, this usually means we have switched a file
+            continue;
           }
+          // below are all for hasNext == YES
+          batch = createBatch(entryStream);
+          readWALEntries(entryStream, batch);
+          currentPosition = entryStream.getPosition();
           // need to propagate the batch even it has no entries since it may carry the last
           // sequence id information for serial replication.
           LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
           entryBatchQueue.put(batch);
           sleepMultiplier = 1;
         }
-      } catch (WALEntryFilterRetryableException | IOException e) { // stream related
-        if (!handleEofException(e, batch)) {
-          LOG.warn("Failed to read stream of replication entries", e);
-          if (sleepMultiplier < maxRetriesMultiplier) {
-            sleepMultiplier++;
-          }
-          Threads.sleep(sleepForRetries * sleepMultiplier);
-        }
+      } catch (WALEntryFilterRetryableException e) {
+        // Here we have to recreate the WALEntryStream: when filtering, we have already called
+        // next to get the WAL entry and advanced the WALEntryStream, so at the WALEntryStream
+        // layer everything looks fine. That is why this catch block is not in the inner block.
+        LOG.warn("Failed to filter WAL entries and the filter let us retry later", e);
+        sleepMultiplier = sleep(sleepMultiplier);
       } catch (InterruptedException e) {
-        LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue");
+        // this usually means we want to quit
+        LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue",
+          e);
         Thread.currentThread().interrupt();
       }
     }
@@ -204,7 +228,7 @@ class ReplicationSourceWALReader extends Thread {
   // This is required in case there is any exception in while reading entries
   // we do not want to loss the existing entries in the batch
   protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
-    throws IOException, InterruptedException {
+    throws InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     for (;;) {
       Entry entry = entryStream.next();
@@ -215,111 +239,22 @@ class ReplicationSourceWALReader extends Thread {
           break;
         }
       }
-      boolean hasNext = entryStream.hasNext();
+      WALEntryStream.HasNext hasNext = entryStream.hasNext();
       // always return if we have switched to a new file
       if (switched(entryStream, currentPath)) {
         batch.setEndOfFile(true);
         break;
       }
-      if (!hasNext) {
+      if (hasNext != WALEntryStream.HasNext.YES) {
+        // For hasNext other than YES, it is OK to just retry.
+        // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
+        // return NO again when you call the method next time, so it is OK to just return here and
+        // let the loop in the upper layer call hasNext again.
         break;
       }
     }
   }
 
-  private void handleEmptyWALEntryBatch() throws InterruptedException {
-    LOG.trace("Didn't read any new entries from WAL");
-    if (logQueue.getQueue(walGroupId).isEmpty()) {
-      // we're done with current queue, either this is a recovered queue, or it is the special group
-      // for a sync replication peer and the peer has been transited to DA or S state.
-      LOG.debug("Stopping the replication source wal reader");
-      setReaderRunning(false);
-      // shuts down shipper thread immediately
-      entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
-    } else {
-      Thread.sleep(sleepForRetries);
-    }
-  }
-
-  private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream)
-    throws IOException {
-    Path currentPath = entryStream.getCurrentPath();
-    if (!entryStream.hasNext()) {
-      // check whether we have switched a file
-      if (currentPath != null && switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
-      } else {
-        return null;
-      }
-    }
-    if (currentPath != null) {
-      if (switched(entryStream, currentPath)) {
-        return WALEntryBatch.endOfFile(currentPath);
-      }
-    }
-    return createBatch(entryStream);
-  }
-
-  /**
-   * This is to handle the EOFException from the WAL entry stream. EOFException should be handled
-   * carefully because there are chances of data loss because of never replicating the data. Thus we
-   * should always try to ship existing batch of entries here. If there was only one log in the
-   * queue before EOF, we ship the empty batch here and since reader is still active, in the next
-   * iteration of reader we will stop the reader.
-   * <p/>
-   * If there was more than one log in the queue before EOF, we ship the existing batch and reset
-   * the wal patch and position to the log with EOF, so shipper can remove logs from replication
-   * queue
-   * @return true only the IOE can be handled
-   */
-  private boolean handleEofException(Exception e, WALEntryBatch batch) {
-    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
-    // Dump the log even if logQueue size is 1 if the source is from recovered Source
-    // since we don't add current log to recovered source queue so it is safe to remove.
-    if (
-      (e instanceof EOFException || e.getCause() instanceof EOFException)
-        && (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery
-    ) {
-      Path path = queue.peek();
-      try {
-        if (!fs.exists(path)) {
-          // There is a chance that wal has moved to oldWALs directory, so look there also.
-          path = AbstractFSWALProvider.findArchivedLog(path, conf);
-          // path can be null if unable to locate in archiveDir.
-        }
-        if (path != null && fs.getFileStatus(path).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: {}", path);
-          logQueue.remove(walGroupId);
-          currentPosition = 0;
-          if (batch != null) {
-            // After we removed the WAL from the queue, we should try shipping the existing batch of
-            // entries
-            addBatchToShippingQueue(batch);
-          }
-          return true;
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Couldn't get file length information about log " + path, ioe);
-      } catch (InterruptedException ie) {
-        LOG.trace("Interrupted while adding WAL batch to ship queue");
-        Thread.currentThread().interrupt();
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Update the batch try to ship and return true if shipped
-   * @param batch Batch of entries to ship
-   * @throws InterruptedException throws interrupted exception
-   */
-  private void addBatchToShippingQueue(WALEntryBatch batch) throws InterruptedException {
-    // need to propagate the batch even it has no entries since it may carry the last
-    // sequence id information for serial replication.
-    LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
-    entryBatchQueue.put(batch);
-  }
-
   public Path getCurrentPath() {
     // if we've read some WAL entries, get the Path we read from
     WALEntryBatch batchQueueHead = entryBatchQueue.peek();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 1a8bbf74a2c..41d95df2821 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * WAL reader for a serial replication peer.
@@ -33,6 +35,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationSourceWALReader.class);
+
   // used to store the first cell in an entry before filtering. This is because that if serial
   // replication is enabled, we may find out that an entry can not be pushed after filtering. And
   // when we try the next time, the cells maybe null since the entry has already been filtered,
@@ -51,7 +55,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
 
   @Override
   protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
-    throws IOException, InterruptedException {
+    throws InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     long positionBefore = entryStream.getPosition();
     for (;;) {
@@ -75,13 +79,23 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
         entry = filterEntry(entry);
       }
       if (entry != null) {
-        if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
+        int sleepMultiplier = 1;
+        try {
+          if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
+            if (batch.getLastWalPosition() > positionBefore) {
+              // we have something that can push, break
+              break;
+            } else {
+              checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("failed to check whether we can push the WAL entries", e);
           if (batch.getLastWalPosition() > positionBefore) {
             // we have something that can push, break
             break;
-          } else {
-            checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
           }
+          sleepMultiplier = sleep(sleepMultiplier);
         }
         // arrive here means we can push the entry, record the last sequence id
         batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
@@ -95,20 +109,23 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
         // actually remove the entry.
         removeEntryFromStream(entryStream, batch);
       }
-      boolean hasNext = entryStream.hasNext();
+      WALEntryStream.HasNext hasNext = entryStream.hasNext();
       // always return if we have switched to a new file.
       if (switched(entryStream, currentPath)) {
         batch.setEndOfFile(true);
         break;
       }
-      if (!hasNext) {
+      if (hasNext != WALEntryStream.HasNext.YES) {
+        // For hasNext other than YES, it is OK to just retry.
+        // As for RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
+        // return NO again when you call the method next time, so it is OK to just return here and
+        // let the loop in the upper layer call hasNext again.
         break;
       }
     }
   }
 
-  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
-    throws IOException {
+  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) {
     entryStream.next();
     firstCellInEntryBeforeFiltering = null;
     batch.setLastWalPosition(entryStream.getPosition());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index aa059aa30a2..d95d42f2f30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -26,17 +26,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
+import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
+import org.apache.hadoop.hbase.wal.WALTailingReader;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -52,7 +50,8 @@ import org.slf4j.LoggerFactory;
 class WALEntryStream implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
 
-  private Reader reader;
+  private WALTailingReader reader;
+  private WALTailingReader.State state;
   private Path currentPath;
   // cache of next entry for hasNext()
   private Entry currentEntry;
@@ -67,56 +66,108 @@ class WALEntryStream implements Closeable {
   private final FileSystem fs;
   private final Configuration conf;
   private final WALFileLengthProvider walFileLengthProvider;
-  // which region server the WALs belong to
-  private final ServerName serverName;
   private final MetricsSource metrics;
 
+  // we should be able to skip empty WAL files, but for safety, we still provide this config
+  // see HBASE-18137 for more details
+  private boolean eofAutoRecovery;
+
   /**
    * Create an entry stream over the given queue at the given start position
    * @param logQueue              the queue of WAL paths
-   * @param conf                  the {@link Configuration} to use to create {@link Reader} for this
-   *                              stream
+   * @param conf                  the {@link Configuration} to use to create {@link WALStreamReader}
+   *                              for this stream
    * @param startPosition         the position in the first WAL to start reading at
    * @param walFileLengthProvider provides the length of the WAL file
    * @param serverName            the server name which all WALs belong to
    * @param metrics               the replication metrics
-   * @throws IOException throw IO exception from stream
    */
-  public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf, long startPosition,
-    WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics,
-    String walGroupId) throws IOException {
+  public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
+    long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
+    String walGroupId) {
     this.logQueue = logQueue;
-    this.fs = CommonFSUtils.getWALFileSystem(conf);
+    this.fs = fs;
     this.conf = conf;
     this.currentPositionOfEntry = startPosition;
     this.walFileLengthProvider = walFileLengthProvider;
-    this.serverName = serverName;
     this.metrics = metrics;
     this.walGroupId = walGroupId;
+    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
+  }
+
+  public enum HasNext {
+    /** means there is a new entry and you could use peek or next to get current entry */
+    YES,
+    /**
+     * means there is something wrong, or we have reached EOF of the current file but it is not
+     * closed yet and there is no new file in the replication queue yet; you should sleep a while
+     * and try to call hasNext again
+     */
+    RETRY,
+    /**
+     * Usually this means we have finished reading a WAL file, and to simplify the implementation
+     * of this class, we just let the upper layer issue a new hasNext call again to open the next
+     * WAL file.
+     */
+    RETRY_IMMEDIATELY,
+    /**
+     * means there is no new entry and the stream has ended; the upper layer should close this
+     * stream and release other resources as well
+     */
+    NO
   }
 
-  /** Returns true if there is another WAL {@link Entry} */
-  public boolean hasNext() throws IOException {
+  /**
+   * Tries to advance the stream if there is no entry yet. See the javadoc for {@link HasNext} for
+   * more details about the meanings of the return values.
+   * <p/>
+   * You can call {@link #peek()} or {@link #next()} to get the actual {@link Entry} if this method
+   * returns {@link HasNext#YES}.
+   */
+  public HasNext hasNext() {
     if (currentEntry == null) {
-      tryAdvanceEntry();
+      return tryAdvanceEntry();
+    } else {
+      return HasNext.YES;
     }
-    return currentEntry != null;
   }
 
   /**
    * Returns the next WAL entry in this stream but does not advance.
+   * <p/>
+   * Must call {@link #hasNext()} first before calling this method, and if you have already called
+   * {@link #next()} to consume the current entry, you need to call {@link #hasNext()} again to
+   * advance the stream before calling this method again, otherwise it will always return
+   * {@code null}
+   * <p/>
+   * The reason here is that, we need to use the return value of {@link #hasNext()} to tell upper
+   * layer to retry or not, so we can not wrap the {@link #hasNext()} call inside {@link #peek()} or
+   * {@link #next()} as they have their own return value.
+   * @see #hasNext()
+   * @see #next()
    */
-  public Entry peek() throws IOException {
-    return hasNext() ? currentEntry : null;
+  public Entry peek() {
+    return currentEntry;
   }
 
   /**
-   * Returns the next WAL entry in this stream and advance the stream.
+   * Returns the next WAL entry in this stream and advance the stream. Will throw
+   * {@link IllegalStateException} if you do not call {@link #hasNext()} before calling this method.
+   * Please see the javadoc of {@link #peek()} method to see why we need this.
+   * @throws IllegalStateException Every time you want to call this method, please call
+   *                               {@link #hasNext()} first, otherwise a
+   *                               {@link IllegalStateException} will be thrown.
+   * @see #hasNext()
+   * @see #peek()
    */
-  public Entry next() throws IOException {
+  public Entry next() {
+    if (currentEntry == null) {
+      throw new IllegalStateException("Call hasNext first");
+    }
     Entry save = peek();
     currentPositionOfEntry = currentPositionOfReader;
     currentEntry = null;
+    state = null;
     return save;
   }
 
@@ -124,7 +175,7 @@ class WALEntryStream implements Closeable {
    * {@inheritDoc}
    */
   @Override
-  public void close() throws IOException {
+  public void close() {
     closeReader();
   }
 
@@ -149,62 +200,171 @@ class WALEntryStream implements Closeable {
     return sb.toString();
   }
 
-  /**
-   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
-   * false)
-   */
-  public void reset() throws IOException {
-    if (reader != null && currentPath != null) {
-      resetReader();
-    }
-  }
-
-  private void setPosition(long position) {
-    currentPositionOfEntry = position;
-  }
-
   private void setCurrentPath(Path path) {
     this.currentPath = path;
   }
 
-  private void tryAdvanceEntry() throws IOException {
-    if (checkReader()) {
-      boolean beingWritten = readNextEntryAndRecordReaderPosition();
-      LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten);
-      if (currentEntry == null && !beingWritten) {
-        // no more entries in this log file, and the file is already closed, i.e, rolled
-        // Before dequeueing, we should always get one more attempt at reading.
-        // This is in case more entries came in after we opened the reader, and the log is rolled
-        // while we were reading. See HBASE-6758
-        resetReader();
-        readNextEntryAndRecordReaderPosition();
-        if (currentEntry == null) {
-          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
+      justification = "HDFS-4380")
+  private HasNext tryAdvanceEntry() {
+    if (reader == null) {
+      // try open next WAL file
+      PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
+      Path nextPath = queue.peek();
+      if (nextPath != null) {
+        setCurrentPath(nextPath);
+        // we need to test this prior to creating the reader. If not, it is possible that, while
+        // opening the file, the file is still being written so its header is incomplete and we get
+        // a header EOF, but then while we test whether it is still being written, we have already
+        // flushed the data out and we consider it is not being written, and then we just skip over
+        // the file, then we will lose the data written after opening...
+        boolean beingWritten =
+          walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+        try {
+          reader = WALFactory.createTailingReader(fs, nextPath, conf,
+            currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
+        } catch (WALHeaderEOFException e) {
+          if (!eofAutoRecovery) {
+            // if we do not enable EOF auto recovery, just let the upper layer retry
+            // the replication will be stuck usually, and need to be fixed manually
+            return HasNext.RETRY;
+          }
+          // we hit EOF while reading the WAL header, usually this means we can just skip over this
+          // file, but we need to be careful whether this file is still being written, if so we
+          // should retry instead of skipping.
+          LOG.warn("EOF while trying to open WAL reader for path: {}", nextPath, e);
+          if (beingWritten) {
+            // just retry as the file is still being written, maybe next time we could read
+            // something
+            return HasNext.RETRY;
+          } else {
+            // the file is not being written so we are safe to just skip over it
             dequeueCurrentLog();
-            if (openNextLog()) {
-              readNextEntryAndRecordReaderPosition();
+            return HasNext.RETRY_IMMEDIATELY;
+          }
+        } catch (LeaseNotRecoveredException e) {
+          // HBASE-15019 the WAL was not closed due to some hiccup.
+          LOG.warn("Try to recover the WAL lease " + nextPath, e);
+          AbstractFSWALProvider.recoverLease(conf, nextPath);
+          return HasNext.RETRY;
+        } catch (IOException | NullPointerException e) {
+          // For why we need to catch NPE here, see HDFS-4380 for more details
+          LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+          return HasNext.RETRY;
+        }
+      } else {
+        // no more files in queue, this could happen for recovered queue, or for a wal group of a
+        // sync replication peer which has already been transited to DA or S.
+        setCurrentPath(null);
+        return HasNext.NO;
+      }
+    } else if (state != null && state != WALTailingReader.State.NORMAL) {
+      // reset before reading
+      try {
+        if (currentPositionOfEntry > 0) {
+          reader.resetTo(currentPositionOfEntry, state.resetCompression());
+        } else {
+          // we will read from the beginning so we should always clear the compression context
+          reader.resetTo(-1, true);
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+          currentPositionOfEntry, state.resetCompression(), e);
+        // just leave the state as is, and try resetting next time
+        return HasNext.RETRY;
+      }
+    }
+
+    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
+    state = pair.getFirst();
+    boolean beingWritten = pair.getSecond();
+    LOG.trace("Reading WAL {}; result={}, currently open for write={}", this.currentPath, state,
+      beingWritten);
+    switch (state) {
+      case NORMAL:
+        // everything is fine, just return
+        return HasNext.YES;
+      case EOF_WITH_TRAILER:
+        // we have reached the trailer, which means this WAL file has been closed cleanly and we
+        // have finished reading it successfully, just move to the next WAL file and let the upper
+        // layer start reading the next WAL file
+        dequeueCurrentLog();
+        return HasNext.RETRY_IMMEDIATELY;
+      case EOF_AND_RESET:
+      case EOF_AND_RESET_COMPRESSION:
+        if (!beingWritten) {
+          // no more entries in this log file, and the file is already closed, i.e, rolled
+          // Before dequeuing, we should always get one more attempt at reading.
+          // This is in case more entries came in after we opened the reader, and the log is rolled
+          // while we were reading. See HBASE-6758
+          try {
+            reader.resetTo(currentPositionOfEntry, state.resetCompression());
+          } catch (IOException e) {
+            LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+              currentPositionOfEntry, state.resetCompression(), e);
+            // just leave the state as is, next time we will try to reset it again, but there is a
+            // nasty problem that we will still reach here finally and try reset again to see if
+            // the log has been fully replicated, which is redundant, can be optimized later
+            return HasNext.RETRY;
+          }
+          Pair<WALTailingReader.State, Boolean> p = readNextEntryAndRecordReaderPosition();
+          state = p.getFirst();
+          // should not be written
+          assert !p.getSecond();
+          if (state.eof()) {
+            if (checkAllBytesParsed()) {
+              // move to the next wal file and read
+              dequeueCurrentLog();
+              return HasNext.RETRY_IMMEDIATELY;
+            } else {
+              // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
+              // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
+              // so when calling tryAdvanceEntry next time we will reset the reader to the beginning
+              // and read.
+              currentPositionOfEntry = 0;
+              currentPositionOfReader = 0;
+              state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
+              return HasNext.RETRY;
             }
           }
+        } else {
+          // just sleep a bit and retry to see if there are new entries coming since the file is
+          // still being written
+          return HasNext.RETRY;
         }
+      case ERROR_AND_RESET:
+      case ERROR_AND_RESET_COMPRESSION:
+        // we have met an error, just sleep a bit and retry again
+        return HasNext.RETRY;
+      default:
+        throw new IllegalArgumentException("Unknown read next result: " + state);
+    }
+  }
+
+  private FileStatus getCurrentPathFileStatus() throws IOException {
+    try {
+      return fs.getFileStatus(currentPath);
+    } catch (FileNotFoundException e) {
+      // try archived path
+      Path archivedWAL = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
+      if (archivedWAL != null) {
+        return fs.getFileStatus(archivedWAL);
+      } else {
+        throw e;
       }
-      // if currentEntry != null then just return
-      // if currentEntry == null but the file is still being written, then we should not switch to
-      // the next log either, just return here and try next time to see if there are more entries in
-      // the current file
     }
-    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
   }
 
   // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
-  private boolean checkAllBytesParsed() throws IOException {
+  private boolean checkAllBytesParsed() {
     // -1 means the wal wasn't closed cleanly.
     final long trailerSize = currentTrailerSize();
     FileStatus stat = null;
     try {
-      stat = fs.getFileStatus(this.currentPath);
-    } catch (IOException exception) {
+      stat = getCurrentPathFileStatus();
+    } catch (IOException e) {
       LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
-        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
+        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat(), e);
       metrics.incrUnknownFileLengthForClosedWAL();
     }
     // Here we use currentPositionOfReader instead of currentPositionOfEntry.
@@ -228,182 +388,72 @@ class WALEntryStream implements Closeable {
           "Processing end of WAL {} at position {}, which is too far away from"
             + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
           currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
-        setPosition(0);
-        resetReader();
         metrics.incrRestartedWALReading();
         metrics.incrRepeatedFileBytes(currentPositionOfReader);
         return false;
       }
     }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Reached the end of " + this.currentPath + " and length of the file is "
-        + (stat == null ? "N/A" : stat.getLen()));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
+        stat == null ? "N/A" : stat.getLen());
     }
     metrics.incrCompletedWAL();
     return true;
   }
 
-  private void dequeueCurrentLog() throws IOException {
+  private void dequeueCurrentLog() {
     LOG.debug("EOF, closing {}", currentPath);
     closeReader();
     logQueue.remove(walGroupId);
     setCurrentPath(null);
-    setPosition(0);
+    currentPositionOfEntry = 0;
+    state = null;
   }
 
   /**
    * Returns whether the file is opened for writing.
    */
-  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
-    Entry readEntry = reader.next();
-    long readerPos = reader.getPosition();
+  private Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition() {
     OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
-    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
-      // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
-      // data, so we need to make sure that we do not read beyond the committed file length.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("The provider tells us the valid length for " + currentPath + " is "
-          + fileLength.getAsLong() + ", but we have advanced to " + readerPos);
-      }
-      resetReader();
-      return true;
-    }
-    if (readEntry != null) {
+    WALTailingReader.Result readResult = reader.next(fileLength.orElse(-1));
+    long readerPos = readResult.getEntryEndPos();
+    Entry readEntry = readResult.getEntry();
+    if (readResult.getState() == WALTailingReader.State.NORMAL) {
       LOG.trace("reading entry: {} ", readEntry);
       metrics.incrLogEditsRead();
       metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
-    }
-    currentEntry = readEntry; // could be null
-    this.currentPositionOfReader = readerPos;
-    return fileLength.isPresent();
-  }
-
-  private void closeReader() throws IOException {
-    if (reader != null) {
-      reader.close();
-      reader = null;
-    }
-  }
-
-  // if we don't have a reader, open a reader on the next log
-  private boolean checkReader() throws IOException {
-    if (reader == null) {
-      return openNextLog();
-    }
-    return true;
-  }
-
-  // open a reader on the next log in queue
-  private boolean openNextLog() throws IOException {
-    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
-    Path nextPath = queue.peek();
-    if (nextPath != null) {
-      openReader(nextPath);
-      if (reader != null) {
-        return true;
-      }
+      // record current entry and reader position
+      currentEntry = readResult.getEntry();
+      this.currentPositionOfReader = readerPos;
     } else {
-      // no more files in queue, this could only happen for recovered queue.
-      setCurrentPath(null);
-    }
-    return false;
-  }
-
-  private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
-    // If the log was archived, continue reading from there
-    Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
-    // archivedLog can be null if unable to locate in archiveDir.
-    if (archivedLog != null) {
-      openReader(archivedLog);
-    } else {
-      throw fnfe;
-    }
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
-      justification = "HDFS-4380")
-  private void openReader(Path path) throws IOException {
-    try {
-      // Detect if this is a new file, if so get a new reader else
-      // reset the current reader so that we see the new data
-      if (reader == null || !getCurrentPath().equals(path)) {
-        closeReader();
-        reader = WALFactory.createReader(fs, path, conf);
-        seek();
-        setCurrentPath(path);
-      } else {
-        resetReader();
-      }
-    } catch (FileNotFoundException fnfe) {
-      handleFileNotFound(path, fnfe);
-    } catch (RemoteException re) {
-      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) {
-        throw ioe;
-      }
-      handleFileNotFound(path, (FileNotFoundException) ioe);
-    } catch (LeaseNotRecoveredException lnre) {
-      // HBASE-15019 the WAL was not closed due to some hiccup.
-      LOG.warn("Try to recover the WAL lease " + path, lnre);
-      recoverLease(conf, path);
-      reader = null;
-    } catch (NullPointerException npe) {
-      // Workaround for race condition in HDFS-4380
-      // which throws a NPE if we open a file before any data node has the most recent block
-      // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
-      LOG.warn("Got NPE opening reader, will retry.");
-      reader = null;
-    }
-  }
-
-  // For HBASE-15019
-  private void recoverLease(final Configuration conf, final Path path) {
-    try {
-      final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
-      RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
-        @Override
-        public boolean progress() {
-          LOG.debug("recover WAL lease: " + path);
-          return true;
-        }
-      });
-    } catch (IOException e) {
-      LOG.warn("unable to recover lease for WAL: " + path, e);
-    }
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
-      justification = "HDFS-4380")
-  private void resetReader() throws IOException {
-    try {
+      LOG.trace("reading entry failed with: {}", readResult.getState());
+      // set current entry to null
       currentEntry = null;
-      reader.reset();
-      seek();
-    } catch (FileNotFoundException fnfe) {
-      // If the log was archived, continue reading from there
-      Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
-      // archivedLog can be null if unable to locate in archiveDir.
-      if (archivedLog != null) {
-        openReader(archivedLog);
-      } else {
-        throw fnfe;
+      try {
+        this.currentPositionOfReader = reader.getPosition();
+      } catch (IOException e) {
+        LOG.warn("failed to get current position of reader", e);
+        if (readResult.getState().resetCompression()) {
+          return Pair.newPair(WALTailingReader.State.ERROR_AND_RESET_COMPRESSION,
+            fileLength.isPresent());
+        }
       }
-    } catch (NullPointerException npe) {
-      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
     }
+    return Pair.newPair(readResult.getState(), fileLength.isPresent());
   }
 
-  private void seek() throws IOException {
-    if (currentPositionOfEntry != 0) {
-      reader.seek(currentPositionOfEntry);
+  private void closeReader() {
+    if (reader != null) {
+      reader.close();
+      reader = null;
     }
   }
 
   private long currentTrailerSize() {
     long size = -1L;
-    if (reader instanceof ProtobufLogReader) {
-      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
-      size = pblr.trailerSize();
+    if (reader instanceof AbstractProtobufWALReader) {
+      final AbstractProtobufWALReader pbwr = (AbstractProtobufWALReader) reader;
+      size = pbwr.trailerSize();
     }
     return size;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 7e5e33098c2..7d5974bb2cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -28,7 +27,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
@@ -40,7 +38,6 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -69,15 +66,15 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
   public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
 
-  // Only public so classes back in regionserver.wal can access
-  public interface Reader extends WAL.Reader {
+  public interface Initializer {
     /**
-     * @param fs   File system.
-     * @param path Path.
-     * @param c    Configuration.
-     * @param s    Input stream that may have been pre-opened by the caller; may be null.
+     * A method to initialize a WAL reader.
+     * @param startPosition the start position you want to read from, -1 means start reading from
+     *                      the first WAL entry. Notice that the first entry does not start at
+     *                      position 0 as we have several headers, so typically you should not pass
+     *                      0 here.
      */
-    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
+    void init(FileSystem fs, Path path, Configuration c, long startPosition) throws IOException;
   }
 
   protected volatile T wal;
@@ -399,7 +396,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
       serverName = null;
       LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
     }
-    if (serverName != null && serverName.getStartcode() < 0) {
+    if (serverName != null && serverName.getStartCode() < 0) {
       LOG.warn("Invalid log file path=" + logFile);
       serverName = null;
     }
@@ -477,62 +474,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
     return null;
   }
 
-  /**
-   * Opens WAL reader with retries and additional exception handling
-   * @param path path to WAL file
-   * @param conf configuration
-   * @return WAL Reader instance
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
-      justification = "HDFS-4380")
-  public static WAL.Reader openReader(Path path, Configuration conf) throws IOException {
-    long retryInterval = 2000; // 2 sec
-    int maxAttempts = 30;
-    int attempt = 0;
-    Exception ee = null;
-    org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
-    while (reader == null && attempt++ < maxAttempts) {
-      try {
-        // Detect if this is a new file, if so get a new reader else
-        // reset the current reader so that we see the new data
-        reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
-        return reader;
-      } catch (FileNotFoundException fnfe) {
-        // If the log was archived, continue reading from there
-        Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
-        // archivedLog can be null if unable to locate in archiveDir.
-        if (archivedLog != null) {
-          return openReader(archivedLog, conf);
-        } else {
-          throw fnfe;
-        }
-      } catch (LeaseNotRecoveredException lnre) {
-        // HBASE-15019 the WAL was not closed due to some hiccup.
-        LOG.warn("Try to recover the WAL lease " + path, lnre);
-        recoverLease(conf, path);
-        reader = null;
-        ee = lnre;
-      } catch (NullPointerException npe) {
-        // Workaround for race condition in HDFS-4380
-        // which throws a NPE if we open a file before any data node has the most recent block
-        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
-        LOG.warn("Got NPE opening reader, will retry.");
-        reader = null;
-        ee = npe;
-      }
-      if (reader == null) {
-        // sleep before next attempt
-        try {
-          Thread.sleep(retryInterval);
-        } catch (InterruptedException e) {
-        }
-      }
-    }
-    throw new IOException("Could not open reader", ee);
-  }
-
   // For HBASE-15019
-  private static void recoverLease(final Configuration conf, final Path path) {
+  public static void recoverLease(Configuration conf, Path path) {
     try {
       final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf);
       RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index 56e137e725f..e463591f518 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -161,7 +161,8 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
   private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
     throws IOException {
     long dstMinLogSeqNum = -1L;
-    try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
+    try (WALStreamReader reader =
+      walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
       WAL.Entry entry = reader.next();
       if (entry != null) {
         dstMinLogSeqNum = entry.getKey().getSequenceId();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 5212242b9ff..79db2a678a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -230,21 +230,6 @@ public interface WAL extends Closeable, WALFileLengthProvider {
   @Override
   String toString();
 
-  /**
-   * When outside clients need to consume persisted WALs, they rely on a provided Reader.
-   */
-  interface Reader extends Closeable {
-    Entry next() throws IOException;
-
-    Entry next(Entry reuse) throws IOException;
-
-    void seek(long pos) throws IOException;
-
-    long getPosition() throws IOException;
-
-    void reset() throws IOException;
-  }
-
   /**
    * Utility class that lets us keep track of the edit with it's key.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index e3968ae3cff..e8a5fa52bff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -28,16 +28,18 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
 /**
  * Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
  * particular WALProvider we use to handle wal requests. Configure which provider gets used with the
@@ -57,6 +59,21 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 public class WALFactory {
 
+  /**
+   * Used in tests for injecting customized stream reader implementation, for example, inject fault
+   * when reading, etc.
+   * <p/>
+   * After removing the sequence file based WAL, we always use protobuf based WAL reader, and we
+   * will also determine whether the WAL file is encrypted and we should use
+   * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode by checking the
+   * header of the WAL file, so we do not need to specify a special reader to read the WAL file
+   * either.
+   * <p/>
+   * So typically you should not use this config in production.
+   */
+  public static final String WAL_STREAM_READER_CLASS_IMPL =
+    "hbase.regionserver.wal.stream.reader.impl";
+
   private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
 
   /**
@@ -93,7 +110,7 @@ public class WALFactory {
   /**
    * Configuration-specified WAL Reader used when a custom reader is requested
    */
-  private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass;
+  private final Class<? extends WALStreamReader> walStreamReaderClass;
 
   /**
    * How long to attempt opening in-recovery wals
@@ -111,8 +128,12 @@ public class WALFactory {
     // happen prior to provider initialization, in case they need to instantiate a reader/writer.
     timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
     /* TODO Both of these are probably specific to the fs wal provider */
-    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
-      AbstractFSWALProvider.Reader.class);
+    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
+      ProtobufWALStreamReader.class, WALStreamReader.class);
+    Preconditions.checkArgument(
+      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
+      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
+      AbstractFSWALProvider.Initializer.class.getName());
     this.conf = conf;
     // end required early initialization
 
@@ -196,8 +217,12 @@ public class WALFactory {
     // happen prior to provider initialization, in case they need to instantiate a reader/writer.
     timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
     /* TODO Both of these are probably specific to the fs wal provider */
-    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
-      AbstractFSWALProvider.Reader.class);
+    walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
+      ProtobufWALStreamReader.class, WALStreamReader.class);
+    Preconditions.checkArgument(
+      AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
+      "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
+      AbstractFSWALProvider.Initializer.class.getName());
     this.conf = conf;
     this.factoryId = factoryId;
     this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
@@ -301,25 +326,26 @@ public class WALFactory {
     }
   }
 
-  public Reader createReader(final FileSystem fs, final Path path) throws IOException {
-    return createReader(fs, path, (CancelableProgressable) null);
+  public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
+    return createStreamReader(fs, path, (CancelableProgressable) null);
   }
 
   /**
-   * Create a reader for the WAL. If you are reading from a file that's being written to and need to
-   * reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method then just seek
-   * back to the last known good position.
+   * Create a one-way stream reader for the WAL.
    * @return A WAL reader. Close when done with it.
    */
-  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter)
-    throws IOException {
-    return createReader(fs, path, reporter, true);
+  public WALStreamReader createStreamReader(FileSystem fs, Path path,
+    CancelableProgressable reporter) throws IOException {
+    return createStreamReader(fs, path, reporter, -1);
   }
 
-  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
-    boolean allowCustom) throws IOException {
-    Class<? extends AbstractFSWALProvider.Reader> lrClass =
-      allowCustom ? logReaderClass : ProtobufLogReader.class;
+  /**
+   * Create a one-way stream reader for the WAL, and start reading from the given
+   * {@code startPosition}.
+   * @return A WAL reader. Close when done with it.
+   */
+  public WALStreamReader createStreamReader(FileSystem fs, Path path,
+    CancelableProgressable reporter, long startPosition) throws IOException {
     try {
       // A wal file could be under recovery, so it may take several
       // tries to get it open. Instead of claiming it is corrupted, retry
@@ -327,22 +353,17 @@ public class WALFactory {
       long startWaiting = EnvironmentEdgeManager.currentTime();
       long openTimeout = timeoutMillis + startWaiting;
       int nbAttempt = 0;
-      AbstractFSWALProvider.Reader reader = null;
+      WALStreamReader reader = null;
       while (true) {
         try {
-          reader = lrClass.getDeclaredConstructor().newInstance();
-          reader.init(fs, path, conf, null);
+          reader = walStreamReaderClass.getDeclaredConstructor().newInstance();
+          ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition);
           return reader;
         } catch (Exception e) {
           // catch Exception so that we close reader for all exceptions. If we don't
           // close the reader, we leak a socket.
           if (reader != null) {
-            try {
-              reader.close();
-            } catch (IOException exception) {
-              LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
-              LOG.debug("exception details", exception);
-            }
+            reader.close();
           }
 
           // Only inspect the Exception to consider retry when it's an IOException
@@ -435,34 +456,30 @@ public class WALFactory {
   }
 
   /**
-   * Create a reader for the given path, accept custom reader classes from conf. If you already have
-   * a WALFactory, you should favor the instance method.
-   * @return a WAL Reader, caller must close.
+   * Create a tailing reader for the given path. Mainly used in replication.
    */
-  public static Reader createReader(final FileSystem fs, final Path path,
-    final Configuration configuration) throws IOException {
-    return getInstance(configuration).createReader(fs, path);
+  public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf,
+    long startPosition) throws IOException {
+    ProtobufWALTailingReader reader = new ProtobufWALTailingReader();
+    reader.init(fs, path, conf, startPosition);
+    return reader;
   }
 
   /**
-   * Create a reader for the given path, accept custom reader classes from conf. If you already have
-   * a WALFactory, you should favor the instance method.
-   * @return a WAL Reader, caller must close.
+   * Create a one-way stream reader for a given path.
    */
-  static Reader createReader(final FileSystem fs, final Path path,
-    final Configuration configuration, final CancelableProgressable reporter) throws IOException {
-    return getInstance(configuration).createReader(fs, path, reporter);
+  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf)
+    throws IOException {
+    return createStreamReader(fs, path, conf, -1);
   }
 
   /**
-   * Create a reader for the given path, ignore custom reader classes from conf. If you already have
-   * a WALFactory, you should favor the instance method. only public pending move of
-   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
-   * @return a WAL Reader, caller must close.
+   * Create a one-way stream reader for a given path.
    */
-  public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
-    final Configuration configuration) throws IOException {
-    return getInstance(configuration).createReader(fs, path, null, false);
+  public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf,
+    long startPosition) throws IOException {
+    return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null,
+      startPosition);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
index f810e034502..b03357b9332 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.GsonUtil;
@@ -266,10 +266,10 @@ public class WALPrettyPrinter {
       throw new IOException(p + " is not a file");
     }
 
-    WAL.Reader log = WALFactory.createReader(fs, p, conf);
+    WALStreamReader log = WALFactory.createStreamReader(fs, p, conf, position > 0 ? position : -1);
 
-    if (log instanceof ProtobufLogReader) {
-      List<String> writerClsNames = ((ProtobufLogReader) log).getWriterClsNames();
+    if (log instanceof AbstractProtobufWALReader) {
+      List<String> writerClsNames = ((AbstractProtobufWALReader) log).getWriterClsNames();
       if (writerClsNames != null && writerClsNames.size() > 0) {
         out.print("Writer Classes: ");
         for (int i = 0; i < writerClsNames.size(); i++) {
@@ -281,7 +281,7 @@ public class WALPrettyPrinter {
         out.println();
       }
 
-      String cellCodecClsName = ((ProtobufLogReader) log).getCodecClsName();
+      String cellCodecClsName = ((AbstractProtobufWALReader) log).getCodecClsName();
       if (cellCodecClsName != null) {
         out.println("Cell Codec Class: " + cellCodecClsName);
       }
@@ -292,10 +292,6 @@ public class WALPrettyPrinter {
       firstTxn = true;
     }
 
-    if (position > 0) {
-      log.seek(position);
-    }
-
     try {
       WAL.Entry entry;
       while ((entry = log.next()) != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 2ed9ffc96c8..9576b3b3790 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -289,7 +288,7 @@ public class WALSplitter {
     int editsSkipped = 0;
     MonitoredTask status = TaskMonitor.get()
       .createStatus("Splitting " + wal + " to temporary staging area.", false, true);
-    Reader walReader = null;
+    WALStreamReader walReader = null;
     this.fileBeingSplit = walStatus;
     long startTS = EnvironmentEdgeManager.currentTime();
     long length = walStatus.getLen();
@@ -406,12 +405,8 @@ public class WALSplitter {
       final String log = "Finishing writing output for " + wal + " so closing down";
       LOG.debug(log);
       status.setStatus(log);
-      try {
-        if (null != walReader) {
-          walReader.close();
-        }
-      } catch (IOException exception) {
-        LOG.warn("Could not close {} reader", wal, exception);
+      if (null != walReader) {
+        walReader.close();
       }
       try {
         if (outputSinkStarted) {
@@ -442,14 +437,14 @@ public class WALSplitter {
   }
 
   /**
-   * Create a new {@link Reader} for reading logs to split.
+   * Create a new {@link WALStreamReader} for reading logs to split.
    * @return Returns null if file has length zero or file can't be found.
    */
-  protected Reader getReader(FileStatus walStatus, boolean skipErrors,
+  protected WALStreamReader getReader(FileStatus walStatus, boolean skipErrors,
     CancelableProgressable cancel) throws IOException, CorruptedLogFileException {
     Path path = walStatus.getPath();
     long length = walStatus.getLen();
-    Reader in;
+    WALStreamReader in;
 
     // Check for possibly empty file. With appends, currently Hadoop reports a
     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
@@ -489,7 +484,7 @@ public class WALSplitter {
     return in;
   }
 
-  private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+  private Entry getNextLogLine(WALStreamReader in, Path path, boolean skipErrors)
     throws CorruptedLogFileException, IOException {
     try {
       return in.next();
@@ -524,11 +519,12 @@ public class WALSplitter {
   }
 
   /**
-   * Create a new {@link Reader} for reading logs to split.
+   * Create a new {@link WALStreamReader} for reading logs to split.
    * @return new Reader instance, caller should close
    */
-  private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
-    return walFactory.createReader(walFS, curLogFile, reporter);
+  private WALStreamReader getReader(Path curLogFile, CancelableProgressable reporter)
+    throws IOException {
+    return walFactory.createStreamReader(walFS, curLogFile, reporter);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALStreamReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALStreamReader.java
new file mode 100644
index 00000000000..ef927139906
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALStreamReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A one way WAL reader, without reset and seek support.
+ * <p/>
+ * In most cases you should use this interface to read WAL file, as the implementation is simple and
+ * robust. For replication, where we want to tail the WAL file which is currently being written, you
+ * should use {@link WALTailingReader} instead.
+ * @see WALTailingReader
+ */
+@InterfaceAudience.Private
+public interface WALStreamReader extends Closeable {
+
+  /**
+   * Read the next entry in WAL.
+   * <p/>
+   * In most cases you should just use this method, especially when reading a closed wal file for
+   * splitting or printing.
+   */
+  default WAL.Entry next() throws IOException {
+    return next(null);
+  }
+
+  /**
+   * Read the next entry in WAL, use the given {@link WAL.Entry} if not {@code null} to hold the
+   * data.
+   * <p/>
+   * Mainly used in MR.
+   * @param reuse the entry to be used for reading, can be {@code null}
+   */
+  WAL.Entry next(WAL.Entry reuse) throws IOException;
+
+  /**
+   * Get the current reading position.
+   */
+  long getPosition() throws IOException;
+
+  /**
+   * Override to remove the 'throws IOException' as we are just a reader.
+   */
+  @Override
+  void close();
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALTailingReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALTailingReader.java
new file mode 100644
index 00000000000..319fb7fb956
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALTailingReader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A WAL reader which is designed to be able to tail a WAL file which is currently being
+ * written. It adds support for resetting, see {@link #resetTo(long, boolean)}.
+ */
+@InterfaceAudience.Private
+public interface WALTailingReader extends Closeable {
+
+  enum State {
+    /** This means we read an Entry without any error */
+    NORMAL,
+    /**
+     * This means the WAL file has a trailer and we have reached it, which means we have finished
+     * reading this file normally
+     */
+    EOF_WITH_TRAILER,
+    /**
+     * This means we hit an error, so the upper layer needs to reset and read again
+     */
+    ERROR_AND_RESET,
+    /**
+     * Mostly the same with the above {@link #ERROR_AND_RESET}, the difference is that here we also
+     * mess up the compression dictionary when reading data, so the upper layer should also clear
+     * the compression context when resetting, which means when calling resetTo method, we need to
+     * skip to the position instead of just seek to, which will impact performance.
+     */
+    ERROR_AND_RESET_COMPRESSION,
+    /**
+     * This means we reach the EOF and the upper layer needs to reset to see if there is more data.
+     * Notice that this does not mean that there is necessarily more data, the upper layer should
+     * determine whether they need to reset and read again.
+     */
+    EOF_AND_RESET,
+    /**
+     * Mostly the same with the above {@link #EOF_AND_RESET}, the difference is that here we also
+     * mess up the compression dictionary when reading data, so the upper layer should also clear
+     * the compression context when resetting, which means when calling resetTo method, we need to
+     * skip to the position instead of just seek to, which will impact performance. The
+     * implementation should try its best to not fall into this situation.
+     */
+    EOF_AND_RESET_COMPRESSION;
+
+    /**
+     * A dummy result for returning, as except for {@link #NORMAL}, the other states do not need
+     * to provide fields other than state in the returned {@link Result}.
+     */
+    private Result result = new Result(this, null, -1);
+
+    public Result getResult() {
+      return result;
+    }
+
+    public boolean resetCompression() {
+      return this == ERROR_AND_RESET_COMPRESSION || this == EOF_AND_RESET_COMPRESSION;
+    }
+
+    public boolean eof() {
+      return this == EOF_AND_RESET || this == EOF_AND_RESET_COMPRESSION || this == EOF_WITH_TRAILER;
+    }
+  }
+
+  final class Result {
+
+    private final State state;
+    private final Entry entry;
+    private final long entryEndPos;
+
+    public Result(State state, Entry entry, long entryEndPos) {
+      this.state = state;
+      this.entry = entry;
+      this.entryEndPos = entryEndPos;
+    }
+
+    public State getState() {
+      return state;
+    }
+
+    public Entry getEntry() {
+      return entry;
+    }
+
+    public long getEntryEndPos() {
+      return entryEndPos;
+    }
+  }
+
+  /**
+   * Read the next entry and make sure the position after reading does not go beyond the given
+   * {@code limit}.
+   * <p/>
+   * Notice that we will not throw any checked exception out, all the states are represented by the
+   * return value. Of course we will log the exceptions out. The reason why we do this is that, for
+   * tailing a WAL file which is currently being written, we will hit EOFException many times, so it
+   * should not be considered as an 'exception' and also, creating an Exception is a bit expensive.
+   * @param limit the position limit. See HBASE-14004 for more details about why we need this
+   *              limitation. -1 means no limit.
+   */
+  Result next(long limit);
+
+  /**
+   * Get the current reading position.
+   */
+  long getPosition() throws IOException;
+
+  /**
+   * Reopen the reader to see if any new data has arrived, and also seek (or skip) to the given
+   * position.
+   * <p/>
+   * If you want to read from the beginning instead of a given position, please pass -1 as
+   * {@code position}, then the reader will locate to the first entry. Notice that, since we have a
+   * magic header and a pb header, the first WAL entry is not located at position 0, so passing 0
+   * will cause trouble.
+   * @param position         the position we want to start reading from after resetting, or -1 if
+   *                         you want to start reading from the beginning.
+   * @param resetCompression whether we also need to clear the compression context. If {@code true},
+   *                         we will use skip instead of seek after resetting.
+   */
+  void resetTo(long position, boolean resetCompression) throws IOException;
+
+  /**
+   * Override to remove the 'throws IOException' as we are just a reader.
+   */
+  @Override
+  void close();
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
index a01b8542292..b619ad265f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
@@ -36,8 +36,9 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -93,8 +94,8 @@ public class TestSequenceIdMonotonicallyIncreasing {
   private long getMaxSeqId(HRegionServer rs, RegionInfo region) throws IOException {
     Path walFile = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
     long maxSeqId = -1L;
-    try (WAL.Reader reader =
-      WALFactory.createReader(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) {
+    try (WALStreamReader reader =
+      NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) {
       for (;;) {
         WAL.Entry entry = reader.next();
         if (entry == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index cf1a1cd9a5f..d228c5f6072 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -30,8 +30,6 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -59,7 +57,6 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.junit.After;
@@ -402,19 +399,6 @@ public abstract class AbstractTestDLS {
     return;
   }
 
-  private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
-    int count = 0;
-    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
-      WAL.Entry e;
-      while ((e = in.next()) != null) {
-        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
-          count++;
-        }
-      }
-    }
-    return count;
-  }
-
   private void blockUntilNoRIT() throws Exception {
     TEST_UTIL.waitUntilNoRegionsInTransition(60000);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b4583958629..8c42acd2633 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -176,6 +176,7 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1080,8 +1081,8 @@ public class TestHRegion {
 
       // now verify that the flush markers are written
       wal.shutdown();
-      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
-        TEST_UTIL.getConfiguration());
+      WALStreamReader reader = WALFactory.createStreamReader(fs,
+        AbstractFSWALProvider.getCurrentFileName(wal), TEST_UTIL.getConfiguration());
       try {
         List<WAL.Entry> flushDescriptors = new ArrayList<>();
         long lastFlushSeqId = -1;
@@ -1132,14 +1133,7 @@ public class TestHRegion {
         }
         writer.close();
       } finally {
-        if (null != reader) {
-          try {
-            reader.close();
-          } catch (IOException exception) {
-            LOG.warn("Problem closing wal: " + exception.getMessage());
-            LOG.debug("exception details", exception);
-          }
-        }
+        reader.close();
       }
 
       // close the region now, and reopen again
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index bdc81974f90..4c7384a34ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -73,11 +73,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -143,7 +145,7 @@ public class TestHRegionReplayEvents {
   private RegionInfo primaryHri, secondaryHri;
   private HRegion primaryRegion, secondaryRegion;
   private WAL walPrimary, walSecondary;
-  private WAL.Reader reader;
+  private WALStreamReader reader;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -317,8 +319,8 @@ public class TestHRegionReplayEvents {
     return Integer.parseInt(Bytes.toString(put.getRow()));
   }
 
-  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
-    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
+  private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException {
+    return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(),
       AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration());
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
index 8c4a1eee0f7..15209baff9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -179,7 +180,7 @@ public class TestRecoveredEdits {
     int count = 0;
     // Read all cells from recover edits
     List<Cell> walCells = new ArrayList<>();
-    try (WAL.Reader reader = WALFactory.createReader(fs, edits, conf)) {
+    try (WALStreamReader reader = WALFactory.createStreamReader(fs, edits, conf)) {
       WAL.Entry entry;
       while ((entry = reader.next()) != null) {
         WALKey key = entry.getKey();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
index 3291b39b6a0..6ea7d6dccc1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -203,11 +204,11 @@ public class TestWALMonotonicallyIncreasingSeqId {
     TEST_UTIL.cleanupTestDir();
   }
 
-  private WAL.Reader createReader(Path logPath, Path oldWalsDir) throws IOException {
+  private WALStreamReader createReader(Path logPath, Path oldWalsDir) throws IOException {
     try {
-      return wals.createReader(fileSystem, logPath);
+      return wals.createStreamReader(fileSystem, logPath);
     } catch (IOException e) {
-      return wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
+      return wals.createStreamReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
     }
   }
 
@@ -229,7 +230,7 @@ public class TestWALMonotonicallyIncreasingSeqId {
     Thread.sleep(10);
     Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR));
     Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    try (WAL.Reader reader = createReader(logPath, oldWalsDir)) {
+    try (WALStreamReader reader = createReader(logPath, oldWalsDir)) {
       long currentMaxSeqid = 0;
       for (WAL.Entry e; (e = reader.next()) != null;) {
         if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
index 6a4800fb1da..e765fb8a4a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -53,6 +53,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
 
 /**
@@ -139,13 +141,12 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
     Path path = new Path(dir, "tempwal");
     // delete the log if already exists, for test only
     fs.delete(path, true);
+    HRegionInfo hri =
+      new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    fs.mkdirs(dir);
     W writer = null;
-    ProtobufLogReader reader = null;
     try {
-      HRegionInfo hri =
-        new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-      HTableDescriptor htd = new HTableDescriptor(tableName);
-      fs.mkdirs(dir);
       // Write log in pb format.
       writer = createWriter(path);
       for (int i = 0; i < recordCount; ++i) {
@@ -162,39 +163,41 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
         append(writer, new WAL.Entry(key, edit));
       }
       sync(writer);
-      if (withTrailer) writer.close();
-
-      // Now read the log using standard means.
-      reader = (ProtobufLogReader) wals.createReader(fs, path);
       if (withTrailer) {
-        assertNotNull(reader.trailer);
-      } else {
-        assertNull(reader.trailer);
+        writer.close();
+        writer = null;
       }
-      for (int i = 0; i < recordCount; ++i) {
-        WAL.Entry entry = reader.next();
-        assertNotNull(entry);
-        assertEquals(columnCount, entry.getEdit().size());
-        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
-        assertEquals(tableName, entry.getKey().getTableName());
-        int idx = 0;
-        for (Cell val : entry.getEdit().getCells()) {
-          assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
-            val.getRowLength()));
-          String value = i + "" + idx;
-          assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
-          idx++;
+      // Now read the log using standard means.
+      try (ProtobufWALStreamReader reader =
+        (ProtobufWALStreamReader) wals.createStreamReader(fs, path)) {
+        if (withTrailer) {
+          assertNotNull(reader.trailer);
+        } else {
+          assertNull(reader.trailer);
+        }
+        for (int i = 0; i < recordCount; ++i) {
+          WAL.Entry entry = reader.next();
+          assertNotNull(entry);
+          assertEquals(columnCount, entry.getEdit().size());
+          assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
+          assertEquals(tableName, entry.getKey().getTableName());
+          int idx = 0;
+          for (Cell val : entry.getEdit().getCells()) {
+            assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
+              val.getRowLength()));
+            String value = i + "" + idx;
+            assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
+            idx++;
+          }
+        }
+        if (withTrailer) {
+          assertNotNull(reader.trailer);
+        } else {
+          assertNull(reader.trailer);
         }
       }
-      WAL.Entry entry = reader.next();
-      assertNull(entry);
     } finally {
-      if (writer != null) {
-        writer.close();
-      }
-      if (reader != null) {
-        reader.close();
-      }
+      Closeables.close(writer, true);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 8dff69ce431..2ef7ece92db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.hadoop.hdfs.DFSInputStream;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -893,15 +894,10 @@ public abstract class AbstractTestWALReplay {
 
     // here we let the DFSInputStream throw an IOException just after the WALHeader.
     Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
-    FSDataInputStream stream = fs.open(editFile);
-    stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
-    Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
-      conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
-        AbstractFSWALProvider.Reader.class);
-    AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance();
-    reader.init(this.fs, editFile, conf, stream);
-    final long headerLength = stream.getPos();
-    reader.close();
+    final long headerLength;
+    try (WALStreamReader reader = WALFactory.createStreamReader(fs, editFile, conf)) {
+      headerLength = reader.getPosition();
+    }
     FileSystem spyFs = spy(this.fs);
     doAnswer(new Answer<FSDataInputStream>() {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufWALStreamReader.java
similarity index 88%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufWALStreamReader.java
index a2fd2c3a125..cbfa90cf38c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufWALStreamReader.java
@@ -18,11 +18,11 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.ArrayDeque;
 import java.util.Queue;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
-public class FaultyProtobufLogReader extends ProtobufLogReader {
+public class FaultyProtobufWALStreamReader extends ProtobufWALStreamReader {
 
   // public until class relocates to o.a.h.h.wal
   public enum FailureType {
@@ -32,7 +32,7 @@ public class FaultyProtobufLogReader extends ProtobufLogReader {
     NONE
   }
 
-  Queue<Entry> nextQueue = new LinkedList<>();
+  Queue<Entry> nextQueue = new ArrayDeque<>();
   int numberOfFileEntries = 0;
 
   FailureType getFailureType() {
@@ -42,13 +42,15 @@ public class FaultyProtobufLogReader extends ProtobufLogReader {
   @Override
   public Entry next(Entry reuse) throws IOException {
     if (nextQueue.isEmpty()) { // Read the whole thing at once and fake reading
-      boolean b;
-      do {
+      for (;;) {
         Entry e = new Entry();
-        b = readNext(e);
+        e = super.next(e);
+        if (e == null) {
+          break;
+        }
         nextQueue.offer(e);
         numberOfFileEntries++;
-      } while (b);
+      }
     }
 
     if (nextQueue.size() == this.numberOfFileEntries && getFailureType() == FailureType.BEGINNING) {
@@ -61,10 +63,6 @@ public class FaultyProtobufLogReader extends ProtobufLogReader {
       throw new IOException("fake Exception");
     }
 
-    if (nextQueue.peek() != null) {
-      edit++;
-    }
-
     Entry e = nextQueue.poll();
 
     if (e.getEdit().isEmpty()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index 07adbde128e..0cdb35b22ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -276,14 +277,7 @@ public class TestDurability {
 
   private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
     Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
-    WAL.Reader reader = wals.createReader(FS, walPath);
-    int count = 0;
-    WAL.Entry entry = new WAL.Entry();
-    while (reader.next(entry) != null) {
-      count++;
-    }
-    reader.close();
-    assertEquals(expected, count);
+    assertEquals(expected, NoEOFWALStreamReader.count(wals, FS, walPath));
   }
 
   // lifted from TestAtomicOperation
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 130a5e73ba0..c098140fbe9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.junit.BeforeClass;
@@ -515,9 +516,8 @@ public class TestLogRolling extends AbstractTestLogRolling {
           TEST_UTIL.getConfiguration(), null);
 
         LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
-        WAL.Reader reader = null;
-        try {
-          reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
+        try (WALStreamReader reader =
+          WALFactory.createStreamReader(fs, p, TEST_UTIL.getConfiguration())) {
           WAL.Entry entry;
           while ((entry = reader.next()) != null) {
             LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
@@ -528,8 +528,6 @@ public class TestLogRolling extends AbstractTestLogRolling {
           }
         } catch (EOFException e) {
           LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
-        } finally {
-          if (reader != null) reader.close();
         }
       }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
index 01286a867d4..e4184df553a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
@@ -41,8 +40,6 @@ public class TestSecureAsyncWALReplay extends TestAsyncWALReplay {
     Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
     conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
-    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
-      Reader.class);
     conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
       AsyncWriter.class);
     conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java
index eea0e63f7e1..d531707e257 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -41,8 +40,6 @@ public class TestSecureWALReplay extends TestWALReplay {
     Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
     conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
-    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
-      Reader.class);
     conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
       Writer.class);
     conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
index 69ff1c2722d..d4f76584f00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -36,10 +36,11 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -196,12 +197,9 @@ public class SerialReplicationTestBase {
 
       @Override
       public boolean evaluate() throws Exception {
-        try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
-          int count = 0;
-          while (reader.next() != null) {
-            count++;
-          }
-          return count >= expectedEntries;
+        try {
+          return NoEOFWALStreamReader.count(FS, logPath, UTIL.getConfiguration())
+              >= expectedEntries;
         } catch (IOException e) {
           return false;
         }
@@ -228,8 +226,8 @@ public class SerialReplicationTestBase {
   }
 
   protected final void checkOrder(int expectedEntries) throws IOException {
-    try (WAL.Reader reader =
-      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+    try (WALStreamReader reader =
+      NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
       long seqId = -1L;
       int count = 0;
       for (Entry entry;;) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 78e6ec0be96..63cbfe3119c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -161,11 +161,14 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
    */
   @Test
   public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
+    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+    // make sure we only have the current active wal file in queue
+    verifyNumberOfLogsInQueue(1, numRs);
+
     // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
     hbaseAdmin.disableReplicationPeer(PEER_ID2);
-    int numOfEntriesToReplicate = 20;
 
-    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+    int numOfEntriesToReplicate = 20;
     // for each RS, create an empty wal with same walGroupId
     final List<Path> emptyWalPaths = new ArrayList<>();
     long ts = EnvironmentEdgeManager.currentTime();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index 4e9fa8fe617..7ab82b60cec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -41,9 +41,9 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
-import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -107,8 +107,8 @@ public class TestSerialReplication extends SerialReplicationTestBase {
     Map<String, Long> regionsToSeqId = new HashMap<>();
     regionsToSeqId.put(region.getEncodedName(), -1L);
     regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
-    try (WAL.Reader reader =
-      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+    try (WALStreamReader reader =
+      NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
       int count = 0;
       for (Entry entry;;) {
         entry = reader.next();
@@ -168,8 +168,8 @@ public class TestSerialReplication extends SerialReplicationTestBase {
     RegionInfo region = regionsAfterMerge.get(0);
     regionsToSeqId.put(region.getEncodedName(), -1L);
     regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
-    try (WAL.Reader reader =
-      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+    try (WALStreamReader reader =
+      NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
       int count = 0;
       for (Entry entry;;) {
         entry = reader.next();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index eda89b232c3..efd76854250 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -69,17 +71,33 @@ import org.apache.hadoop.hbase.wal.WALProvider;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.Mockito;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
 
+  @Parameter
+  public boolean isCompressionEnabled;
+
+  @Parameters(name = "{index}: isCompressionEnabled={0}")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[] { false }, new Object[] { true });
+  }
+
   @Before
   public void setUp() throws Exception {
+    CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
     initWAL();
   }
 
+  private Entry next(WALEntryStream entryStream) {
+    assertEquals(HasNext.YES, entryStream.hasNext());
+    return entryStream.next();
+  }
+
   /**
    * Tests basic reading of log appends
    */
@@ -88,24 +106,24 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     appendToLogAndSync();
     long oldPos;
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
       // There's one edit in the log, read it. Reading past it needs to throw exception
-      assertTrue(entryStream.hasNext());
+      assertEquals(HasNext.YES, entryStream.hasNext());
       WAL.Entry entry = entryStream.peek();
       assertSame(entry, entryStream.next());
       assertNotNull(entry);
-      assertFalse(entryStream.hasNext());
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
       assertNull(entryStream.peek());
-      assertNull(entryStream.next());
+      assertThrows(IllegalStateException.class, () -> entryStream.next());
       oldPos = entryStream.getPosition();
     }
 
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
-      null, new MetricsSource("1"), fakeWalGroupId)) {
+    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, oldPos, log,
+      new MetricsSource("1"), fakeWalGroupId)) {
       // Read the newly added entry, make sure we made progress
-      WAL.Entry entry = entryStream.next();
+      WAL.Entry entry = next(entryStream);
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
       oldPos = entryStream.getPosition();
@@ -116,19 +134,20 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     log.rollWriter();
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
-      null, new MetricsSource("1"), fakeWalGroupId)) {
-      WAL.Entry entry = entryStream.next();
+    try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
+      oldPos, log, new MetricsSource("1"), fakeWalGroupId)) {
+      WAL.Entry entry = next(entryStream);
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
 
       // next item should come from the new log
-      entry = entryStream.next();
+      entry = next(entryStream);
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
 
-      // no more entries to read
-      assertFalse(entryStream.hasNext());
+      // no more entries to read; disable retry, otherwise we get a 'wait too much time' error
+      entryStream.disableRetry();
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
       oldPos = entryStream.getPosition();
     }
   }
@@ -138,25 +157,35 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
    * don't mistakenly dequeue the current log thinking we're done with it
    */
   @Test
-  public void testLogrollWhileStreaming() throws Exception {
+  public void testLogRollWhileStreaming() throws Exception {
     appendToLog("1");
-    appendToLog("2");// 2
-    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
-      new MetricsSource("1"), fakeWalGroupId)) {
-      assertEquals("1", getRow(entryStream.next()));
-
-      appendToLog("3"); // 3 - comes in after reader opened
-      log.rollWriter(); // log roll happening while we're reading
-      appendToLog("4"); // 4 - this append is in the rolled log
-
-      assertEquals("2", getRow(entryStream.next()));
-      assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
-      // entry in first log
-      assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
-                                                     // and 3 would be skipped
-      assertEquals("4", getRow(entryStream.next())); // 4
-      assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly
-      assertFalse(entryStream.hasNext());
+    // 2
+    appendToLog("2");
+    try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
+      0, log, new MetricsSource("1"), fakeWalGroupId)) {
+      assertEquals("1", getRow(next(entryStream)));
+
+      // 3 - comes in after reader opened
+      appendToLog("3");
+      // log roll happening while we're reading
+      log.rollWriter();
+      // 4 - this append is in the rolled log
+      appendToLog("4");
+
+      assertEquals("2", getRow(next(entryStream)));
+      // we should not have dequeued yet since there's still an entry in first log
+      assertEquals(2, getQueue().size());
+      // if implemented improperly, this would be 4 and 3 would be skipped
+      assertEquals("3", getRow(next(entryStream)));
+      // 4
+      assertEquals("4", getRow(next(entryStream)));
+      // now we've dequeued and moved on to next log properly
+      assertEquals(1, getQueue().size());
+
+      // disable retry so we can get the return value immediately, otherwise we will fail with a
+      // 'wait too much time' error
+      entryStream.disableRetry();
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
     }
   }
 
@@ -168,21 +197,21 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
-      entryStream.next(); // we've hit the end of the stream at this point
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+      assertNotNull(next(entryStream)); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
       appendToLog("2");
       appendToLog("3");
 
       // don't see them
-      assertFalse(entryStream.hasNext());
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
 
-      // But we do if we reset
-      entryStream.reset();
-      assertEquals("2", getRow(entryStream.next()));
-      assertEquals("3", getRow(entryStream.next()));
-      assertFalse(entryStream.hasNext());
+      // But we do if we retry next time, as the entryStream will reset the reader
+      assertEquals("2", getRow(next(entryStream)));
+      assertEquals("3", getRow(next(entryStream)));
+      // reached the end again
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
     }
   }
 
@@ -191,18 +220,18 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
-      entryStream.next(); // we've hit the end of the stream at this point
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+      assertNotNull(next(entryStream)); // we've hit the end of the stream at this point
       appendToLog("2");
       appendToLog("3");
       lastPosition = entryStream.getPosition();
     }
     // next stream should picks up where we left off
-    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
       new MetricsSource("1"), fakeWalGroupId)) {
-      assertEquals("2", getRow(entryStream.next()));
-      assertEquals("3", getRow(entryStream.next()));
-      assertFalse(entryStream.hasNext()); // done
+      assertEquals("2", getRow(next(entryStream)));
+      assertEquals("3", getRow(next(entryStream)));
+      assertEquals(HasNext.RETRY, entryStream.hasNext()); // done
       assertEquals(1, getQueue().size());
     }
   }
@@ -211,31 +240,30 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
    * Tests that if we stop before hitting the end of a stream, we can continue where we left off
    * using the last position
    */
-
   @Test
   public void testPosition() throws Exception {
     long lastPosition = 0;
     appendEntriesToLogAndSync(3);
     // read only one element
-    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
       new MetricsSource("1"), fakeWalGroupId)) {
-      entryStream.next();
+      assertNotNull(next(entryStream));
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
-    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
       new MetricsSource("1"), fakeWalGroupId)) {
-      assertNotNull(entryStream.next());
-      assertNotNull(entryStream.next());
-      assertFalse(entryStream.hasNext());
+      assertNotNull(next(entryStream));
+      assertNotNull(next(entryStream));
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
     }
   }
 
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
-      assertFalse(entryStream.hasNext());
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
     }
   }
 
@@ -309,10 +337,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
-      entryStream.next();
-      entryStream.next();
-      entryStream.next();
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+      for (int i = 0; i < 3; i++) {
+        assertNotNull(next(entryStream));
+      }
       position = entryStream.getPosition();
     }
 
@@ -340,10 +368,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
-      entryStream.next();
-      entryStream.next();
-      entryStream.next();
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+      for (int i = 0; i < 3; i++) {
+        assertNotNull(next(entryStream));
+      }
       position = entryStream.getPosition();
     }
 
@@ -455,10 +483,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
-      entryStream.next();
-      entryStream.next();
-      entryStream.next();
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+      for (int i = 0; i < 3; i++) {
+        assertNotNull(next(entryStream));
+      }
       position = entryStream.getPosition();
     }
 
@@ -562,28 +590,24 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     appendToLog("2");
     long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
     AtomicLong fileLength = new AtomicLong(size - 1);
-    try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0,
-      p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) {
-      assertTrue(entryStream.hasNext());
-      assertNotNull(entryStream.next());
+    try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, 0,
+      p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"), fakeWalGroupId)) {
+      assertNotNull(next(entryStream));
       // can not get log 2
-      assertFalse(entryStream.hasNext());
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
       Thread.sleep(1000);
-      entryStream.reset();
       // still can not get log 2
-      assertFalse(entryStream.hasNext());
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
 
       // can get log 2 now
       fileLength.set(size);
-      entryStream.reset();
-      assertTrue(entryStream.hasNext());
-      assertNotNull(entryStream.next());
+      assertNotNull(next(entryStream));
 
-      assertFalse(entryStream.hasNext());
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
     }
   }
 
-  /*
+  /**
    * Test removal of 0 length log from logQueue if the source is a recovered source and size of
    * logQueue is only 1.
    */
@@ -625,13 +649,12 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     ReplicationSource source = mockReplicationSource(true, conf);
     ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
     // Create a 0 length log.
-    Path emptyLog = new Path(fs.getHomeDirectory(), "log.2");
-    FSDataOutputStream fsdos = fs.create(emptyLog);
-    fsdos.close();
+    Path emptyLog = new Path(fs.getHomeDirectory(), "log.2." + isCompressionEnabled);
+    fs.create(emptyLog).close();
     assertEquals(0, fs.getFileStatus(emptyLog).getLen());
     localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
 
-    final Path log1 = new Path(fs.getHomeDirectory(), "log.1");
+    final Path log1 = new Path(fs.getHomeDirectory(), "log.1." + isCompressionEnabled);
     WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
     appendEntries(writer1, 3);
     localLogQueue.enqueueLog(log1, fakeWalGroupId);
@@ -678,7 +701,7 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     writer.close();
   }
 
-  /**
+  /***
    * Tests size of log queue is incremented and decremented properly.
    */
   @Test
@@ -688,16 +711,21 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     appendToLogAndSync();
 
     log.rollWriter();
+    // wait until the previous WAL file is cleanly closed, so later we can always see
+    // RETRY_IMMEDIATELY instead of RETRY. The wait here is necessary because the closing of a WAL
+    // writer is asynchronous
+    TEST_UTIL.waitFor(30000, () -> fs.getClient().isFileClosed(logQueue.getQueue(fakeWalGroupId)
+      .peek().makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri().getPath()));
     // After rolling there will be 2 wals in the queue
     assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
 
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, fs, CONF, 0, log, logQueue.getMetrics(), fakeWalGroupId)) {
       // There's one edit in the log, read it.
-      assertTrue(entryStream.hasNext());
-      WAL.Entry entry = entryStream.next();
-      assertNotNull(entry);
-      assertFalse(entryStream.hasNext());
+      assertNotNull(next(entryStream));
+      // we've switched to the next WAL, and the previous WAL file is closed cleanly, so it is
+      // RETRY_IMMEDIATELY
+      assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext());
     }
     // After removing one wal, size of log queue will be 1 again.
     assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
@@ -709,26 +737,25 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
    */
   @Test
   public void testCleanClosedWALs() throws Exception {
-    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
+    try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, 0, log,
       logQueue.getMetrics(), fakeWalGroupId)) {
       assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
       appendToLogAndSync();
-      assertNotNull(entryStream.next());
+      assertNotNull(next(entryStream));
       log.rollWriter();
       appendToLogAndSync();
-      assertNotNull(entryStream.next());
+      assertNotNull(next(entryStream));
       assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
     }
   }
 
   /**
    * Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
-   * @throws Exception exception
    */
   @Test
   public void testEOFExceptionInOldWALsDirectory() throws Exception {
     assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
-    AbstractFSWAL abstractWAL = (AbstractFSWAL) log;
+    AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
     Path emptyLogFile = abstractWAL.getCurrentFileName();
     log.rollWriter(true);
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java
index 8e2a01c177e..2c37e34ae4e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * TestBasicWALEntryStream with {@link AsyncFSWALProvider} as the WAL provider.
  */
+@RunWith(Parameterized.class)
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestBasicWALEntryStreamAsyncFSWAL extends TestBasicWALEntryStream {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java
index db3e7fe8cf5..7ab9d3fc5a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * TestBasicWALEntryStream with {@link FSHLogProvider} as the WAL provider.
  */
+@RunWith(Parameterized.class)
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestBasicWALEntryStreamFSHLog extends TestBasicWALEntryStream {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
index 53ee0d720d8..082b42c23b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
@@ -42,10 +42,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -189,7 +189,8 @@ public class TestRaceWhenCreatingReplicationSource {
 
       @Override
       public boolean evaluate() throws Exception {
-        try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+        try (WALStreamReader reader =
+          WALFactory.createStreamReader(FS, LOG_PATH, UTIL.getConfiguration())) {
           return reader.next() != null;
         } catch (IOException e) {
           return false;
@@ -201,7 +202,8 @@ public class TestRaceWhenCreatingReplicationSource {
         return "Replication has not catched up";
       }
     });
-    try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+    try (WALStreamReader reader =
+      WALFactory.createStreamReader(FS, LOG_PATH, UTIL.getConfiguration())) {
       Cell cell = reader.next().getEdit().getCells().get(0);
       assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
       assertArrayEquals(CF, CellUtil.cloneFamily(cell));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 454b8387fa7..35eb343dbff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -226,7 +227,8 @@ public class TestReplicationSource {
     }
     writer.close();
 
-    WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
+    WALStreamReader reader =
+      WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration());
     WAL.Entry entry = reader.next();
     assertNotNull(entry);
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java
index 3f151acceca..88f4f4539b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java
@@ -18,13 +18,13 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runners.Parameterized.Parameter;
@@ -74,16 +74,17 @@ public abstract class TestWALEntryStreamDifferentCounts extends WALEntryStreamTe
     log.rollWriter();
 
     try (WALEntryStream entryStream =
-      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
+      new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
       int i = 0;
-      while (entryStream.hasNext()) {
+      while (entryStream.hasNext() == HasNext.YES) {
         assertNotNull(entryStream.next());
         i++;
       }
       assertEquals(nbRows, i);
 
-      // should've read all entries
-      assertFalse(entryStream.hasNext());
+      // should've read all entries, and since the last file is still open for writing, we will
+      // get a RETRY instead of NO here
+      assertEquals(HasNext.RETRY, entryStream.hasNext());
     }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java
index 399923b2892..5f29cb3553c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java
@@ -28,10 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNameTestRule;
-import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@@ -39,10 +36,10 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -59,7 +56,7 @@ public abstract class WALEntryStreamTestBase {
   protected static final long TEST_TIMEOUT_MS = 5000;
   protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();;
   protected static Configuration CONF;
-  protected static FileSystem fs;
+  protected static DistributedFileSystem fs;
   protected static MiniDFSCluster cluster;
   protected static final TableName tableName = TableName.valueOf("tablename");
   protected static final byte[] family = Bytes.toBytes("column");
@@ -80,22 +77,30 @@ public abstract class WALEntryStreamTestBase {
    * the test code simpler.
    */
   protected static class WALEntryStreamWithRetries extends WALEntryStream {
-    // Class member to be able to set a non-final from within a lambda.
-    private Entry result;
 
-    public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
-      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
-      MetricsSource metrics, String walGroupId) throws IOException {
-      super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
+    private boolean retry = true;
+
+    public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, FileSystem fs,
+      Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
+      MetricsSource metrics, String walGroupId) {
+      super(logQueue, fs, conf, startPosition, walFileLengthProvider, metrics, walGroupId);
+    }
+
+    public void enableRetry() {
+      retry = true;
+    }
+
+    public void disableRetry() {
+      retry = false;
     }
 
     @Override
-    public Entry next() {
-      Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> {
-        result = WALEntryStreamWithRetries.super.next();
-        return result != null;
-      });
-      return result;
+    public HasNext hasNext() {
+      // hasNext is idempotent, so we can call it again and do not need to store its return value
+      if (retry) {
+        TEST_UTIL.waitFor(TEST_TIMEOUT_MS, () -> super.hasNext() == HasNext.YES);
+      }
+      return super.hasNext();
     }
   }
 
@@ -146,8 +151,7 @@ public abstract class WALEntryStreamTestBase {
     metricsSource.clear();
     logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
     pathWatcher = new PathWatcher();
-    final WALFactory wals =
-      new WALFactory(CONF, TableNameTestRule.cleanUpTestName(tn.getMethodName()));
+    final WALFactory wals = new WALFactory(CONF, tn.getMethodName().replaceAll("[\\[:]", "_"));
     wals.getWALProvider().addWALActionsListener(pathWatcher);
     log = wals.getWAL(info);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java
index b5f7dc634c7..cb2e5daa987 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java
@@ -97,7 +97,7 @@ public class CompressedWALTestBase {
     wals.shutdown();
 
     // Confirm the WAL can be read back
-    try (WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath)) {
+    try (WALStreamReader reader = wals.createStreamReader(TEST_UTIL.getTestFileSystem(), walPath)) {
       int count = 0;
       WAL.Entry entry = new WAL.Entry();
       while (reader.next(entry) != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/NoEOFWALStreamReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/NoEOFWALStreamReader.java
new file mode 100644
index 00000000000..2eb63b99bf8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/NoEOFWALStreamReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A helper class for writing UTs, where we will eat the EOF and return null when reaching EOF, so
+ * in UTs we do not need to deal with partial WAL files if this does not affect the correctness. In
+ * production code you usually should not do this, as it may cause data loss if you always
+ * ignore the EOFException.
+ */
+public final class NoEOFWALStreamReader implements WALStreamReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NoEOFWALStreamReader.class);
+
+  private WALStreamReader reader;
+
+  private NoEOFWALStreamReader(WALStreamReader reader) {
+    this.reader = reader;
+  }
+
+  @Override
+  public Entry next(Entry reuse) throws IOException {
+    try {
+      return reader.next(reuse);
+    } catch (EOFException e) {
+      LOG.warn("Got EOF while reading", e);
+      return null;
+    }
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    return reader.getPosition();
+  }
+
+  @Override
+  public void close() {
+    reader.close();
+  }
+
+  private int count() throws IOException {
+    int count = 0;
+    Entry entry = new Entry();
+    while (next(entry) != null) {
+      count++;
+    }
+    return count;
+  }
+
+  public static NoEOFWALStreamReader create(FileSystem fs, Path path, Configuration conf)
+    throws IOException {
+    return new NoEOFWALStreamReader(WALFactory.createStreamReader(fs, path, conf));
+  }
+
+  public static NoEOFWALStreamReader create(WALFactory walFactory, FileSystem fs, Path path)
+    throws IOException {
+    return new NoEOFWALStreamReader(walFactory.createStreamReader(fs, path));
+  }
+
+  public static int count(FileSystem fs, Path path, Configuration conf) throws IOException {
+    try (NoEOFWALStreamReader reader = create(fs, path, conf)) {
+      return reader.count();
+    }
+  }
+
+  public static int count(WALFactory walFactory, FileSystem fs, Path path) throws IOException {
+    try (NoEOFWALStreamReader reader = create(walFactory, fs, path)) {
+      return reader.count();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestParsePartialWALFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestParsePartialWALFile.java
new file mode 100644
index 00000000000..ae6db543537
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestParsePartialWALFile.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * In this test, we write a small WAL file first, and then generate partial WAL files whose lengths
+ * are in range [0, fileLength) (we test all the possible lengths in the range), to see if we can
+ * successfully get the completed entries, and also get an EOF at the end.
+ * <p/>
+ * It is very important to make sure 3 things:
+ * <ul>
+ * <li>We do not get incorrect entries. Otherwise there will be data corruption.</li>
+ * <li>We can get all the completed entries, i.e., we do not miss any data. Otherwise there will be
+ * data loss.</li>
+ * <li>We will get an EOF finally, instead of a general IOException. Otherwise the split or
+ * replication will be stuck.</li>
+ * </ul>
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestParsePartialWALFile {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestParsePartialWALFile.class);
+
+  private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+
+  private static FileSystem FS;
+
+  private static TableName TN = TableName.valueOf("test");
+  private static RegionInfo RI = RegionInfoBuilder.newBuilder(TN).build();
+  private static byte[] ROW = Bytes.toBytes("row");
+  private static byte[] FAMILY = Bytes.toBytes("family");
+  private static byte[] QUAL = Bytes.toBytes("qualifier");
+  private static byte[] VALUE = Bytes.toBytes("value");
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
+    FS = FileSystem.getLocal(UTIL.getConfiguration());
+    if (!FS.mkdirs(UTIL.getDataTestDir())) {
+      throw new IOException("can not create " + UTIL.getDataTestDir());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    UTIL.cleanupTestDir();
+  }
+
+  private Path generateBrokenWALFile(byte[] content, int length) throws IOException {
+    Path walFile = UTIL.getDataTestDir("wal-" + length);
+    try (FSDataOutputStream out = FS.create(walFile)) {
+      out.write(content, 0, length);
+    }
+    return walFile;
+  }
+
+  private void assertEntryEquals(WAL.Entry entry, int index) {
+    WALKeyImpl key = entry.getKey();
+    assertEquals(TN, key.getTableName());
+    assertArrayEquals(RI.getEncodedNameAsBytes(), key.getEncodedRegionName());
+    WALEdit edit = entry.getEdit();
+    assertEquals(1, edit.getCells().size());
+    Cell cell = edit.getCells().get(0);
+    assertArrayEquals(ROW, CellUtil.cloneRow(cell));
+    assertArrayEquals(FAMILY, CellUtil.cloneFamily(cell));
+    if (index % 2 == 0) {
+      assertEquals(Type.Put, cell.getType());
+      assertArrayEquals(QUAL, CellUtil.cloneQualifier(cell));
+      assertArrayEquals(VALUE, CellUtil.cloneValue(cell));
+    } else {
+      assertEquals(Type.DeleteFamily, cell.getType());
+    }
+  }
+
+  private void testReadEntry(Path file, int entryCount) throws IOException {
+    try (
+      WALStreamReader reader = WALFactory.createStreamReader(FS, file, UTIL.getConfiguration())) {
+      for (int i = 0; i < entryCount; i++) {
+        assertEntryEquals(reader.next(), i);
+      }
+      assertThrows(EOFException.class, () -> reader.next());
+    }
+    try (WALTailingReader reader =
+      WALFactory.createTailingReader(FS, file, UTIL.getConfiguration(), -1)) {
+      for (int i = 0; i < entryCount; i++) {
+        WALTailingReader.Result result = reader.next(-1);
+        assertEquals(WALTailingReader.State.NORMAL, result.getState());
+        assertEntryEquals(result.getEntry(), i);
+      }
+      WALTailingReader.Result result = reader.next(-1);
+      assertEquals(WALTailingReader.State.EOF_AND_RESET, result.getState());
+    }
+  }
+
+  @Test
+  public void testPartialParse() throws Exception {
+    Path walFile = UTIL.getDataTestDir("wal");
+    long headerLength;
+    List<Long> endOffsets = new ArrayList<>();
+    try (WALProvider.Writer writer =
+      WALFactory.createWALWriter(FS, walFile, UTIL.getConfiguration())) {
+      headerLength = writer.getLength();
+      for (int i = 0; i < 3; i++) {
+        WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, i,
+          EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
+        WALEdit edit = new WALEdit();
+        if (i % 2 == 0) {
+          edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
+            .setRow(ROW).setFamily(FAMILY).setQualifier(QUAL).setValue(VALUE).build());
+        } else {
+          edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+            .setType(Type.DeleteFamily).setRow(ROW).setFamily(FAMILY).build());
+        }
+        writer.append(new WAL.Entry(key, edit));
+        writer.sync(true);
+        endOffsets.add(writer.getLength());
+      }
+    }
+    long fileLength = FS.getFileStatus(walFile).getLen();
+    byte[] content = new byte[(int) fileLength];
+    try (FSDataInputStream in = FS.open(walFile)) {
+      in.readFully(content);
+    }
+    // partial header, should throw WALHeaderEOFException
+    for (int i = 0; i < headerLength; i++) {
+      Path brokenFile = generateBrokenWALFile(content, i);
+      assertThrows(WALHeaderEOFException.class,
+        () -> WALFactory.createStreamReader(FS, brokenFile, UTIL.getConfiguration()));
+      assertThrows(WALHeaderEOFException.class,
+        () -> WALFactory.createTailingReader(FS, brokenFile, UTIL.getConfiguration(), -1));
+      FS.delete(brokenFile, false);
+    }
+    // partial WAL entries, should be able to read some entries and the last one we will get an EOF
+    for (int i = 0; i <= endOffsets.size(); i++) {
+      int startOffset;
+      int endOffset;
+      if (i == 0) {
+        startOffset = (int) headerLength;
+        endOffset = endOffsets.get(i).intValue();
+      } else if (i == endOffsets.size()) {
+        startOffset = endOffsets.get(i - 1).intValue();
+        endOffset = (int) fileLength;
+      } else {
+        startOffset = endOffsets.get(i - 1).intValue();
+        endOffset = endOffsets.get(i).intValue();
+      }
+      for (int j = startOffset; j < endOffset; j++) {
+        Path brokenFile = generateBrokenWALFile(content, j);
+        testReadEntry(brokenFile, i);
+        FS.delete(brokenFile, false);
+      }
+    }
+    // partial trailer, should be able to read all the entries but get an EOF when trying to read
+    // again, as we do not know it is a trailer
+    for (int i = endOffsets.get(endOffsets.size() - 1).intValue(); i < fileLength; i++) {
+      Path brokenFile = generateBrokenWALFile(content, i);
+      testReadEntry(brokenFile, endOffsets.size());
+      FS.delete(brokenFile, false);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
index 3b784427b54..67b03fb103c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -87,8 +86,6 @@ public class TestSecureWAL {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
     conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
-    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
-      WAL.Reader.class);
     conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
       WALProvider.Writer.class);
     conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
@@ -146,23 +143,23 @@ public class TestSecureWAL {
     assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
 
     // Confirm the WAL can be read back
-    WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
     int count = 0;
-    WAL.Entry entry = new WAL.Entry();
-    while (reader.next(entry) != null) {
-      count++;
-      List<Cell> cells = entry.getEdit().getCells();
-      assertTrue("Should be one KV per WALEdit", cells.size() == 1);
-      for (Cell cell : cells) {
-        assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
-          cell.getRowLength(), row, 0, row.length));
-        assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
-          cell.getFamilyLength(), family, 0, family.length));
-        assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(),
-          cell.getValueLength(), value, 0, value.length));
+    try (WALStreamReader reader = wals.createStreamReader(TEST_UTIL.getTestFileSystem(), walPath)) {
+      WAL.Entry entry = new WAL.Entry();
+      while (reader.next(entry) != null) {
+        count++;
+        List<Cell> cells = entry.getEdit().getCells();
+        assertTrue("Should be one KV per WALEdit", cells.size() == 1);
+        for (Cell cell : cells) {
+          assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
+            cell.getRowLength(), row, 0, row.length));
+          assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
+            cell.getFamilyLength(), family, 0, family.length));
+          assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(),
+            cell.getValueLength(), value, 0, value.length));
+        }
       }
+      assertEquals("Should have read back as many KVs as written", total, count);
     }
-    assertEquals("Should have read back as many KVs as written", total, count);
-    reader.close();
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index d519d3720e5..1afdc6e0fa7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -261,91 +262,63 @@ public class TestWALFactory {
     in.close();
 
     final int total = 20;
-    WAL.Reader reader = null;
+    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    scopes.put(tableName.getName(), 0);
+    final WAL wal = wals.getWAL(info);
 
-    try {
-      RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
-      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      scopes.put(tableName.getName(), 0);
-      final WAL wal = wals.getWAL(info);
-
-      for (int i = 0; i < total; i++) {
-        WALEdit kvs = new WALEdit();
-        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
-        wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
-          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
-      }
-      // Now call sync and try reading. Opening a Reader before you sync just
-      // gives you EOFE.
-      wal.sync();
-      // Open a Reader.
-      Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
-      reader = wals.createReader(fs, walPath);
-      int count = 0;
-      WAL.Entry entry = new WAL.Entry();
-      while ((entry = reader.next(entry)) != null)
-        count++;
-      assertEquals(total, count);
-      reader.close();
-      // Add test that checks to see that an open of a Reader works on a file
-      // that has had a sync done on it.
-      for (int i = 0; i < total; i++) {
-        WALEdit kvs = new WALEdit();
-        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
-        wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
-          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
-      }
-      wal.sync();
-      reader = wals.createReader(fs, walPath);
-      count = 0;
-      while ((entry = reader.next(entry)) != null)
-        count++;
-      assertTrue(count >= total);
-      reader.close();
-      // If I sync, should see double the edits.
-      wal.sync();
-      reader = wals.createReader(fs, walPath);
-      count = 0;
-      while ((entry = reader.next(entry)) != null)
-        count++;
-      assertEquals(total * 2, count);
-      reader.close();
-      // Now do a test that ensures stuff works when we go over block boundary,
-      // especially that we return good length on file.
-      final byte[] value = new byte[1025 * 1024]; // Make a 1M value.
-      for (int i = 0; i < total; i++) {
-        WALEdit kvs = new WALEdit();
-        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
-        wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
-          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
-      }
-      // Now I should have written out lots of blocks. Sync then read.
-      wal.sync();
-      reader = wals.createReader(fs, walPath);
-      count = 0;
-      while ((entry = reader.next(entry)) != null)
-        count++;
-      assertEquals(total * 3, count);
-      reader.close();
-      // shutdown and ensure that Reader gets right length also.
-      wal.shutdown();
-      reader = wals.createReader(fs, walPath);
-      count = 0;
-      while ((entry = reader.next(entry)) != null)
-        count++;
-      assertEquals(total * 3, count);
-      reader.close();
-    } finally {
-      if (reader != null) reader.close();
+    for (int i = 0; i < total; i++) {
+      WALEdit kvs = new WALEdit();
+      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
+      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
     }
+    // Now call sync and try reading. Opening a Reader before you sync just
+    // gives you EOFE.
+    wal.sync();
+    // Open a Reader.
+    Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
+    int count = NoEOFWALStreamReader.count(wals, fs, walPath);
+    assertEquals(total, count);
+    // Add test that checks to see that an open of a Reader works on a file
+    // that has had a sync done on it.
+    for (int i = 0; i < total; i++) {
+      WALEdit kvs = new WALEdit();
+      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
+      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
+    }
+    wal.sync();
+    count = NoEOFWALStreamReader.count(wals, fs, walPath);
+    assertTrue(count >= total);
+    // If I sync, should see double the edits.
+    wal.sync();
+    count = NoEOFWALStreamReader.count(wals, fs, walPath);
+    assertEquals(total * 2, count);
+    // Now do a test that ensures stuff works when we go over block boundary,
+    // especially that we return good length on file.
+    final byte[] value = new byte[1025 * 1024]; // Make a 1M value.
+    for (int i = 0; i < total; i++) {
+      WALEdit kvs = new WALEdit();
+      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
+      wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
+    }
+    // Now I should have written out lots of blocks. Sync then read.
+    wal.sync();
+    count = NoEOFWALStreamReader.count(wals, fs, walPath);
+    assertEquals(total * 3, count);
+    // shutdown and ensure that Reader gets right length also.
+    wal.shutdown();
+    count = NoEOFWALStreamReader.count(wals, fs, walPath);
+    assertEquals(total * 3, count);
   }
 
   private void verifySplits(final List<Path> splits, final int howmany) throws IOException {
     assertEquals(howmany * howmany, splits.size());
     for (int i = 0; i < splits.size(); i++) {
       LOG.info("Verifying=" + splits.get(i));
-      WAL.Reader reader = wals.createReader(fs, splits.get(i));
-      try {
+      try (WALStreamReader reader = wals.createStreamReader(fs, splits.get(i))) {
         int count = 0;
         String previousRegion = null;
         long seqno = -1;
@@ -364,8 +337,6 @@ public class TestWALFactory {
           count++;
         }
         assertEquals(howmany, count);
-      } finally {
-        reader.close();
       }
     }
   }
@@ -475,15 +446,15 @@ public class TestWALFactory {
     if (t.exception != null) throw t.exception;
 
     // Make sure you can read all the content
-    WAL.Reader reader = wals.createReader(fs, walPath);
     int count = 0;
-    WAL.Entry entry = new WAL.Entry();
-    while (reader.next(entry) != null) {
-      count++;
-      assertTrue("Should be one KeyValue per WALEdit", entry.getEdit().getCells().size() == 1);
+    try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, walPath)) {
+      WAL.Entry entry = new WAL.Entry();
+      while (reader.next(entry) != null) {
+        count++;
+        assertTrue("Should be one KeyValue per WALEdit", entry.getEdit().getCells().size() == 1);
+      }
     }
     assertEquals(total, count);
-    reader.close();
 
     // Reset the lease period
     setLeasePeriod.invoke(cluster, new Object[] { 60000L, 3600000L });
@@ -503,31 +474,29 @@ public class TestWALFactory {
       scopes.put(fam, 0);
     }
     byte[] row = Bytes.toBytes("row");
-    WAL.Reader reader = null;
-    try {
-      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
 
-      // Write columns named 1, 2, 3, etc. and then values of single byte
-      // 1, 2, 3...
-      long timestamp = EnvironmentEdgeManager.currentTime();
-      WALEdit cols = new WALEdit();
-      for (int i = 0; i < colCount; i++) {
-        cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
-          timestamp, new byte[] { (byte) (i + '0') }));
-      }
-      RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
-        .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
-      final WAL log = wals.getWAL(info);
-
-      final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
-        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
-      log.sync(txid);
-      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-      log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
-      log.shutdown();
-      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
-      // Now open a reader on the log and assert append worked.
-      reader = wals.createReader(fs, filename);
+    // Write columns named 1, 2, 3, etc. and then values of single byte
+    // 1, 2, 3...
+    long timestamp = EnvironmentEdgeManager.currentTime();
+    WALEdit cols = new WALEdit();
+    for (int i = 0; i < colCount; i++) {
+      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
+        timestamp, new byte[] { (byte) (i + '0') }));
+    }
+    RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
+      .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
+    final WAL log = wals.getWAL(info);
+
+    final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
+      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
+    log.sync(txid);
+    log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+    log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+    log.shutdown();
+    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
+    // Now open a reader on the log and assert append worked.
+    try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, filename)) {
       // Above we added all columns on a single row so we only read one
       // entry in the below... thats why we have '1'.
       for (int i = 0; i < 1; i++) {
@@ -541,11 +510,7 @@ public class TestWALFactory {
         assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(),
           cell.getRowLength()));
         assertEquals((byte) (i + '0'), CellUtil.cloneValue(cell)[0]);
-        System.out.println(key + " " + val);
-      }
-    } finally {
-      if (reader != null) {
-        reader.close();
+        LOG.info(key + " " + val);
       }
     }
   }
@@ -561,28 +526,26 @@ public class TestWALFactory {
       scopes.put(fam, 0);
     }
     byte[] row = Bytes.toBytes("row");
-    WAL.Reader reader = null;
     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
-    try {
-      // Write columns named 1, 2, 3, etc. and then values of single byte
-      // 1, 2, 3...
-      long timestamp = EnvironmentEdgeManager.currentTime();
-      WALEdit cols = new WALEdit();
-      for (int i = 0; i < colCount; i++) {
-        cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
-          timestamp, new byte[] { (byte) (i + '0') }));
-      }
-      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
-      final WAL log = wals.getWAL(hri);
-      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
-        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
-      log.sync(txid);
-      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
-      log.shutdown();
-      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
-      // Now open a reader on the log and assert append worked.
-      reader = wals.createReader(fs, filename);
+    // Write columns named 1, 2, 3, etc. and then values of single byte
+    // 1, 2, 3...
+    long timestamp = EnvironmentEdgeManager.currentTime();
+    WALEdit cols = new WALEdit();
+    for (int i = 0; i < colCount; i++) {
+      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
+        timestamp, new byte[] { (byte) (i + '0') }));
+    }
+    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+    final WAL log = wals.getWAL(hri);
+    final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
+      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
+    log.sync(txid);
+    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+    log.shutdown();
+    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
+    // Now open a reader on the log and assert append worked.
+    try (WALStreamReader reader = wals.createStreamReader(fs, filename)) {
       WAL.Entry entry = reader.next();
       assertEquals(colCount, entry.getEdit().size());
       int idx = 0;
@@ -596,10 +559,6 @@ public class TestWALFactory {
         System.out.println(entry.getKey() + " " + val);
         idx++;
       }
-    } finally {
-      if (reader != null) {
-        reader.close();
-      }
     }
   }
 
@@ -771,45 +730,34 @@ public class TestWALFactory {
       scopes.put(fam, 0);
     }
     byte[] row = Bytes.toBytes("row");
-    WAL.Reader reader = null;
     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
-    try {
-      // Write one column in one edit.
-      WALEdit cols = new WALEdit();
-      cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"),
-        EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
-      final WAL log = customFactory.getWAL(hri);
-      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
-        htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
-      // Sync the edit to the WAL
-      log.sync(txid);
-      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
-      log.shutdown();
-
-      // Inject our failure, object is constructed via reflection.
-      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
-
-      // Now open a reader on the log which will throw an exception when
-      // we try to instantiate the custom Codec.
-      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
-      try {
-        reader = customFactory.createReader(proxyFs, filename);
-        fail("Expected to see an exception when creating WAL reader");
-      } catch (Exception e) {
-        // Expected that we get an exception
-      }
-      // We should have exactly one reader
-      assertEquals(1, openedReaders.size());
-      // And that reader should be closed.
-      long unclosedReaders =
-        openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting());
-      assertEquals("Should not find any open readers", 0, (int) unclosedReaders);
-    } finally {
-      if (reader != null) {
-        reader.close();
-      }
-    }
+    // Write one column in one edit.
+    WALEdit cols = new WALEdit();
+    cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"),
+      EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
+    final WAL log = customFactory.getWAL(hri);
+    final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
+      htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
+    // Sync the edit to the WAL
+    log.sync(txid);
+    log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+    log.shutdown();
+
+    // Inject our failure, object is constructed via reflection.
+    BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
+
+    // Now open a reader on the log which will throw an exception when
+    // we try to instantiate the custom Codec.
+    Path filename = AbstractFSWALProvider.getCurrentFileName(log);
+    assertThrows("Expected to see an exception when creating WAL reader", IOException.class,
+      () -> customFactory.createStreamReader(proxyFs, filename));
+    // We should have exactly one reader
+    assertEquals(1, openedReaders.size());
+    // And that reader should be closed.
+    long unclosedReaders =
+      openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting());
+    assertEquals("Should not find any open readers", 0, unclosedReaders);
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
index 3067164221f..096ebe020a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
@@ -121,8 +121,8 @@ public class TestWALOpenAfterDNRollingStart {
       currentFile = new Path(oldLogDir, currentFile.getName());
     }
     // if the log is not rolled, then we can never open this wal forever.
-    try (WAL.Reader reader = WALFactory.createReader(TEST_UTIL.getTestFileSystem(), currentFile,
-      TEST_UTIL.getConfiguration())) {
+    try (WALStreamReader reader = WALFactory.createStreamReader(TEST_UTIL.getTestFileSystem(),
+      currentFile, TEST_UTIL.getConfiguration())) {
       reader.next();
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
deleted file mode 100644
index 1ac0620ed54..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ByteBufferKeyValue;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec;
-import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-/**
- * Test that verifies WAL written by SecureProtobufLogWriter is not readable by ProtobufLogReader
- */
-@Category({ RegionServerTests.class, SmallTests.class })
-public class TestWALReaderOnSecureWAL {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestWALReaderOnSecureWAL.class);
-
-  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  final byte[] value = Bytes.toBytes("Test value");
-
-  private static final String WAL_ENCRYPTION = "hbase.regionserver.wal.encryption";
-
-  @Rule
-  public TestName currentTest = new TestName();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
-    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
-    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
-    conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
-    CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
-  }
-
-  private Path writeWAL(final WALFactory wals, final String tblName, boolean offheap)
-    throws IOException {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
-    conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
-      WALCellCodec.class);
-    try {
-      TableName tableName = TableName.valueOf(tblName);
-      NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      scopes.put(tableName.getName(), 0);
-      RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
-      final int total = 10;
-      final byte[] row = Bytes.toBytes("row");
-      final byte[] family = Bytes.toBytes("family");
-      final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
-
-      // Write the WAL
-      WAL wal = wals.getWAL(regionInfo);
-      for (int i = 0; i < total; i++) {
-        WALEdit kvs = new WALEdit();
-        KeyValue kv = new KeyValue(row, family, Bytes.toBytes(i), value);
-        if (offheap) {
-          ByteBuffer bb = ByteBuffer.allocateDirect(kv.getBuffer().length);
-          bb.put(kv.getBuffer());
-          ByteBufferKeyValue offheapKV = new ByteBufferKeyValue(bb, 0, kv.getLength());
-          kvs.add(offheapKV);
-        } else {
-          kvs.add(kv);
-        }
-        wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
-          EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
-      }
-      wal.sync();
-      final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
-      wal.shutdown();
-
-      return walPath;
-    } finally {
-      // restore the cell codec class
-      conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
-    }
-  }
-
-  @Test()
-  public void testWALReaderOnSecureWALWithKeyValues() throws Exception {
-    testSecureWALInternal(false);
-  }
-
-  @Test()
-  public void testWALReaderOnSecureWALWithOffheapKeyValues() throws Exception {
-    testSecureWALInternal(true);
-  }
-
-  private void testSecureWALInternal(boolean offheap) throws IOException, FileNotFoundException {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, WAL.Reader.class);
-    conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
-      WALProvider.Writer.class);
-    conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
-      WALProvider.AsyncWriter.class);
-    conf.setBoolean(WAL_ENCRYPTION, true);
-    FileSystem fs = TEST_UTIL.getTestFileSystem();
-    final WALFactory wals = new WALFactory(conf, currentTest.getMethodName());
-    Path walPath = writeWAL(wals, currentTest.getMethodName(), offheap);
-
-    // Insure edits are not plaintext
-    long length = fs.getFileStatus(walPath).getLen();
-    FSDataInputStream in = fs.open(walPath);
-    byte[] fileData = new byte[(int) length];
-    IOUtils.readFully(in, fileData);
-    in.close();
-    assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
-
-    // Confirm the WAL cannot be read back by ProtobufLogReader
-    try {
-      wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
-      assertFalse(true);
-    } catch (IOException ioe) {
-      System.out.println("Expected ioe " + ioe.getMessage());
-    }
-
-    FileStatus[] listStatus = fs.listStatus(walPath.getParent());
-    Path rootdir = CommonFSUtils.getRootDir(conf);
-    WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
-    WALSplitter.SplitWALResult swr = s.splitWAL(listStatus[0], null);
-    assertTrue(swr.isCorrupt());
-    wals.close();
-  }
-
-  @Test()
-  public void testSecureWALReaderOnWAL() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
-      WAL.Reader.class);
-    conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
-      WALProvider.Writer.class);
-    conf.setBoolean(WAL_ENCRYPTION, false);
-    FileSystem fs = TEST_UTIL.getTestFileSystem();
-    final WALFactory wals = new WALFactory(conf,
-      ServerName.valueOf(currentTest.getMethodName(), 16010, EnvironmentEdgeManager.currentTime())
-        .toString());
-    Path walPath = writeWAL(wals, currentTest.getMethodName(), false);
-
-    // Ensure edits are plaintext
-    long length = fs.getFileStatus(walPath).getLen();
-    FSDataInputStream in = fs.open(walPath);
-    byte[] fileData = new byte[(int) length];
-    IOUtils.readFully(in, fileData);
-    in.close();
-    assertTrue("Cells should be plaintext", Bytes.contains(fileData, value));
-
-    // Confirm the WAL can be read back by SecureProtobufLogReader
-    try {
-      WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
-      reader.close();
-    } catch (IOException ioe) {
-      assertFalse(true);
-    }
-
-    FileStatus[] listStatus = fs.listStatus(walPath.getParent());
-    Path rootdir = CommonFSUtils.getRootDir(conf);
-    try {
-      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
-      s.splitWAL(listStatus[0], null);
-      Path file =
-        new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt");
-      assertTrue(!fs.exists(file));
-    } catch (IOException ioe) {
-      assertTrue("WAL should have been processed", false);
-    }
-    wals.close();
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 5ee699e7ce4..0c4387c87b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -66,9 +66,9 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
+import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader;
 import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -79,7 +79,6 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -606,10 +605,11 @@ public class TestWALSplit {
   @Test
   public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
     conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
-    List<FaultyProtobufLogReader.FailureType> failureTypes =
-      Arrays.asList(FaultyProtobufLogReader.FailureType.values()).stream()
-        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
-    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
+    List<FaultyProtobufWALStreamReader.FailureType> failureTypes =
+      Arrays.asList(FaultyProtobufWALStreamReader.FailureType.values()).stream()
+        .filter(x -> x != FaultyProtobufWALStreamReader.FailureType.NONE)
+        .collect(Collectors.toList());
+    for (FaultyProtobufWALStreamReader.FailureType failureType : failureTypes) {
       final Set<String> walDirContents = splitCorruptWALs(failureType);
       final Set<String> archivedLogs = new HashSet<>();
       final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
@@ -627,14 +627,14 @@ public class TestWALSplit {
    * @return set of wal names present prior to split attempt.
    * @throws IOException if the split process fails
    */
-  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
+  private Set<String> splitCorruptWALs(final FaultyProtobufWALStreamReader.FailureType failureType)
     throws IOException {
-    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl", Reader.class);
+    String backupClass = conf.get(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
     InstrumentedLogWriter.activateFailure = false;
 
     try {
-      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
-        Reader.class);
+      conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, FaultyProtobufWALStreamReader.class,
+        WALStreamReader.class);
       conf.set("faultyprotobuflogreader.failuretype", failureType.name());
       // Clean up from previous tests or previous loop
       try {
@@ -663,21 +663,25 @@ public class TestWALSplit {
       WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
       return walDirContents;
     } finally {
-      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, Reader.class);
+      if (backupClass != null) {
+        conf.set(WALFactory.WAL_STREAM_READER_CLASS_IMPL, backupClass);
+      } else {
+        conf.unset(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
+      }
     }
   }
 
   @Test(expected = IOException.class)
   public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
     conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
-    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
+    splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
   }
 
   @Test
   public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
     conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
     try {
-      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
+      splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
     } catch (IOException e) {
       LOG.debug("split with 'skip errors' set to 'false' correctly threw");
     }
@@ -704,13 +708,12 @@ public class TestWALSplit {
     assertEquals(1, splitLog.length);
 
     int actualCount = 0;
-    Reader in = wals.createReader(fs, splitLog[0]);
-    @SuppressWarnings("unused")
-    Entry entry;
-    while ((entry = in.next()) != null)
-      ++actualCount;
+    try (WALStreamReader in = wals.createStreamReader(fs, splitLog[0])) {
+      while (in.next() != null) {
+        ++actualCount;
+      }
+    }
     assertEquals(expectedCount, actualCount);
-    in.close();
 
     // should not have stored the EOF files as corrupt
     FileStatus[] archivedLogs =
@@ -1047,15 +1050,17 @@ public class TestWALSplit {
 
       /* Produce a mock reader that generates fake entries */
       @Override
-      protected Reader getReader(FileStatus file, boolean skipErrors,
+      protected WALStreamReader getReader(FileStatus file, boolean skipErrors,
         CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
-        Reader mockReader = Mockito.mock(Reader.class);
+        WALStreamReader mockReader = Mockito.mock(WALStreamReader.class);
         Mockito.doAnswer(new Answer<Entry>() {
           int index = 0;
 
           @Override
           public Entry answer(InvocationOnMock invocation) throws Throwable {
-            if (index >= numFakeEdits) return null;
+            if (index >= numFakeEdits) {
+              return null;
+            }
 
             // Generate r0 through r4 in round robin fashion
             int regionIdx = index % regions.size();
@@ -1354,8 +1359,8 @@ public class TestWALSplit {
       case TRUNCATE:
         fs.delete(path, false);
         out = fs.create(path);
-        out.write(corrupted_bytes, 0,
-          fileSize - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
+        out.write(corrupted_bytes, 0, fileSize
+          - (32 + AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
         closeOrFlush(close, out);
         break;
 
@@ -1394,11 +1399,11 @@ public class TestWALSplit {
 
   private int countWAL(Path log) throws IOException {
     int count = 0;
-    Reader in = wals.createReader(fs, log);
-    while (in.next() != null) {
-      count++;
+    try (WALStreamReader in = wals.createStreamReader(fs, log)) {
+      while (in.next() != null) {
+        count++;
+      }
     }
-    in.close();
     return count;
   }
 
@@ -1475,7 +1480,8 @@ public class TestWALSplit {
   }
 
   private boolean logsAreEqual(Path p1, Path p2) throws IOException {
-    try (Reader in1 = wals.createReader(fs, p1); Reader in2 = wals.createReader(fs, p2)) {
+    try (WALStreamReader in1 = wals.createStreamReader(fs, p1);
+      WALStreamReader in2 = wals.createStreamReader(fs, p2)) {
       Entry entry1;
       Entry entry2;
       while ((entry1 = in1.next()) != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index ad0201ea891..061633f7e21 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LogRoller;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -258,8 +257,6 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
       Configuration conf = getConf();
       conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
       conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
-      conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
-        WAL.Reader.class);
       conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
         Writer.class);
       conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
@@ -377,7 +374,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
    */
   private long verify(final WALFactory wals, final Path wal, final boolean verbose)
     throws IOException {
-    WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
+    WALStreamReader reader = wals.createStreamReader(wal.getFileSystem(getConf()), wal);
     long count = 0;
     Map<String, Long> sequenceIds = new HashMap<>();
     try {