You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/09 07:22:20 UTC

[13/20] hbase git commit: HBASE-20206 WALEntryStream should not switch WAL file silently

HBASE-20206 WALEntryStream should not switch WAL file silently


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/16a4dd6b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/16a4dd6b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/16a4dd6b

Branch: refs/heads/branch-2
Commit: 16a4dd6b8f98cb1116007764cb86f6835a7ca84f
Parents: 644bfe3
Author: zhangduo <zh...@apache.org>
Authored: Sun Mar 18 18:09:45 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../replication/ReplicationQueueStorage.java    |   2 +-
 .../replication/ZKReplicationQueueStorage.java  |  39 ++--
 .../replication/TestReplicationStateBasic.java  |   3 +-
 .../TestZKReplicationQueueStorage.java          |   6 +-
 .../RecoveredReplicationSource.java             |  33 ----
 .../RecoveredReplicationSourceShipper.java      |  13 +-
 .../regionserver/ReplicationSource.java         |   2 +-
 .../regionserver/ReplicationSourceManager.java  | 100 +++++-----
 .../regionserver/ReplicationSourceShipper.java  |  96 +++++-----
 .../ReplicationSourceWALReader.java             |  50 ++++-
 .../SerialReplicationSourceWALReader.java       |  29 ++-
 .../replication/regionserver/WALEntryBatch.java |  22 +++
 .../regionserver/WALEntryStream.java            |   5 +-
 .../TestReplicationSourceManager.java           |  17 +-
 .../regionserver/TestWALEntryStream.java        | 188 ++++++++++++++-----
 15 files changed, 384 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 4c93da6..cfe9c9c 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -63,7 +63,7 @@ public interface ReplicationQueueStorage {
    * @param serverName the name of the regionserver
    * @param queueId a String that identifies the queue
    * @param fileName name of the WAL
-   * @param position the current position in the file
+   * @param position the current position in the file. Ignored if less than or equal to 0.
    * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
    */
   void setWALPosition(ServerName serverName, String queueId, String fileName, long position,

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index adbf259..63f43e8 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -193,27 +193,28 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
       Map<String, Long> lastSeqIds) throws ReplicationException {
     try {
       List<ZKUtilOp> listOfOps = new ArrayList<>();
-      listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
-        ZKUtil.positionToByteArray(position)));
+      if (position > 0) {
+        listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
+          ZKUtil.positionToByteArray(position)));
+      }
       // Persist the max sequence id(s) of regions for serial replication atomically.
-      if (lastSeqIds != null && lastSeqIds.size() > 0) {
-        for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
-          String peerId = new ReplicationQueueInfo(queueId).getPeerId();
-          String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
-          /*
-           * Make sure the existence of path
-           * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
-           * multiOrSequential() method said, if received a NodeExistsException, all operations will
-           * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
-           * because only need to make sure that update file position and sequence id atomically.
-           */
-          ZKUtil.createWithParents(zookeeper, path);
-          // Persist the max sequence id of region to zookeeper.
-          listOfOps
-              .add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
-        }
+      for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+        String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+        String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+        /*
+         * Make sure the path /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>
+         * exists. As the javadoc of multiOrSequential() says, if a NodeExistsException is
+         * received, all operations will fail. So create the path here; there is no need to add
+         * this operation to listOfOps, because we only need to make sure that the file position
+         * and the sequence ids are updated atomically.
+         */
+        ZKUtil.createWithParents(zookeeper, path);
+        // Persist the max sequence id of region to zookeeper.
+        listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+      }
+      if (!listOfOps.isEmpty()) {
+        ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
       }
-      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
     } catch (KeeperException e) {
       throw new ReplicationException("Failed to set log position (serverName=" + serverName
           + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 5999c1f..21b09aa 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
@@ -127,7 +128,7 @@ public abstract class TestReplicationStateBasic {
     assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
     assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
     assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
-    rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
+    rqs.setWALPosition(server3, "qId5", "filename4", 354L, Collections.emptyMap());
     assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
 
     assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index 8ff52f3..c813870 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
@@ -136,9 +137,10 @@ public class TestZKReplicationQueueStorage {
     for (int i = 0; i < 10; i++) {
       assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
       assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
-      STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
+      STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
+        Collections.emptyMap());
       STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
-        null);
+        Collections.emptyMap());
     }
 
     for (int i = 0; i < 10; i++) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 169b469..f1ad99d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -64,38 +63,6 @@ public class RecoveredReplicationSource extends ReplicationSource {
     return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
   }
 
-  private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
-      BlockingQueue<WALEntryBatch> entryBatchQueue, Path currentPath) throws InterruptedException {
-    LOG.trace("Didn't read any new entries from WAL");
-    // we're done with queue recovery, shut ourself down
-    reader.setReaderRunning(false);
-    // shuts down shipper thread immediately
-    entryBatchQueue.put(new WALEntryBatch(0, currentPath));
-  }
-
-  @Override
-  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
-      PriorityBlockingQueue<Path> queue, long startPosition) {
-    if (replicationPeer.getPeerConfig().isSerial()) {
-      return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
-        this) {
-
-        @Override
-        protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
-          handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
-        }
-      };
-    } else {
-      return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) {
-
-        @Override
-        protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
-          handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
-        }
-      };
-    }
-  }
-
   public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
     boolean hasPathChanged = false;
     PriorityBlockingQueue<Path> newPaths =

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 1ae5cb9..d74211e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -48,13 +48,10 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   }
 
   @Override
