Posted to commits@hbase.apache.org by zh...@apache.org on 2017/09/15 12:32:46 UTC

hbase git commit: HBASE-14004 [Replication] Inconsistency between Memstore and WAL may result in data in remote cluster that is not in the origin

Repository: hbase
Updated Branches:
  refs/heads/master f7a986cb6 -> 4341c3f55


HBASE-14004 [Replication] Inconsistency between Memstore and WAL may result in data in remote cluster that is not in the origin


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4341c3f5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4341c3f5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4341c3f5

Branch: refs/heads/master
Commit: 4341c3f554cf85e73d3bb536bdda33a83f463f16
Parents: f7a986c
Author: zhangduo <zh...@apache.org>
Authored: Thu Sep 14 17:26:36 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Sep 15 19:22:00 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/Threads.java   |  12 +-
 .../hbase/regionserver/HRegionServer.java       |  71 +++++-----
 .../hbase/regionserver/ReplicationService.java  |  17 ++-
 .../hbase/regionserver/wal/AbstractFSWAL.java   |  35 ++++-
 .../hbase/regionserver/wal/AsyncFSWAL.java      |   6 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |  12 +-
 .../RecoveredReplicationSource.java             |  11 +-
 .../replication/regionserver/Replication.java   |  15 ++-
 .../regionserver/ReplicationSource.java         |  22 ++--
 .../ReplicationSourceInterface.java             |  14 +-
 .../regionserver/ReplicationSourceManager.java  |  71 +++++-----
 .../ReplicationSourceWALReader.java             |  25 ++--
 .../regionserver/WALEntryStream.java            | 130 ++++++++-----------
 .../regionserver/WALFileLengthProvider.java     |  34 +++++
 .../hadoop/hbase/wal/AbstractFSWALProvider.java |   2 +-
 .../hadoop/hbase/wal/DisabledWALProvider.java   |   8 +-
 .../hbase/wal/RegionGroupingProvider.java       |   2 +-
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |   6 +-
 .../org/apache/hadoop/hbase/wal/WALFactory.java |  11 +-
 .../apache/hadoop/hbase/wal/WALProvider.java    |  12 +-
 .../replication/ReplicationSourceDummy.java     |  13 +-
 .../replication/TestReplicationSource.java      |   5 +-
 .../TestReplicationSourceManager.java           |  10 +-
 .../regionserver/TestWALEntryStream.java        |  87 ++++++++-----
 .../apache/hadoop/hbase/wal/IOTestProvider.java |   2 +-
 25 files changed, 365 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 35bf2b7..b39a5e8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -59,7 +59,7 @@ public class Threads {
    * @param t thread to run
    * @return Returns the passed Thread <code>t</code>.
    */
-  public static Thread setDaemonThreadRunning(final Thread t) {
+  public static <T extends Thread> T setDaemonThreadRunning(T t) {
     return setDaemonThreadRunning(t, t.getName());
   }
 
@@ -69,8 +69,7 @@ public class Threads {
    * @param name new name
    * @return Returns the passed Thread <code>t</code>.
    */
-  public static Thread setDaemonThreadRunning(final Thread t,
-    final String name) {
+  public static <T extends Thread> T setDaemonThreadRunning(T t, String name) {
     return setDaemonThreadRunning(t, name, null);
   }
 
@@ -78,12 +77,11 @@ public class Threads {
    * Utility method that sets name, daemon status and starts passed thread.
    * @param t thread to frob
    * @param name new name
-   * @param handler A handler to set on the thread.  Pass null if want to
-   * use default handler.
+   * @param handler A handler to set on the thread. Pass null if you want to use the default handler.
    * @return Returns the passed Thread <code>t</code>.
    */
-  public static Thread setDaemonThreadRunning(final Thread t,
-    final String name, final UncaughtExceptionHandler handler) {
+  public static <T extends Thread> T setDaemonThreadRunning(T t, String name,
+      UncaughtExceptionHandler handler) {
     t.setName(name);
     if (handler != null) {
       t.setUncaughtExceptionHandler(handler);

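The Threads change above is purely a generics sharpening: the method still names, daemonizes, and starts the thread, but callers now get their concrete subtype back without a cast. A minimal sketch of the difference, assuming hbase-common on the classpath (Worker is an illustrative stand-in, not an HBase type):

    import org.apache.hadoop.hbase.util.Threads;

    public class ThreadsGenericExample {
      // A Thread subtype with an extra method, standing in for e.g. a WAL reader thread.
      static class Worker extends Thread {
        volatile boolean stopped;
        void requestStop() { stopped = true; }
        @Override public void run() { while (!stopped) { Thread.yield(); } }
      }

      public static void main(String[] args) {
        // With <T extends Thread> T the concrete type survives; the old Thread-returning
        // signature would have forced a cast before calling requestStop().
        Worker w = Threads.setDaemonThreadRunning(new Worker(), "worker-1");
        w.requestStop();
      }
    }
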
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 62987c0..f648c2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1586,7 +1586,7 @@ public class HRegionServer extends HasThread implements
       // Save it in a file, this will allow to see if we crash
       ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
 
-      this.walFactory = setupWALAndReplication();
+      setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
       this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));
       this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
@@ -1855,13 +1855,12 @@ public class HRegionServer extends HasThread implements
   /**
    * Setup WAL log and replication if enabled.
    * Replication setup is done in here because it wants to be hooked up to WAL.
-   * @return A WAL instance.
    * @throws IOException
    */
-  private WALFactory setupWALAndReplication() throws IOException {
+  private void setupWALAndReplication() throws IOException {
     // TODO Replication make assumptions here based on the default filesystem impl
-    final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    final String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
+    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
 
     Path logDir = new Path(walRootDir, logName);
     if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
@@ -1875,7 +1874,7 @@ public class HRegionServer extends HasThread implements
     createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
 
     // listeners the wal factory will add to wals it creates.
-    final List<WALActionsListener> listeners = new ArrayList<>();
+    List<WALActionsListener> listeners = new ArrayList<>();
     listeners.add(new MetricsWAL());
     if (this.replicationSourceHandler != null &&
         this.replicationSourceHandler.getWALActionsListener() != null) {
@@ -1883,7 +1882,21 @@ public class HRegionServer extends HasThread implements
       listeners.add(this.replicationSourceHandler.getWALActionsListener());
     }
 
-    return new WALFactory(conf, listeners, serverName.toString());
+    // There is a cyclic dependency between ReplicationSourceHandler and WALFactory.
+    // We use WALActionsListener to get the newly rolled WALs, so we need to get the
+    // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And
+    // ReplicationSourceHandler in turn needs the WALFactory to get the length of the WAL file
+    // being written. So we construct the WALFactory first, and then pass it to the initialize
+    // method of ReplicationSourceHandler.
+    WALFactory factory = new WALFactory(conf, listeners, serverName.toString());
+    this.walFactory = factory;
+    if (this.replicationSourceHandler != null) {
+      this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory);
+    }
+    if (this.replicationSinkHandler != null &&
+        this.replicationSinkHandler != this.replicationSourceHandler) {
+      this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory);
+    }
   }
 
   public MetricsRegionServer getRegionServerMetrics() {
@@ -2898,7 +2911,7 @@ public class HRegionServer extends HasThread implements
   /**
    * Load the replication service objects, if any
    */
-  static private void createNewReplicationInstance(Configuration conf,
+  private static void createNewReplicationInstance(Configuration conf,
     HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{
 
     if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) ||
@@ -2908,47 +2921,41 @@ public class HRegionServer extends HasThread implements
 
     // read in the name of the source replication class from the config file.
     String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
-                               HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
 
     // read in the name of the sink replication class from the config file.
     String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
-                             HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
 
     // If both the sink and the source class names are the same, then instantiate
     // only one object.
     if (sourceClassname.equals(sinkClassname)) {
-      server.replicationSourceHandler = (ReplicationSourceService)
-                                         newReplicationInstance(sourceClassname,
-                                         conf, server, walFs, walDir, oldWALDir);
-      server.replicationSinkHandler = (ReplicationSinkService)
-                                         server.replicationSourceHandler;
+      server.replicationSourceHandler =
+          (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
+            walDir, oldWALDir);
+      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
     } else {
-      server.replicationSourceHandler = (ReplicationSourceService)
-                                         newReplicationInstance(sourceClassname,
-                                         conf, server, walFs, walDir, oldWALDir);
-      server.replicationSinkHandler = (ReplicationSinkService)
-                                         newReplicationInstance(sinkClassname,
-                                         conf, server, walFs, walDir, oldWALDir);
+      server.replicationSourceHandler =
+          (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
+            walDir, oldWALDir);
+      server.replicationSinkHandler = (ReplicationSinkService) newReplicationInstance(sinkClassname,
+        conf, server, walFs, walDir, oldWALDir);
     }
   }
 
-  static private ReplicationService newReplicationInstance(String classname,
-    Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
-    Path oldLogDir) throws IOException{
-
-    Class<?> clazz = null;
+  private static ReplicationService newReplicationInstance(String classname, Configuration conf,
+      HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir) throws IOException {
+    Class<? extends ReplicationService> clazz = null;
     try {
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-      clazz = Class.forName(classname, true, classLoader);
+      clazz = Class.forName(classname, true, classLoader).asSubclass(ReplicationService.class);
     } catch (java.lang.ClassNotFoundException nfe) {
       throw new IOException("Could not find class for " + classname);
     }
 
-    // create an instance of the replication object.
-    ReplicationService service = (ReplicationService)
-                              ReflectionUtils.newInstance(clazz, conf);
-    service.initialize(server, walFs, logDir, oldLogDir);
-    return service;
+    // create an instance of the replication object, but do not initialize it here as we need to use
+    // WALFactory when initializing.
+    return ReflectionUtils.newInstance(clazz, conf);
   }
 
   /**

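The comment block in setupWALAndReplication() above carries the heart of the fix: break the ReplicationSourceHandler <-> WALFactory cycle by constructing first and initializing later. A toy model of that ordering, using illustrative stand-in types rather than the real HBase classes:

    import java.util.Collections;
    import java.util.List;

    public class TwoPhaseInitExample {
      interface LengthProvider { long lengthOf(String walPath); }

      // Stands in for WALFactory: needs the roll listeners at construction time.
      static class WalFactory implements LengthProvider {
        WalFactory(List<Runnable> rollListeners) { /* register listeners */ }
        @Override public long lengthOf(String walPath) { return 0L; }
      }

      // Stands in for the replication handler: needs the factory, but only at initialize().
      static class ReplicationHandler {
        private LengthProvider provider; // unavailable at construction time
        Runnable rollListener() { return () -> { /* enqueue newly rolled WAL */ }; }
        void initialize(LengthProvider provider) { this.provider = provider; }
      }

      public static void main(String[] args) {
        ReplicationHandler handler = new ReplicationHandler();                  // 1. construct only
        WalFactory factory =
            new WalFactory(Collections.singletonList(handler.rollListener())); // 2. listener in
        handler.initialize(factory);                                           // 3. factory in
      }
    }
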
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index d88450a..f3bc188 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -20,17 +20,17 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 
 /**
- * Gateway to Cluster Replication.
- * Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
- * One such application is a cross-datacenter
- * replication service that can keep two hbase clusters in sync.
+ * Gateway to Cluster Replication. Used by
+ * {@link org.apache.hadoop.hbase.regionserver.HRegionServer}. One such application is a
+ * cross-datacenter replication service that can keep two hbase clusters in sync.
  */
 @InterfaceAudience.Private
 public interface ReplicationService {
@@ -39,9 +39,8 @@ public interface ReplicationService {
    * Initializes the replication service object.
    * @throws IOException
    */
-  void initialize(
-    Server rs, FileSystem fs, Path logdir, Path oldLogDir
-  ) throws IOException;
+  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir,
+      WALFileLengthProvider walFileLengthProvider) throws IOException;
 
   /**
    * Start replication services.

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 8b99676..8157108 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.*;
+import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 
+import com.lmax.disruptor.RingBuffer;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.management.MemoryType;
@@ -29,6 +32,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -58,6 +62,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -68,6 +73,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.NullScope;
@@ -75,9 +81,6 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.RingBuffer;
-
 /**
  * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
  * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
@@ -105,7 +108,7 @@ import com.lmax.disruptor.RingBuffer;
  * (Need to keep our own file lengths, not rely on HDFS).
  */
 @InterfaceAudience.Private
-public abstract class AbstractFSWAL<W> implements WAL {
+public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   private static final Log LOG = LogFactory.getLog(AbstractFSWAL.class);
 
@@ -984,6 +987,28 @@ public abstract class AbstractFSWAL<W> implements WAL {
   }
 
   /**
+   * If the given {@code path} is currently being written, return its length.
+   * <p>
+   * This is used by replication to prevent replicating unacked log entries. See
+   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
+   */
+  @Override
+  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+    rollWriterLock.lock();
+    try {
+      Path currentPath = getOldPath();
+      if (path.equals(currentPath)) {
+        W writer = this.writer;
+        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
+      } else {
+        return OptionalLong.empty();
+      }
+    } finally {
+      rollWriterLock.unlock();
+    }
+  }
+
+  /**
    * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
    * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment
    * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must

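The contract of getLogFileSizeIfBeingWritten() above: a present value means "this is the live WAL, do not trust bytes past here"; an empty value means "not the file being written, its length is final". A consumer-side sketch of that contract (safeReadLimit is an illustrative helper, not part of the patch):

    import java.util.OptionalLong;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;

    public class SafeLengthExample {
      // How many bytes of a WAL file a replication reader may safely consume.
      static long safeReadLimit(WALFileLengthProvider provider, Path walPath, long onDiskLen) {
        OptionalLong live = provider.getLogFileSizeIfBeingWritten(walPath);
        // Present => the live WAL: cap at the acked length, since the tail may be
        // unacked (for fan-out AsyncFSWAL, possibly uncommitted) data.
        // Empty => the file is rolled/closed, so its on-disk length is final.
        return live.isPresent() ? Math.min(live.getAsLong(), onDiskLen) : onDiskLen;
      }
    }
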
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 7e91f8c..42183ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -707,8 +707,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   protected void doShutdown() throws IOException {
     waitForSafePoint();
-    this.writer.close();
-    this.writer = null;
+    if (this.writer != null) {
+      this.writer.close();
+      this.writer = null;
+    }
     closeExecutor.shutdown();
     IOException error = new IOException("WAL has been closed");
     syncFutures.forEach(f -> f.done(f.getTxid(), error));

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 7298137..7e0fc37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.ExceptionHandler;
@@ -46,8 +45,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -62,6 +65,9 @@ import org.apache.htrace.NullScope;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 /**
  * The default implementation of FSWAL.

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 3594868..248a52a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -50,13 +50,12 @@ public class RecoveredReplicationSource extends ReplicationSource {
   private String actualPeerId;
 
   @Override
-  public void init(final Configuration conf, final FileSystem fs,
-      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
-      final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
-      final MetricsSource metrics) throws IOException {
+  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
-      clusterId, replicationEndpoint, metrics);
+      clusterId, replicationEndpoint, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 9fd1a87..d26f253 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.OptionalLong;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -85,6 +86,7 @@ public class Replication extends WALActionsListener.Base implements
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
+
   /**
    * Instantiate the replication management (if rep is enabled).
    * @param server Hosting server
@@ -93,9 +95,8 @@ public class Replication extends WALActionsListener.Base implements
    * @param oldLogDir directory where logs are archived
    * @throws IOException
    */
-  public Replication(final Server server, final FileSystem fs,
-      final Path logDir, final Path oldLogDir) throws IOException{
-    initialize(server, fs, logDir, oldLogDir);
+  public Replication(Server server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException {
+    initialize(server, fs, logDir, oldLogDir, p -> OptionalLong.empty());
   }
 
   /**
@@ -104,8 +105,8 @@ public class Replication extends WALActionsListener.Base implements
   public Replication() {
   }
 
-  public void initialize(final Server server, final FileSystem fs,
-      final Path logDir, final Path oldLogDir) throws IOException {
+  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
+      WALFileLengthProvider walFileLengthProvider) throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
@@ -144,8 +145,8 @@ public class Replication extends WALActionsListener.Base implements
       throw new IOException("Could not read cluster id", ke);
     }
     this.replicationManager =
-        new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
-            conf, this.server, fs, logDir, oldLogDir, clusterId);
+        new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf,
+            this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider);
     this.statsThreadPeriod =
         this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
     LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6c96852..d16a68f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -62,6 +60,8 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
 
 /**
  * Class that handles the source of a replication stream.
@@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  * A stream is considered down when we cannot contact a region server on the
  * peer cluster for more than 55 seconds by default.
  * </p>
- *
  */
 @InterfaceAudience.Private
 public class ReplicationSource extends Thread implements ReplicationSourceInterface {
@@ -123,6 +122,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private ReplicationThrottler throttler;
   private long defaultBandwidth;
   private long currentBandwidth;
+  private WALFileLengthProvider walFileLengthProvider;
   protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
       new ConcurrentHashMap<>();
 
@@ -147,12 +147,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
    * @throws IOException
    */
   @Override
-  public void init(final Configuration conf, final FileSystem fs,
-      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
-      final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
-      final MetricsSource metrics)
-          throws IOException {
+  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.stopper = stopper;
     this.conf = HBaseConfiguration.create(conf);
     this.waitOnEndpointSeconds =
@@ -181,6 +179,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
+    this.walFileLengthProvider = walFileLengthProvider;
     LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
         + ", currentBandwidth=" + this.currentBandwidth);
   }
@@ -560,4 +559,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     totalReplicatedEdits.addAndGet(entries.size());
     totalBufferUsed.addAndGet(-batchSize);
   }
+
+  @Override
+  public WALFileLengthProvider getWALFileLengthProvider() {
+    return walFileLengthProvider;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index da89aba..066f799 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -53,11 +53,10 @@ public interface ReplicationSourceInterface {
    * @param clusterId
    * @throws IOException
    */
-  public void init(final Configuration conf, final FileSystem fs,
-      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
-      final ReplicationPeers replicationPeers, final Stoppable stopper,
-      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
-      final MetricsSource metrics) throws IOException;
+  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
@@ -147,6 +146,11 @@ public interface ReplicationSourceInterface {
   ReplicationSourceManager getSourceManager();
 
   /**
+   * @return the wal file length provider
+   */
+  WALFileLengthProvider getWALFileLengthProvider();
+
+  /**
    * Try to throttle when the peer config with a bandwidth
    * @param batchSize entries size will be pushed
    * @throws InterruptedException

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5b54ce0..609274f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -19,9 +19,6 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,7 +28,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -40,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -68,10 +65,13 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This class is responsible to manage all the replication
  * sources. There are two classes of sources:
@@ -116,12 +116,12 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Path logDir;
   // Path to the wal archive
   private final Path oldLogDir;
+  private final WALFileLengthProvider walFileLengthProvider;
   // The number of ms that we wait before moving znodes, HBASE-3596
   private final long sleepBeforeFailover;
   // Homemade executer service for replication
   private final ThreadPoolExecutor executor;
 
-  private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;
 
   private Connection connection;
@@ -141,10 +141,10 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param oldLogDir the directory where old logs are archived
    * @param clusterId
    */
-  public ReplicationSourceManager(final ReplicationQueues replicationQueues,
-      final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
-      final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
-      final Path oldLogDir, final UUID clusterId) throws IOException {
+  public ReplicationSourceManager(ReplicationQueues replicationQueues,
+      ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
+      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
+      WALFileLengthProvider walFileLengthProvider) throws IOException {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<>();
@@ -162,6 +162,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.sleepBeforeFailover =
         conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
     this.clusterId = clusterId;
+    this.walFileLengthProvider = walFileLengthProvider;
     this.replicationTracker.registerListener(this);
     this.replicationPeers.getAllPeerIds();
     // It's preferable to failover 1 RS at a time, but with good zk servers
@@ -175,8 +176,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     tfb.setNameFormat("ReplicationExecutor-%d");
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
-    this.rand = new Random();
-    this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
+    this.latestPaths = new HashSet<Path>();
     replicationForBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
@@ -243,7 +243,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Adds a normal source per registered peer cluster and tries to process all
    * old region server wal queues
    */
-  protected void init() throws IOException, ReplicationException {
+  void init() throws IOException, ReplicationException {
     for (String id : this.replicationPeers.getConnectedPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
@@ -267,13 +267,13 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return the source that was created
    * @throws IOException
    */
-  protected ReplicationSourceInterface addSource(String id) throws IOException,
-      ReplicationException {
+  @VisibleForTesting
+  ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
     ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
-    ReplicationSourceInterface src =
-        getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
-          this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
+    ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
+      this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
+      walFileLengthProvider);
     synchronized (this.walsById) {
       this.sources.add(src);
       Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -330,7 +330,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get a copy of the wals of the first source on this rs
    * @return a sorted set of wal names
    */
-  protected Map<String, Map<String, SortedSet<String>>> getWALs() {
+  @VisibleForTesting
+  Map<String, Map<String, SortedSet<String>>> getWALs() {
     return Collections.unmodifiableMap(walsById);
   }
 
@@ -338,7 +339,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    * Get a copy of the wals of the recovered sources on this rs
    * @return a sorted set of wal names
    */
-  protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
+  @VisibleForTesting
+  Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 
@@ -364,12 +366,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return the normal source for the give peer if it exists, otherwise null.
    */
   public ReplicationSourceInterface getSource(String peerId) {
-    for (ReplicationSourceInterface source: getSources()) {
-      if (source.getPeerId().equals(peerId)) {
-        return source;
-      }
-    }
-    return null;
+    return getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
   }
 
   @VisibleForTesting
@@ -466,12 +463,11 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return the created source
    * @throws IOException
    */
-  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
-      final FileSystem fs, final ReplicationSourceManager manager,
-      final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
-      final Server server, final String peerId, final UUID clusterId,
-      final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
-      throws IOException {
+  private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
+      ReplicationSourceManager manager, ReplicationQueues replicationQueues,
+      ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
+      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
+      WALFileLengthProvider walFileLengthProvider) throws IOException {
     RegionServerCoprocessorHost rsServerHost = null;
     TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
@@ -507,8 +503,8 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     MetricsSource metrics = new MetricsSource(peerId);
     // init replication source
-    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
-      clusterId, replicationEndpoint, metrics);
+    src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
+      replicationEndpoint, walFileLengthProvider, metrics);
 
     // init replication endpoint
     replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
@@ -674,7 +670,8 @@ public class ReplicationSourceManager implements ReplicationListener {
       // Wait a bit before transferring the queues, we may be shutting down.
       // This sleep may not be enough in some cases.
       try {
-        Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
+        Thread.sleep(sleepBeforeFailover +
+            (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
       } catch (InterruptedException e) {
         LOG.warn("Interrupted while waiting before transferring a queue.");
         Thread.currentThread().interrupt();
@@ -688,7 +685,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
       while (peers != null && !peers.isEmpty()) {
         Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
-            peers.get(rand.nextInt(peers.size())));
+          peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
         long sleep = sleepBeforeFailover/2;
         if (peer != null) {
           newQueues.put(peer.getFirst(), peer.getSecond());
@@ -748,7 +745,7 @@ public class ReplicationSourceManager implements ReplicationListener {
           // enqueue sources
           ReplicationSourceInterface src =
               getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
-                server, peerId, this.clusterId, peerConfig, peer);
+                server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
           // see removePeer
           synchronized (oldsources) {

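Replacing the shared Random field with ThreadLocalRandom removes contention when several failover workers draw jitter at once; the distribution is unchanged. The jittered sleep in isolation, as a runnable sketch:

    import java.util.concurrent.ThreadLocalRandom;

    public class FailoverJitterExample {
      public static void main(String[] args) throws InterruptedException {
        long sleepBeforeFailover = 30000L; // replication.sleep.before.failover default (ms)
        // Sleep between 1x and 2x the base delay so region servers racing to claim a
        // dead server's queues spread out instead of hitting ZooKeeper at once.
        Thread.sleep(sleepBeforeFailover +
            (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
      }
    }
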
http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index bb5abe9..bb993c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -37,18 +37,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
 /**
  * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
@@ -127,8 +127,8 @@ public class ReplicationSourceWALReader extends Thread {
   public void run() {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) {
+      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
+          source.getWALFileLengthProvider(), source.getSourceMetrics())) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           if (!checkQuota()) {
             continue;
@@ -147,7 +147,7 @@ public class ReplicationSourceWALReader extends Thread {
           currentPosition = entryStream.getPosition();
           entryStream.reset(); // reuse stream
         }
-      } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+      } catch (IOException e) { // stream related
         if (sleepMultiplier < maxRetriesMultiplier) {
           LOG.debug("Failed to read stream of replication entries: " + e);
           sleepMultiplier++;
@@ -202,8 +202,9 @@ public class ReplicationSourceWALReader extends Thread {
   // if we get an EOF due to a zero-length log, and there are other logs in queue
   // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
   // enabled, then dump the log
-  private void handleEofException(Exception e) {
-    if (e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
+  private void handleEofException(IOException e) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
+        logQueue.size() > 1 && this.eofAutoRecovery) {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 54511ae..3be4ca4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.OptionalLong;
 import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -50,7 +50,7 @@ import org.apache.hadoop.ipc.RemoteException;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
+class WALEntryStream implements Closeable {
   private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
 
   private Reader reader;
@@ -59,24 +59,11 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
   private Entry currentEntry;
   // position after reading current entry
   private long currentPosition = 0;
-  private PriorityBlockingQueue<Path> logQueue;
-  private FileSystem fs;
-  private Configuration conf;
-  private MetricsSource metrics;
-
-  /**
-   * Create an entry stream over the given queue
-   * @param logQueue the queue of WAL paths
-   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
-   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
-   * @param metrics replication metrics
-   * @throws IOException
-   */
-  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
-      MetricsSource metrics)
-      throws IOException {
-    this(logQueue, fs, conf, 0, metrics);
-  }
+  private final PriorityBlockingQueue<Path> logQueue;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final WALFileLengthProvider walFileLengthProvider;
+  private final MetricsSource metrics;
 
   /**
    * Create an entry stream over the given queue at the given start position
@@ -88,52 +75,41 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
    * @throws IOException
    */
   public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
-      long startPosition, MetricsSource metrics) throws IOException {
+      long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+      throws IOException {
     this.logQueue = logQueue;
     this.fs = fs;
     this.conf = conf;
     this.currentPosition = startPosition;
+    this.walFileLengthProvider = walFileLengthProvider;
     this.metrics = metrics;
   }
 
   /**
    * @return true if there is another WAL {@link Entry}
-   * @throws WALEntryStreamRuntimeException if there was an Exception while reading
    */
-  @Override
-  public boolean hasNext() {
+  public boolean hasNext() throws IOException {
     if (currentEntry == null) {
-      try {
-        tryAdvanceEntry();
-      } catch (Exception e) {
-        throw new WALEntryStreamRuntimeException(e);
-      }
+      tryAdvanceEntry();
     }
     return currentEntry != null;
   }
 
   /**
    * @return the next WAL entry in this stream
-   * @throws WALEntryStreamRuntimeException if there was an IOException
+   * @throws IOException
    * @throws NoSuchElementException if no more entries in the stream.
    */
-  @Override
-  public Entry next() {
-    if (!hasNext()) throw new NoSuchElementException();
+  public Entry next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
     Entry save = currentEntry;
     currentEntry = null; // gets reloaded by hasNext()
     return save;
   }
 
   /**
-   * Not supported.
-   */
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -142,14 +118,6 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
   }
 
   /**
-   * @return the iterator over WAL entries in the queue.
-   */
-  @Override
-  public Iterator<Entry> iterator() {
-    return this;
-  }
-
-  /**
    * @return the position of the last Entry returned by next()
    */
   public long getPosition() {
@@ -195,24 +163,27 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
 
   private void tryAdvanceEntry() throws IOException {
     if (checkReader()) {
-      readNextEntryAndSetPosition();
-      if (currentEntry == null) { // no more entries in this log file - see if log was rolled
-        if (logQueue.size() > 1) { // log was rolled
-          // Before dequeueing, we should always get one more attempt at reading.
-          // This is in case more entries came in after we opened the reader,
-          // and a new log was enqueued while we were reading. See HBASE-6758
-          resetReader();
-          readNextEntryAndSetPosition();
-          if (currentEntry == null) {
-            if (checkAllBytesParsed()) { // now we're certain we're done with this log file
-              dequeueCurrentLog();
-              if (openNextLog()) {
-                readNextEntryAndSetPosition();
-              }
+      boolean beingWritten = readNextEntryAndSetPosition();
+      if (currentEntry == null && !beingWritten) {
+        // no more entries in this log file, and the file is already closed, i.e., rolled
+        // Before dequeueing, we should always get one more attempt at reading.
+        // This is in case more entries came in after we opened the reader, and the log is rolled
+        // while we were reading. See HBASE-6758
+        resetReader();
+        readNextEntryAndSetPosition();
+        if (currentEntry == null) {
+          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+            dequeueCurrentLog();
+            if (openNextLog()) {
+              readNextEntryAndSetPosition();
             }
           }
-        } // no other logs, we've simply hit the end of the current open log. Do nothing
+        }
       }
+      // if currentEntry != null then just return
+      // if currentEntry == null but the file is still being written, then we should not switch to
+      // the next log either, just return here and try next time to see if there are more entries in
+      // the current file
     }
     // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
   }
@@ -270,15 +241,30 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
     metrics.decrSizeOfLogQueue();
   }
 
-  private void readNextEntryAndSetPosition() throws IOException {
+  /**
+   * Returns whether the file is currently being written.
+   */
+  private boolean readNextEntryAndSetPosition() throws IOException {
     Entry readEntry = reader.next();
     long readerPos = reader.getPosition();
+    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
+    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
+      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
+      // data, so we need to make sure that we do not read beyond the committed file length.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
+            fileLength.getAsLong() + ", but we have advanced to " + readerPos);
+      }
+      resetReader();
+      return true;
+    }
     if (readEntry != null) {
       metrics.incrLogEditsRead();
       metrics.incrLogReadInBytes(readerPos - currentPosition);
     }
     currentEntry = readEntry; // could be null
     setPosition(readerPos);
+    return fileLength.isPresent();
   }
 
   private void closeReader() throws IOException {
@@ -301,7 +287,9 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
     Path nextPath = logQueue.peek();
     if (nextPath != null) {
       openReader(nextPath);
-      if (reader != null) return true;
+      if (reader != null) {
+        return true;
+      }
     }
     return false;
   }
@@ -408,14 +396,4 @@ public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entr
     }
     return size;
   }
-
-  @InterfaceAudience.Private
-  public static class WALEntryStreamRuntimeException extends RuntimeException {
-    private static final long serialVersionUID = -6298201811259982568L;
-
-    public WALEntryStreamRuntimeException(Exception e) {
-      super(e);
-    }
-  }
-
 }
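The heart of this change is the guard in readNextEntryAndSetPosition() above: while a file is still being written, an entry is only surfaced if the reader has not advanced past the committed length reported by the WALFileLengthProvider; once the file is rolled the provider reports empty and no limit applies. A minimal standalone sketch of that decision, with made-up Entry and guard names (this is not HBase code):

import java.util.OptionalLong;

public final class CommittedLengthGuardSketch {

  /** Stand-in for a WAL entry; the real stream yields WAL.Entry. */
  static final class Entry {
    final long endPosition; // reader position after consuming this entry
    final String payload;
    Entry(long endPosition, String payload) {
      this.endPosition = endPosition;
      this.payload = payload;
    }
  }

  /**
   * Mirrors the check in readNextEntryAndSetPosition(): withhold an entry
   * that ends beyond the committed length of a file still being written.
   */
  static Entry guard(Entry candidate, OptionalLong committedLength) {
    if (candidate != null && committedLength.isPresent()
        && candidate.endPosition > committedLength.getAsLong()) {
      return null; // uncommitted bytes; retry once more data is acked
    }
    return candidate; // may be null at end of file
  }

  public static void main(String[] args) {
    Entry e = new Entry(120, "edit-1");
    // Only 100 bytes committed so far: the entry is withheld.
    System.out.println(guard(e, OptionalLong.of(100)));          // null
    // File already rolled, so no length is reported: entry returned.
    System.out.println(guard(e, OptionalLong.empty()).payload);  // edit-1
  }
}

In the real code the reader is additionally reset after such an overshoot, so the same bytes are re-read once the length provider reports them as committed.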

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
new file mode 100644
index 0000000..010fa69
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.OptionalLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used by replication to prevent replicating unacked log entries. See
+ * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
+ */
+@InterfaceAudience.Private
+@FunctionalInterface
+public interface WALFileLengthProvider {
+
+  OptionalLong getLogFileSizeIfBeingWritten(Path path);
+}
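Since the new interface is a @FunctionalInterface with a single method, call sites can hand it over as a lambda; the tests later in this commit do exactly that with p -> OptionalLong.empty(). A small usage sketch, assuming the hbase-server classes are on the classpath (LengthProviderSketch is a made-up name):

import java.util.OptionalLong;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;

public class LengthProviderSketch {
  public static void main(String[] args) {
    // A recovered queue: no file is being written any more.
    WALFileLengthProvider recovered = path -> OptionalLong.empty();
    // Pin every file to a fixed committed length, handy in tests.
    WALFileLengthProvider fixed = path -> OptionalLong.of(1024L);

    Path p = new Path("/hbase/WALs/rs1/wal.1");
    System.out.println(recovered.getLogFileSizeIfBeingWritten(p)); // OptionalLong.empty
    System.out.println(fixed.getLogFileSizeIfBeingWritten(p));     // OptionalLong[1024]
  }
}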

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 9c80fe6..1a81b17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -115,7 +115,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   }
 
   @Override
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     if (wal == null) {
       return Collections.emptyList();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index c805ff3..a6d43d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +67,7 @@ class DisabledWALProvider implements WALProvider {
   }
 
   @Override
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     List<WAL> wals = new ArrayList<>(1);
     wals.add(disabled);
     return wals;
@@ -232,6 +233,11 @@ class DisabledWALProvider implements WALProvider {
     public String toString() {
       return "WAL disabled.";
     }
+
+    @Override
+    public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+      return OptionalLong.empty();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 95b7dae..ab3a7d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -169,7 +169,7 @@ public class RegionGroupingProvider implements WALProvider {
   }
 
   @Override
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     List<WAL> wals = new ArrayList<>();
     for (WALProvider provider : cached.values()) {
       wals.addAll(provider.getWALs());

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index eede937..9ec58ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -19,8 +19,6 @@
 
 package org.apache.hadoop.hbase.wal;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
@@ -35,6 +33,8 @@ import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 /**
  * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
@@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface WAL extends Closeable {
+public interface WAL extends Closeable, WALFileLengthProvider {
 
   /**
    * Registers WALActionsListener

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index aaa828f..efb8e2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
@@ -38,6 +39,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -63,7 +65,7 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
  * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
  */
 @InterfaceAudience.Private
-public class WALFactory {
+public class WALFactory implements WALFileLengthProvider {
 
   private static final Log LOG = LogFactory.getLog(WALFactory.class);
 
@@ -230,7 +232,7 @@ public class WALFactory {
     }
   }
 
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     return provider.getWALs();
   }
 
@@ -450,4 +452,10 @@
   public final WALProvider getMetaWALProvider() {
     return this.metaProvider.get();
   }
+
+  @Override
+  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
+    return getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
+        .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
+  }
 }
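The implementation above polls every WAL the factory manages and takes the first non-empty answer; note that findAny() on a Stream<OptionalLong> yields an Optional<OptionalLong>, which the trailing orElse(OptionalLong.empty()) unwraps. The same idiom in isolation (plain Java, no HBase types):

import java.util.OptionalLong;
import java.util.stream.Stream;

public class FirstPresentSketch {
  public static void main(String[] args) {
    // Two "WALs": the first does not know the file, the second does.
    OptionalLong answer = Stream.of(OptionalLong.empty(), OptionalLong.of(42L))
        .filter(o -> o.isPresent())
        .findAny()                      // Optional<OptionalLong>
        .orElse(OptionalLong.empty());  // unwrap back to OptionalLong
    System.out.println(answer); // OptionalLong[42]
  }
}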

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index ffcfcd4..c38f419 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -60,7 +60,7 @@ public interface WALProvider {
 
   /** @return the List of WALs that are used by this server
    */
-  List<WAL> getWALs() throws IOException;
+  List<WAL> getWALs();
 
   /**
    * persist outstanding WALs to storage and stop accepting new appends.
@@ -76,18 +76,20 @@ public interface WALProvider {
    */
   void close() throws IOException;
 
+  interface WriterBase extends Closeable {
+    long getLength();
+  }
+
   // Writers are used internally. Users outside of the WAL should be relying on the
   // interface provided by WAL.
-  interface Writer extends Closeable {
+  interface Writer extends WriterBase {
     void sync() throws IOException;
     void append(WAL.Entry entry) throws IOException;
-    long getLength();
   }
 
-  interface AsyncWriter extends Closeable {
+  interface AsyncWriter extends WriterBase {
     CompletableFuture<Long> sync();
     void append(WAL.Entry entry);
-    long getLength();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index e23e15b..bfe17b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -42,16 +43,17 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   String peerClusterId;
   Path currentPath;
   MetricsSource metrics;
+  WALFileLengthProvider walFileLengthProvider;
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
       ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
-      UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
-          throws IOException {
-
+      UUID clusterId, ReplicationEndpoint replicationEndpoint,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.manager = manager;
     this.peerClusterId = peerClusterId;
     this.metrics = metrics;
+    this.walFileLengthProvider = walFileLengthProvider;
   }
 
   @Override
@@ -135,4 +137,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   @Override
   public void postShipEdits(List<Entry> entries, int batchSize) {
   }
+
+  @Override
+  public WALFileLengthProvider getWALFileLengthProvider() {
+    return walFileLengthProvider;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 56a5bdc..ebb1bf8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
+import java.util.OptionalLong;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -166,8 +167,8 @@ public class TestReplicationSource {
     testConf.setInt("replication.source.maxretriesmultiplier", 1);
     ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
     Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
-    source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
-        null, replicationEndpoint, null);
+    source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null,
+      replicationEndpoint, p -> OptionalLong.empty(), null);
     ExecutorService executor = Executors.newSingleThreadExecutor();
     Future<?> future = executor.submit(new Runnable() {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 9804df4..3934e05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -61,9 +61,6 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -97,6 +94,9 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 
 /**
  * An abstract class that tests ReplicationSourceManager. Classes that extend this class should
@@ -646,8 +646,8 @@ public abstract class TestReplicationSourceManager {
     @Override
     public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
         ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
-        UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
-        throws IOException {
+        UUID clusterId, ReplicationEndpoint replicationEndpoint,
+        WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 5f3452a..d65054c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.NoSuchElementException;
+import java.util.OptionalLong;
 import java.util.TreeMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -42,13 +43,10 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -56,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -67,11 +66,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
 import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
 
-@RunWith(MockitoJUnitRunner.class)
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestWALEntryStream {
 
@@ -84,8 +80,13 @@ public class TestWALEntryStream {
   private static final byte[] qualifier = Bytes.toBytes("qualifier");
   private static final HRegionInfo info =
       new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
-  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
-  private static NavigableMap<byte[], Integer> scopes;
+  private static final NavigableMap<byte[], Integer> scopes = getScopes();
+
+  private static NavigableMap<byte[], Integer> getScopes() {
+    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    scopes.put(family, 1);
+    return scopes;
+  }
 
   private WAL log;
   PriorityBlockingQueue<Path> walQueue;
@@ -103,10 +104,6 @@ public class TestWALEntryStream {
 
     cluster = TEST_UTIL.getDFSCluster();
     fs = cluster.getFileSystem();
-    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-    for (byte[] fam : htd.getFamiliesKeys()) {
-      scopes.put(fam, 0);
-    }
   }
 
   @AfterClass
@@ -151,10 +148,10 @@ public class TestWALEntryStream {
           log.rollWriter();
 
           try (WALEntryStream entryStream =
-              new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+              new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
             int i = 0;
-            for (WAL.Entry e : entryStream) {
-              assertNotNull(e);
+            while (entryStream.hasNext()) {
+              assertNotNull(entryStream.next());
               i++;
             }
             assertEquals(nbRows, i);
@@ -176,10 +173,9 @@ public class TestWALEntryStream {
   @Test
   public void testAppendsWithRolls() throws Exception {
     appendToLog();
-
     long oldPos;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       // There's one edit in the log, read it. Reading past it needs to throw exception
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.next();
@@ -196,8 +192,8 @@ public class TestWALEntryStream {
 
     appendToLog();
 
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+        log, new MetricsSource("1"))) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -210,8 +206,8 @@ public class TestWALEntryStream {
     log.rollWriter();
     appendToLog();
 
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+        log, new MetricsSource("1"))) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
@@ -236,7 +232,7 @@ public class TestWALEntryStream {
     appendToLog("1");
     appendToLog("2");// 2
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       assertEquals("1", getRow(entryStream.next()));
 
       appendToLog("3"); // 3 - comes in after reader opened
@@ -261,7 +257,7 @@ public class TestWALEntryStream {
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       entryStream.next(); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
@@ -284,7 +280,7 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       entryStream.next(); // we've hit the end of the stream at this point
       appendToLog("2");
       appendToLog("3");
@@ -292,7 +288,7 @@ public class TestWALEntryStream {
     }
    // next stream should pick up where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
       assertEquals("2", getRow(entryStream.next()));
       assertEquals("3", getRow(entryStream.next()));
       assertFalse(entryStream.hasNext()); // done
@@ -309,14 +305,14 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendEntriesToLog(3);
     // read only one element
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
+        log, new MetricsSource("1"))) {
       entryStream.next();
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
@@ -327,7 +323,7 @@ public class TestWALEntryStream {
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       assertFalse(entryStream.hasNext());
     }
   }
@@ -338,7 +334,7 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -351,6 +347,7 @@ public class TestWALEntryStream {
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
     when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
+    when(source.getWALFileLengthProvider()).thenReturn(log);
     ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
         walQueue, 0, getDummyFilter(), source);
     Path walPath = walQueue.peek();
@@ -425,10 +422,6 @@ public class TestWALEntryStream {
     };
   }
 
-  private ReplicationQueueInfo getQueueInfo() {
-    return new ReplicationQueueInfo("1");
-  }
-
   class PathWatcher extends WALActionsListener.Base {
 
     Path currentPath;
@@ -440,4 +433,30 @@ public class TestWALEntryStream {
     }
   }
 
+  @Test
+  public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
+    appendToLog("1");
+    appendToLog("2");
+    long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
+    AtomicLong fileLength = new AtomicLong(size - 1);
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
+        p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"))) {
+      assertTrue(entryStream.hasNext());
+      assertNotNull(entryStream.next());
+      // cannot get the second entry yet, it is beyond the committed length
+      assertFalse(entryStream.hasNext());
+      Thread.sleep(1000);
+      entryStream.reset();
+      // still cannot get the second entry
+      assertFalse(entryStream.hasNext());
+
+      // can get the second entry now
+      fileLength.set(size);
+      entryStream.reset();
+      assertTrue(entryStream.hasNext());
+      assertNotNull(entryStream.next());
+
+      assertFalse(entryStream.hasNext());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4341c3f5/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index 041d8ae..944a4f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -107,7 +107,7 @@ public class IOTestProvider implements WALProvider {
   }
 
   @Override
-  public List<WAL> getWALs() throws IOException {
+  public List<WAL> getWALs() {
     List<WAL> wals = new ArrayList<>(1);
     wals.add(log);
     return wals;