-  protected void postShipEdits(WALEntryBatch entryBatch) {
-    if (entryBatch.getWalEntries().isEmpty()) {
-      LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-          + source.getQueueId());
-      source.getSourceMetrics().incrCompletedRecoveryQueue();
-      setWorkerState(WorkerState.FINISHED);
-    }
+  protected void noMoreData() {
+    LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId());
+    source.getSourceMetrics().incrCompletedRecoveryQueue();
+    setWorkerState(WorkerState.FINISHED);
   }
 
   @Override
@@ -63,7 +60,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   }
 
   @Override
-  public long getStartPosition() {
+  long getStartPosition() {
     long startPosition = getRecoveredQueueStartPos();
     int numRetries = 0;
     while (numRetries <= maxRetriesMultiplier) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3480919..236c575 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -315,7 +315,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return new ReplicationSourceShipper(conf, walGroupId, queue, this);
   }
 
-  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
+  private ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
     return replicationPeer.getPeerConfig().isSerial()
       ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 06fe977..23e1115 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -82,25 +83,28 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * operations.</li>
  * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
  * {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}. {@link #walsById}
- * is a ConcurrentHashMap and there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So
- * there is no race between {@link #addPeer(String)} and {@link #removePeer(String)}.
- * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
- * So no race with {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the
- * {@link ReplicationSourceInterface} firstly, then remove the wals from {@link #walsById}. So no
- * race with {@link #removePeer(String)}. The only case need synchronized is
- * {@link #cleanOldLogs(SortedSet, String, String)} and {@link #preLogRoll(Path)}.</li>
+ * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
+ * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
+ * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
+ * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
+ * called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
+ * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
+ * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
+ * case need synchronized is {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
+ * {@link #preLogRoll(Path)}.</li>
  * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
- * modify it, {@link #removePeer(String)} , {@link #cleanOldLogs(SortedSet, String, String)} and
+ * modify it, {@link #removePeer(String)} ,
+ * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
  * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(SortedSet, String, String)} is called by {@link ReplicationSourceInterface}.
- * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
- * remove the wals from {@link #walsByIdRecoveredQueues}. And
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()} will add the wals to
- * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So
- * there is no race here. For {@link ReplicationSourceManager.NodeFailoverWorker#run()} and
- * {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. So no need
- * synchronized on {@link #walsByIdRecoveredQueues}.</li>
+ * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
+ * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
+ * {@link ReplicationSourceInterface} firstly, then remove the wals from
+ * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
+ * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
+ * {@link ReplicationSourceInterface}. So there is no race here. For
+ * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
+ * is already synchronized on {@link #oldsources}. So no need synchronized on
+ * {@link #walsByIdRecoveredQueues}.</li>
  * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
  * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
  * to-be-removed peer.</li>
@@ -124,11 +128,11 @@ public class ReplicationSourceManager implements ReplicationListener {
   // All logs we are currently tracking
   // Index structure of the map is: queue_id->logPrefix/logGroup->logs
   // For normal replication source, the peer id is same with the queue id
-  private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsById;
+  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
   // Logs for recovered sources we are currently tracking
   // the map is: queue_id->logPrefix/logGroup->logs
   // For recovered source, the queue id's format is peer_id-servername-*
-  private final ConcurrentMap<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
+  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
 
   private final Configuration conf;
   private final FileSystem fs;
@@ -335,14 +339,14 @@ public class ReplicationSourceManager implements ReplicationListener {
     // synchronized on latestPaths to avoid missing the new log
     synchronized (this.latestPaths) {
       this.sources.put(peerId, src);
-      Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
       this.walsById.put(peerId, walsByGroup);
       // Add the latest wal to that source's queue
       if (this.latestPaths.size() > 0) {
         for (Path logPath : latestPaths) {
           String name = logPath.getName();
           String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
-          SortedSet<String> logs = new TreeSet<>();
+          NavigableSet<String> logs = new TreeSet<>();
           logs.add(name);
           walsByGroup.put(walPrefix, logs);
           // Abort RS and throw exception to make add peer failed
@@ -474,50 +478,51 @@ public class ReplicationSourceManager implements ReplicationListener {
   /**
    * This method will log the current position to storage. And also clean old logs from the
    * replication queue.
-   * @param log Path to the log currently being replicated
    * @param queueId id of the replication queue
-   * @param position current location in the log
    * @param queueRecovered indicates if this queue comes from another region server
+   * @param entryBatch the wal entry batch we just shipped
    */
-  public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
-      Map<String, Long> lastSeqIds, boolean queueRecovered) {
-    String fileName = log.getName();
+  public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
+      WALEntryBatch entryBatch) {
+    String fileName = entryBatch.getLastWalPath().getName();
     abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
-      position, lastSeqIds));
-    cleanOldLogs(fileName, queueId, queueRecovered);
+      entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
   }
 
   /**
    * Cleans a log file and all older logs from replication queue. Called when we are sure that a log
    * file is closed and has no more entries.
    * @param log Path to the log
+   * @param inclusive whether we should also remove the given log file
    * @param queueId id of the replication queue
    * @param queueRecovered Whether this is a recovered queue
    */
   @VisibleForTesting
-  void cleanOldLogs(String log, String queueId, boolean queueRecovered) {
+  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
     if (queueRecovered) {
-      SortedSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
-      if (wals != null && !wals.first().equals(log)) {
-        cleanOldLogs(wals, log, queueId);
+      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
+      if (wals != null) {
+        cleanOldLogs(wals, log, inclusive, queueId);
       }
     } else {
       // synchronized on walsById to avoid race with preLogRoll
       synchronized (this.walsById) {
-        SortedSet<String> wals = walsById.get(queueId).get(logPrefix);
+        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
         if (wals != null && !wals.first().equals(log)) {
-          cleanOldLogs(wals, log, queueId);
+          cleanOldLogs(wals, log, inclusive, queueId);
         }
       }
     }
   }
 
-  private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
-    SortedSet<String> walSet = wals.headSet(key);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
+    NavigableSet<String> walSet = wals.headSet(key, inclusive);
+    if (walSet.isEmpty()) {
+      return;
     }
+    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
     for (String wal : walSet) {
       abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
     }
@@ -542,11 +547,12 @@ public class ReplicationSourceManager implements ReplicationListener {
       // synchronized on walsById to avoid race with cleanOldLogs
       synchronized (this.walsById) {
         // Update walsById map
-        for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
+          .entrySet()) {
           String peerId = entry.getKey();
-          Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
           boolean existingPrefix = false;
-          for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
             SortedSet<String> wals = walsEntry.getValue();
             if (this.sources.isEmpty()) {
               // If there's no slaves, don't need to keep the old wals since
@@ -560,8 +566,8 @@ public class ReplicationSourceManager implements ReplicationListener {
           }
           if (!existingPrefix) {
             // The new log belongs to a new group, add it into this peer
-            LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
-            SortedSet<String> wals = new TreeSet<>();
+            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
+            NavigableSet<String> wals = new TreeSet<>();
             wals.add(logName);
             walsByPrefix.put(logPrefix, wals);
           }
@@ -700,11 +706,11 @@ public class ReplicationSourceManager implements ReplicationListener {
             continue;
           }
           // track sources in walsByIdRecoveredQueues
-          Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
+          Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
           walsByIdRecoveredQueues.put(queueId, walsByGroup);
           for (String wal : walsSet) {
             String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-            SortedSet<String> wals = walsByGroup.get(walPrefix);
+            NavigableSet<String> wals = walsByGroup.get(walPrefix);
             if (wals == null) {
               wals = new TreeSet<>();
               walsByGroup.put(walPrefix, wals);
@@ -749,7 +755,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return a sorted set of wal names
    */
   @VisibleForTesting
-  Map<String, Map<String, SortedSet<String>>> getWALs() {
+  Map<String, Map<String, NavigableSet<String>>> getWALs() {
     return Collections.unmodifiableMap(walsById);
   }
 
@@ -758,7 +764,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return a sorted set of wal names
    */
   @VisibleForTesting
-  Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
+  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index aa5251e..2097d00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -52,17 +51,18 @@ public class ReplicationSourceShipper extends Thread {
     FINISHED,  // The worker is done processing a recovered queue
   }
 
-  protected final Configuration conf;
+  private final Configuration conf;
   protected final String walGroupId;
   protected final PriorityBlockingQueue<Path> queue;
-  protected final ReplicationSourceInterface source;
+  private final ReplicationSourceInterface source;
 
   // Last position in the log that we sent to ZooKeeper
-  protected long lastLoggedPosition = -1;
+  // It will be accessed by the stats thread so make it volatile
+  private volatile long currentPosition = -1;
   // Path of the current log
-  protected volatile Path currentPath;
+  private Path currentPath;
   // Current state of the worker thread
-  private WorkerState state;
+  private volatile WorkerState state;
   protected ReplicationSourceWALReader entryReader;
 
   // How long should we sleep for each retry
@@ -97,8 +97,12 @@ public class ReplicationSourceShipper extends Thread {
       }
       try {
         WALEntryBatch entryBatch = entryReader.take();
-        shipEdits(entryBatch);
-        postShipEdits(entryBatch);
+        // the NO_MORE_DATA instance has no path so do not call shipEdits
+        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
+          noMoreData();
+        } else {
+          shipEdits(entryBatch);
+        }
       } catch (InterruptedException e) {
         LOG.trace("Interrupted while waiting for next replication entry batch", e);
         Thread.currentThread().interrupt();
@@ -113,7 +117,7 @@ public class ReplicationSourceShipper extends Thread {
   }
 
   // To be implemented by recovered shipper
-  protected void postShipEdits(WALEntryBatch entryBatch) {
+  protected void noMoreData() {
   }
 
   // To be implemented by recovered shipper
@@ -123,14 +127,11 @@ public class ReplicationSourceShipper extends Thread {
   /**
    * Do the shipping logic
    */
-  protected final void shipEdits(WALEntryBatch entryBatch) {
+  private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
-    long lastReadPosition = entryBatch.getLastWalPosition();
-    currentPath = entryBatch.getLastWalPath();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      if (lastLoggedPosition != lastReadPosition) {
-        updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
+      if (updateLogPosition(entryBatch)) {
         // if there was nothing to ship and it's not an error
         // set "ageOfLastShippedOp" to <now> to indicate that we're current
         source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
@@ -168,16 +169,12 @@ public class ReplicationSourceShipper extends Thread {
         } else {
           sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
         }
-
-        if (this.lastLoggedPosition != lastReadPosition) {
-          // Clean up hfile references
-          int size = entries.size();
-          for (int i = 0; i < size; i++) {
-            cleanUpHFileRefs(entries.get(i).getEdit());
-          }
-          // Log and clean up WAL logs
-          updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
+        // Clean up hfile references
+        for (Entry entry : entries) {
+          cleanUpHFileRefs(entry.getEdit());
         }
+        // Log and clean up WAL logs
+        updateLogPosition(entryBatch);
 
         source.postShipEdits(entries, currentSize);
         // FIXME check relationship between wal group and overall
@@ -224,10 +221,29 @@ public class ReplicationSourceShipper extends Thread {
     }
   }
 
-  private void updateLogPosition(long lastReadPosition, Map<String, Long> lastSeqIds) {
-    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
-      lastReadPosition, lastSeqIds, source.isRecovered());
-    lastLoggedPosition = lastReadPosition;
+  private boolean updateLogPosition(WALEntryBatch batch) {
+    boolean updated = false;
+    // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
+    // record on zk, so let's call it. The last wal position may be zero if end of file is true and
+    // there is no entry in the batch. It is OK because the queue storage will ignore the zero
+    // position and the file will be removed soon in cleanOldLogs.
+    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
+      batch.getLastWalPosition() != currentPosition) {
+      source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
+        source.isRecovered(), batch);
+      updated = true;
+    }
+    // if end of file is true, then we can just skip to the next file in queue.
+    // the only exception is for recovered queue, if we reach the end of the queue, then there will
+    // be no more files so here the currentPath may be null.
+    if (batch.isEndOfFile()) {
+      currentPath = entryReader.getCurrentPath();
+      currentPosition = 0L;
+    } else {
+      currentPath = batch.getLastWalPath();
+      currentPosition = batch.getLastWalPosition();
+    }
+    return updated;
   }
 
   public void startup(UncaughtExceptionHandler handler) {
@@ -236,39 +252,31 @@ public class ReplicationSourceShipper extends Thread {
       name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
   }
 
-  public PriorityBlockingQueue<Path> getLogQueue() {
-    return this.queue;
-  }
-
-  public Path getCurrentPath() {
-    return this.entryReader.getCurrentPath();
+  Path getCurrentPath() {
+    return entryReader.getCurrentPath();
   }
 
-  public long getCurrentPosition() {
-    return this.lastLoggedPosition;
+  long getCurrentPosition() {
+    return currentPosition;
   }
 
-  public void setWALReader(ReplicationSourceWALReader entryReader) {
+  void setWALReader(ReplicationSourceWALReader entryReader) {
     this.entryReader = entryReader;
   }
 
-  public long getStartPosition() {
+  long getStartPosition() {
     return 0;
   }
 
-  protected final boolean isActive() {
+  private boolean isActive() {
     return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
   }
 
-  public void setWorkerState(WorkerState state) {
+  protected final void setWorkerState(WorkerState state) {
     this.state = state;
   }
 
-  public WorkerState getWorkerState() {
-    return state;
-  }
-
-  public void stopWorker() {
+  void stopWorker() {
     setWorkerState(WorkerState.STOPPED);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index b125133..2154856 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -59,7 +59,7 @@ class ReplicationSourceWALReader extends Thread {
   private final WALEntryFilter filter;
   private final ReplicationSource source;
 
-  protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
+  private final BlockingQueue<WALEntryBatch> entryBatchQueue;
   // max (heap) size of each batch - multiply by number of batches in queue to get total
   private final long replicationBatchSizeCapacity;
   // max count of each batch - multiply by number of batches in queue to get total
@@ -130,6 +130,7 @@ class ReplicationSourceWALReader extends Thread {
             continue;
           }
           WALEntryBatch batch = readWALEntries(entryStream);
+          currentPosition = entryStream.getPosition();
           if (batch != null) {
             // need to propagate the batch even it has no entries since it may carry the last
             // sequence id information for serial replication.
@@ -138,9 +139,8 @@ class ReplicationSourceWALReader extends Thread {
             sleepMultiplier = 1;
           } else { // got no entries and didn't advance position in WAL
             handleEmptyWALEntryBatch(entryStream.getCurrentPath());
+            entryStream.reset(); // reuse stream
           }
-          currentPosition = entryStream.getPosition();
-          entryStream.reset(); // reuse stream
         }
       } catch (IOException e) { // stream related
         if (sleepMultiplier < maxRetriesMultiplier) {
@@ -173,13 +173,31 @@ class ReplicationSourceWALReader extends Thread {
       batch.getNbEntries() >= replicationBatchCountCapacity;
   }
 
+  protected static final boolean switched(WALEntryStream entryStream, Path path) {
+    return !path.equals(entryStream.getCurrentPath());
+  }
+
   protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
+    Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
-      return null;
+      // check whether we have switched a file
+      if (currentPath != null && switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      } else {
+        return null;
+      }
+    }
+    if (currentPath != null) {
+      if (switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      }
+    } else {
+      // when reading from the entry stream for the first time we will enter here
+      currentPath = entryStream.getCurrentPath();
     }
     WALEntryBatch batch = createBatch(entryStream);
-    do {
+    for (;;) {
       Entry entry = entryStream.next();
       batch.setLastWalPosition(entryStream.getPosition());
       entry = filterEntry(entry);
@@ -188,13 +206,29 @@ class ReplicationSourceWALReader extends Thread {
           break;
         }
       }
-    } while (entryStream.hasNext());
+      boolean hasNext = entryStream.hasNext();
+      // always return if we have switched to a new file
+      if (switched(entryStream, currentPath)) {
+        batch.setEndOfFile(true);
+        break;
+      }
+      if (!hasNext) {
+        break;
+      }
+    }
     return batch;
   }
 
-  protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+  private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
     LOG.trace("Didn't read any new entries from WAL");
-    Thread.sleep(sleepForRetries);
+    if (source.isRecovered()) {
+      // we're done with queue recovery, shut ourself down
+      setReaderRunning(false);
+      // shuts down shipper thread immediately
+      entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
+    } else {
+      Thread.sleep(sleepForRetries);
+    }
   }
 
   // if we get an EOF due to a zero-length log, and there are other logs in queue

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 5e9a9f6..9edcc8a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -53,12 +53,26 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
   @Override
   protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
+    Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
-      return null;
+      // check whether we have switched a file
+      if (currentPath != null && switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      } else {
+        return null;
+      }
+    }
+    if (currentPath != null) {
+      if (switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      }
+    } else {
+      // when reading from the entry stream for the first time we will enter here
+      currentPath = entryStream.getCurrentPath();
     }
     long positionBefore = entryStream.getPosition();
     WALEntryBatch batch = createBatch(entryStream);
-    do {
+    for (;;) {
       Entry entry = entryStream.peek();
       boolean doFiltering = true;
       if (firstCellInEntryBeforeFiltering == null) {
@@ -99,7 +113,16 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
         // actually remove the entry.
         removeEntryFromStream(entryStream, batch);
       }
-    } while (entryStream.hasNext());
+      boolean hasNext = entryStream.hasNext();
+      // always return if we have switched to a new file.
+      if (switched(entryStream, currentPath)) {
+        batch.setEndOfFile(true);
+        break;
+      }
+      if (!hasNext) {
+        break;
+      }
+    }
     return batch;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 31c3ac7..960d473 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -30,6 +30,10 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 class WALEntryBatch {
+
+  // used by recovered replication queue to indicate that all the entries have been read.
+  public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
+
   private List<Entry> walEntries;
   // last WAL that was read
   private Path lastWalPath;
@@ -43,6 +47,8 @@ class WALEntryBatch {
   private long heapSize = 0;
   // save the last sequenceid for each region if the table has serial-replication scope
   private Map<String, Long> lastSeqIds = new HashMap<>();
+  // indicate that this is the end of the current file
+  private boolean endOfFile;
 
   /**
    * @param lastWalPath Path of the WAL the last entry in this batch was read from
@@ -52,6 +58,14 @@ class WALEntryBatch {
     this.lastWalPath = lastWalPath;
   }
 
+
+  static WALEntryBatch endOfFile(Path lastWalPath) {
+    WALEntryBatch batch = new WALEntryBatch(0, lastWalPath);
+    batch.setLastWalPosition(-1L);
+    batch.setEndOfFile(true);
+    return batch;
+  }
+
   public void addEntry(Entry entry) {
     walEntries.add(entry);
   }
@@ -120,6 +134,14 @@ class WALEntryBatch {
     return lastSeqIds;
   }
 
+  public boolean isEndOfFile() {
+    return endOfFile;
+  }
+
+  public void setEndOfFile(boolean endOfFile) {
+    this.endOfFile = endOfFile;
+  }
+
   public void incrementNbRowKeys(int increment) {
     nbRowKeys += increment;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index c639a48..b2c199e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -155,7 +155,6 @@ class WALEntryStream implements Closeable {
   /**
    * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
    * false)
-   * @throws IOException
    */
   public void reset() throws IOException {
     if (reader != null && currentPath != null) {
@@ -304,6 +303,9 @@ class WALEntryStream implements Closeable {
       if (reader != null) {
         return true;
       }
+    } else {
+      // no more files in queue, this could only happen for recovered queue.
+      setCurrentPath(null);
     }
     return false;
   }
@@ -394,6 +396,7 @@ class WALEntryStream implements Closeable {
 
   private void resetReader() throws IOException {
     try {
+      currentEntry = null;
       reader.reset();
       seek();
     } catch (FileNotFoundException fnfe) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 6d75fec..eb46cd7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -308,25 +309,25 @@ public abstract class TestReplicationSourceManager {
     for (int i = 0; i < 3; i++) {
       wal.append(hri,
         new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
-          edit, true);
+        edit, true);
     }
     wal.sync();
 
     int logNumber = 0;
-    for (Map.Entry<String, SortedSet<String>> entry : manager.getWALs().get(slaveId).entrySet()) {
+    for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId)
+      .entrySet()) {
       logNumber += entry.getValue().size();
     }
     assertEquals(6, logNumber);
 
     wal.rollWriter();
 
-    manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
-        "1", 0, null, false);
+    manager.logPositionAndCleanOldLogs("1", false,
+      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
 
     wal.append(hri,
-        new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
-        edit,
-        true);
+      new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
+      edit, true);
     wal.sync();
 
     assertEquals(1, manager.getWALs().size());
@@ -396,7 +397,7 @@ public abstract class TestReplicationSourceManager {
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
-    manager.cleanOldLogs(file2, id, true);
+    manager.cleanOldLogs(file2, false, id, true);
     // log1 should be deleted
     assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/16a4dd6b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index eb7d5a0..2670756 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@@ -75,7 +76,7 @@ public class TestWALEntryStream {
       HBaseClassTestRule.forClass(TestWALEntryStream.class);
 
   private static HBaseTestingUtility TEST_UTIL;
-  private static Configuration conf;
+  private static Configuration CONF;
   private static FileSystem fs;
   private static MiniDFSCluster cluster;
   private static final TableName tableName = TableName.valueOf("tablename");
@@ -102,7 +103,7 @@ public class TestWALEntryStream {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
-    conf = TEST_UTIL.getConfiguration();
+    CONF = TEST_UTIL.getConfiguration();
     TEST_UTIL.startMiniDFSCluster(3);
 
     cluster = TEST_UTIL.getDFSCluster();
@@ -118,7 +119,7 @@ public class TestWALEntryStream {
   public void setUp() throws Exception {
     walQueue = new PriorityBlockingQueue<>();
     pathWatcher = new PathWatcher();
-    final WALFactory wals = new WALFactory(conf, tn.getMethodName());
+    final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
     wals.getWALProvider().addWALActionsListener(pathWatcher);
     log = wals.getWAL(info);
   }
@@ -144,13 +145,13 @@ public class TestWALEntryStream {
           mvcc.advanceTo(1);
 
           for (int i = 0; i < nbRows; i++) {
-            appendToLogPlus(walEditKVs);
+            appendToLogAndSync(walEditKVs);
           }
 
           log.rollWriter();
 
           try (WALEntryStream entryStream =
-              new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+              new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
             int i = 0;
             while (entryStream.hasNext()) {
               assertNotNull(entryStream.next());
@@ -174,10 +175,10 @@ public class TestWALEntryStream {
    */
   @Test
   public void testAppendsWithRolls() throws Exception {
-    appendToLog();
+    appendToLogAndSync();
     long oldPos;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
       // There's one edit in the log, read it. Reading past it needs to throw exception
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.peek();
@@ -189,9 +190,9 @@ public class TestWALEntryStream {
       oldPos = entryStream.getPosition();
     }
 
-    appendToLog();
+    appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
         log, null, new MetricsSource("1"))) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
@@ -201,11 +202,11 @@ public class TestWALEntryStream {
     }
 
     // We rolled but we still should see the end of the first log and get that item
-    appendToLog();
+    appendToLogAndSync();
     log.rollWriter();
-    appendToLog();
+    appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
         log, null, new MetricsSource("1"))) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -231,7 +232,7 @@ public class TestWALEntryStream {
     appendToLog("1");
     appendToLog("2");// 2
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
       assertEquals("1", getRow(entryStream.next()));
 
       appendToLog("3"); // 3 - comes in after reader opened
@@ -256,7 +257,7 @@ public class TestWALEntryStream {
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
       entryStream.next(); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
@@ -279,7 +280,7 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
       entryStream.next(); // we've hit the end of the stream at this point
       appendToLog("2");
       appendToLog("3");
@@ -287,7 +288,7 @@ public class TestWALEntryStream {
     }
     // next stream should picks up where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
       assertEquals("2", getRow(entryStream.next()));
       assertEquals("3", getRow(entryStream.next()));
       assertFalse(entryStream.hasNext()); // done
@@ -302,16 +303,16 @@ public class TestWALEntryStream {
   @Test
   public void testPosition() throws Exception {
     long lastPosition = 0;
-    appendEntriesToLog(3);
+    appendEntriesToLogAndSync(3);
     // read only one element
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition,
         log, null, new MetricsSource("1"))) {
       entryStream.next();
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
@@ -322,38 +323,44 @@ public class TestWALEntryStream {
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
       assertFalse(entryStream.hasNext());
     }
   }
 
+  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    Server mockServer = Mockito.mock(Server.class);
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.getSourceManager()).thenReturn(mockSourceManager);
+    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
+    when(source.getWALFileLengthProvider()).thenReturn(log);
+    when(source.getServer()).thenReturn(mockServer);
+    when(source.isRecovered()).thenReturn(recovered);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
+    reader.start();
+    return reader;
+  }
+
   @Test
-  public void testReplicationSourceWALReaderThread() throws Exception {
-    appendEntriesToLog(3);
+  public void testReplicationSourceWALReader() throws Exception {
+    appendEntriesToLogAndSync(3);
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
       position = entryStream.getPosition();
     }
 
-    // start up a batcher
-    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
-    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    Server mockServer= Mockito.mock(Server.class);
-    ReplicationSource source = Mockito.mock(ReplicationSource.class);
-    when(source.getSourceManager()).thenReturn(mockSourceManager);
-    when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
-    when(source.getWALFileLengthProvider()).thenReturn(log);
-    when(source.getServer()).thenReturn(mockServer);
-    ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
-        walQueue, 0, getDummyFilter(), source);
+    // start up a reader
     Path walPath = walQueue.peek();
-    batcher.start();
-    WALEntryBatch entryBatch = batcher.take();
+    ReplicationSourceWALReader reader = createReader(false, CONF);
+    WALEntryBatch entryBatch = reader.take();
 
     // should've batched up our entries
     assertNotNull(entryBatch);
@@ -363,11 +370,96 @@ public class TestWALEntryStream {
     assertEquals(3, entryBatch.getNbRowKeys());
 
     appendToLog("foo");
-    entryBatch = batcher.take();
+    entryBatch = reader.take();
     assertEquals(1, entryBatch.getNbEntries());
     assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
   }
 
+  @Test
+  public void testReplicationSourceWALReaderRecovered() throws Exception {
+    appendEntriesToLogAndSync(10);
+    Path walPath = walQueue.peek();
+    log.rollWriter();
+    appendEntriesToLogAndSync(5);
+    log.shutdown();
+
+    Configuration conf = new Configuration(CONF);
+    conf.setInt("replication.source.nb.capacity", 10);
+
+    ReplicationSourceWALReader reader = createReader(true, conf);
+
+    WALEntryBatch batch = reader.take();
+    assertEquals(walPath, batch.getLastWalPath());
+    assertEquals(10, batch.getNbEntries());
+    assertFalse(batch.isEndOfFile());
+
+    batch = reader.take();
+    assertEquals(walPath, batch.getLastWalPath());
+    assertEquals(0, batch.getNbEntries());
+    assertTrue(batch.isEndOfFile());
+
+    walPath = walQueue.peek();
+    batch = reader.take();
+    assertEquals(walPath, batch.getLastWalPath());
+    assertEquals(5, batch.getNbEntries());
+    // Actually this should be true but we haven't handled this yet since for a normal queue the
+    // last one is always open... Not a big deal for now.
+    assertFalse(batch.isEndOfFile());
+
+    assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
+  }
+
+  // Testcase for HBASE-20206
+  @Test
+  public void testReplicationSourceWALReaderWrongPosition() throws Exception {
+    appendEntriesToLogAndSync(1);
+    Path walPath = walQueue.peek();
+    log.rollWriter();
+    appendEntriesToLogAndSync(20);
+    TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return fs.getFileStatus(walPath).getLen() > 0;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return walPath + " has not been closed yet";
+      }
+
+    });
+    long walLength = fs.getFileStatus(walPath).getLen();
+
+    ReplicationSourceWALReader reader = createReader(false, CONF);
+
+    WALEntryBatch entryBatch = reader.take();
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " +
+      walLength, entryBatch.getLastWalPosition() <= walLength);
+    assertEquals(1, entryBatch.getNbEntries());
+    assertTrue(entryBatch.isEndOfFile());
+
+    Path walPath2 = walQueue.peek();
+    entryBatch = reader.take();
+    assertEquals(walPath2, entryBatch.getLastWalPath());
+    assertEquals(20, entryBatch.getNbEntries());
+    assertFalse(entryBatch.isEndOfFile());
+
+    log.rollWriter();
+    appendEntriesToLogAndSync(10);
+    entryBatch = reader.take();
+    assertEquals(walPath2, entryBatch.getLastWalPath());
+    assertEquals(0, entryBatch.getNbEntries());
+    assertTrue(entryBatch.isEndOfFile());
+
+    Path walPath3 = walQueue.peek();
+    entryBatch = reader.take();
+    assertEquals(walPath3, entryBatch.getLastWalPath());
+    assertEquals(10, entryBatch.getNbEntries());
+    assertFalse(entryBatch.isEndOfFile());
+  }
+
   private String getRow(WAL.Entry entry) {
     Cell cell = entry.getEdit().getCells().get(0);
     return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
@@ -380,22 +472,28 @@ public class TestWALEntryStream {
     log.sync(txid);
   }
 
-  private void appendEntriesToLog(int count) throws IOException {
+  private void appendEntriesToLogAndSync(int count) throws IOException {
+    long txid = -1L;
     for (int i = 0; i < count; i++) {
-      appendToLog();
+      txid = appendToLog(1);
     }
+    log.sync(txid);
   }
 
-  private void appendToLog() throws IOException {
-    appendToLogPlus(1);
+  private void appendToLogAndSync() throws IOException {
+    appendToLogAndSync(1);
   }
 
-  private void appendToLogPlus(int count) throws IOException {
-    final long txid = log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
-            System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
+  private void appendToLogAndSync(int count) throws IOException {
+    long txid = appendToLog(count);
     log.sync(txid);
   }
 
+  private long appendToLog(int count) throws IOException {
+    return log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+      System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
+  }
+
   private WALEdit getWALEdits(int count) {
     WALEdit edit = new WALEdit();
     for (int i = 0; i < count; i++) {
@@ -439,7 +537,7 @@ public class TestWALEntryStream {
     appendToLog("2");
     long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
     AtomicLong fileLength = new AtomicLong(size - 1);
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
+    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
         p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
       assertTrue(entryStream.hasNext());
       assertNotNull(entryStream.next());