Posted to commits@hbase.apache.org by zg...@apache.org on 2017/06/19 01:30:08 UTC

[2/2] hbase git commit: HBASE-18170 Refactor ReplicationSourceWALReaderThread

HBASE-18170 Refactor ReplicationSourceWALReaderThread


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6e71f15
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6e71f15
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6e71f15

Branch: refs/heads/master
Commit: c6e71f159cbd3f993b7fe361a5e5d50352efb306
Parents: d49208b
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sat Jun 17 00:45:52 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Mon Jun 19 09:26:45 2017 +0800

----------------------------------------------------------------------
 .../RecoveredReplicationSource.java             |  23 +-
 .../RecoveredReplicationSourceShipper.java      | 151 ++++++
 ...RecoveredReplicationSourceShipperThread.java | 151 ------
 .../RecoveredReplicationSourceWALReader.java    |  55 ++
 .../regionserver/ReplicationSource.java         |  67 +--
 .../regionserver/ReplicationSourceShipper.java  | 359 +++++++++++++
 .../ReplicationSourceShipperThread.java         | 359 -------------
 .../ReplicationSourceWALReader.java             | 502 +++++++++++++++++++
 .../ReplicationSourceWALReaderThread.java       | 502 -------------------
 .../regionserver/TestWALEntryStream.java        |   9 +-
 10 files changed, 1124 insertions(+), 1054 deletions(-)
----------------------------------------------------------------------
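
The renamed classes keep the same division of labor as before: a WAL reader thread parses
entries from the log queue into batches and hands them to a shipper thread over a blocking
queue (entryBatchQueue / WALEntryBatch in the diff below). A minimal, self-contained sketch
of that handoff, using simplified stand-in types rather than the real HBase classes:

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ReaderShipperHandoffSketch {
      // Stand-in for WALEntryBatch: just a list of edits.
      static final class Batch {
        final List<String> entries;
        Batch(List<String> entries) { this.entries = entries; }
      }

      public static void main(String[] args) throws InterruptedException {
        // Bounded queue between reader and shipper, like entryBatchQueue.
        BlockingQueue<Batch> entryBatchQueue = new LinkedBlockingQueue<>(1);

        // Reader side: parse WAL entries into a batch and enqueue it.
        Thread reader = new Thread(() -> {
          try {
            entryBatchQueue.put(new Batch(List.of("edit-1", "edit-2")));
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }, "wal-reader");

        // Shipper side: block on take(), then ship whatever the reader produced.
        Thread shipper = new Thread(() -> {
          try {
            Batch batch = entryBatchQueue.take();
            System.out.println("shipping " + batch.entries.size() + " entries");
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }, "shipper");

        reader.start();
        shipper.start();
        reader.join();
        shipper.join();
      }
    }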


http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index d3bcff1..158330e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -61,22 +61,33 @@ public class RecoveredReplicationSource extends ReplicationSource {
   }
 
   @Override
-  protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
-    final RecoveredReplicationSourceShipperThread worker =
-        new RecoveredReplicationSourceShipperThread(conf, walGroupId, queue, this,
+  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+    final RecoveredReplicationSourceShipper worker =
+        new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
             this.replicationQueues);
-    ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
     if (extant != null) {
       LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
     } else {
       LOG.debug("Starting up worker for wal group " + walGroupId);
       worker.startup(getUncaughtExceptionHandler());
       worker.setWALReader(
-        startNewWALReaderThread(worker.getName(), walGroupId, queue, worker.getStartPosition()));
+        startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
       workerThreads.put(walGroupId, worker);
     }
   }
 
+  @Override
+  protected ReplicationSourceWALReader startNewWALReader(String threadName,
+      String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
+    ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
+        conf, queue, startPosition, walEntryFilter, this);
+    Threads.setDaemonThreadRunning(walReader, threadName
+        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+      getUncaughtExceptionHandler());
+    return walReader;
+  }
+
   public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
     boolean hasPathChanged = false;
     PriorityBlockingQueue<Path> newPaths =
@@ -161,7 +172,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
     synchronized (workerThreads) {
       Threads.sleep(100);// wait a short while for other worker thread to fully exit
       boolean allTasksDone = true;
-      for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+      for (ReplicationSourceShipper worker : workerThreads.values()) {
         if (!worker.isFinished()) {
           allTasksDone = false;
           break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
new file mode 100644
index 0000000..a737910
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ *  Used by a {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
+
+  private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipper.class);
+  protected final RecoveredReplicationSource source;
+  private final ReplicationQueues replicationQueues;
+
+  public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
+      PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
+      ReplicationQueues replicationQueues) {
+    super(conf, walGroupId, queue, source);
+    this.source = source;
+    this.replicationQueues = replicationQueues;
+  }
+
+  @Override
+  public void run() {
+    setWorkerState(WorkerState.RUNNING);
+    // Loop until we close down
+    while (isActive()) {
+      int sleepMultiplier = 1;
+      // Sleep until replication is enabled again
+      if (!source.isPeerEnabled()) {
+        if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+
+      while (entryReader == null) {
+        if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
+          sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+
+      try {
+        WALEntryBatch entryBatch = entryReader.take();
+        shipEdits(entryBatch);
+        if (entryBatch.getWalEntries().isEmpty()
+            && entryBatch.getLastSeqIds().isEmpty()) {
+          LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+              + source.getPeerClusterZnode());
+          source.getSourceMetrics().incrCompletedRecoveryQueue();
+          setWorkerState(WorkerState.FINISHED);
+          continue;
+        }
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while waiting for next replication entry batch", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    source.tryFinish();
+    // If the worker exits run loop without finishing its task, mark it as stopped.
+    if (!isFinished()) {
+      setWorkerState(WorkerState.STOPPED);
+    }
+  }
+
+  @Override
+  public long getStartPosition() {
+    long startPosition = getRecoveredQueueStartPos();
+    int numRetries = 0;
+    while (numRetries <= maxRetriesMultiplier) {
+      try {
+        source.locateRecoveredPaths(queue);
+        break;
+      } catch (IOException e) {
+        LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
+        numRetries++;
+      }
+    }
+    return startPosition;
+  }
+
+  // If this is a recovered queue, the queue is already full and the first log
+  // normally has a position (unless the RS failed between 2 logs)
+  private long getRecoveredQueueStartPos() {
+    long startPosition = 0;
+    String peerClusterZnode = source.getPeerClusterZnode();
+    try {
+      startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
+        this.queue.peek().getName());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+            + startPosition);
+      }
+    } catch (ReplicationException e) {
+      terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+    }
+    return startPosition;
+  }
+
+  @Override
+  protected void updateLogPosition(long lastReadPosition) {
+    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
+      lastReadPosition, true, false);
+    lastLoggedPosition = lastReadPosition;
+  }
+
+  private void terminate(String reason, Exception cause) {
+    if (cause == null) {
+      LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
+
+    } else {
+      LOG.error("Closing worker for wal group " + this.walGroupId
+          + " because an error occurred: " + reason, cause);
+    }
+    entryReader.interrupt();
+    Threads.shutdown(entryReader, sleepForRetries);
+    this.interrupt();
+    Threads.shutdown(this, sleepForRetries);
+    LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
deleted file mode 100644
index 65aeb2f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.concurrent.PriorityBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
-import org.apache.hadoop.hbase.util.Threads;
-
-/**
- *  Used by a {@link RecoveredReplicationSource}.
- */
-@InterfaceAudience.Private
-public class RecoveredReplicationSourceShipperThread extends ReplicationSourceShipperThread {
-
-  private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipperThread.class);
-  protected final RecoveredReplicationSource source;
-  private final ReplicationQueues replicationQueues;
-
-  public RecoveredReplicationSourceShipperThread(Configuration conf, String walGroupId,
-      PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
-      ReplicationQueues replicationQueues) {
-    super(conf, walGroupId, queue, source);
-    this.source = source;
-    this.replicationQueues = replicationQueues;
-  }
-
-  @Override
-  public void run() {
-    setWorkerState(WorkerState.RUNNING);
-    // Loop until we close down
-    while (isActive()) {
-      int sleepMultiplier = 1;
-      // Sleep until replication is enabled again
-      if (!source.isPeerEnabled()) {
-        if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-
-      while (entryReader == null) {
-        if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
-          sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-
-      try {
-        WALEntryBatch entryBatch = entryReader.take();
-        shipEdits(entryBatch);
-        if (entryBatch.getWalEntries().isEmpty()
-            && entryBatch.getLastSeqIds().isEmpty()) {
-          LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-              + source.getPeerClusterZnode());
-          source.getSourceMetrics().incrCompletedRecoveryQueue();
-          setWorkerState(WorkerState.FINISHED);
-          continue;
-        }
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while waiting for next replication entry batch", e);
-        Thread.currentThread().interrupt();
-      }
-    }
-    source.tryFinish();
-    // If the worker exits run loop without finishing its task, mark it as stopped.
-    if (!isFinished()) {
-      setWorkerState(WorkerState.STOPPED);
-    }
-  }
-
-  @Override
-  public long getStartPosition() {
-    long startPosition = getRecoveredQueueStartPos();
-    int numRetries = 0;
-    while (numRetries <= maxRetriesMultiplier) {
-      try {
-        source.locateRecoveredPaths(queue);
-        break;
-      } catch (IOException e) {
-        LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
-        numRetries++;
-      }
-    }
-    return startPosition;
-  }
-
-  // If this is a recovered queue, the queue is already full and the first log
-  // normally has a position (unless the RS failed between 2 logs)
-  private long getRecoveredQueueStartPos() {
-    long startPosition = 0;
-    String peerClusterZnode = source.getPeerClusterZnode();
-    try {
-      startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
-        this.queue.peek().getName());
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
-            + startPosition);
-      }
-    } catch (ReplicationException e) {
-      terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
-    }
-    return startPosition;
-  }
-
-  @Override
-  protected void updateLogPosition(long lastReadPosition) {
-    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
-      lastReadPosition, true, false);
-    lastLoggedPosition = lastReadPosition;
-  }
-
-  private void terminate(String reason, Exception cause) {
-    if (cause == null) {
-      LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
-
-    } else {
-      LOG.error("Closing worker for wal group " + this.walGroupId
-          + " because an error occurred: " + reason, cause);
-    }
-    entryReader.interrupt();
-    Threads.shutdown(entryReader, sleepForRetries);
-    this.interrupt();
-    Threads.shutdown(this, sleepForRetries);
-    LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
new file mode 100644
index 0000000..6462a2a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+
+/**
+ * Used by a {@link RecoveredReplicationSourceShipper}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
+  private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceWALReader.class);
+
+  public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
+      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+      ReplicationSource source) {
+    super(fs, conf, logQueue, startPosition, filter, source);
+  }
+
+  protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
+      throws InterruptedException {
+    LOG.trace("Didn't read any new entries from WAL");
+    // we're done with queue recovery, shut ourself down
+    setReaderRunning(false);
+    // shuts down shipper thread immediately
+    entryBatchQueue.put(batch != null ? batch
+        : new WALEntryBatch(replicationBatchCountCapacity, currentPath));
+  }
+}
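
handleEmptyWALEntryBatch above turns an empty read into a completion signal for a recovered
queue: the reader marks itself as no longer running but still enqueues a (possibly empty)
batch, so the shipper's blocking take() returns and the worker can mark itself FINISHED.
A small sketch of that signalling pattern, with plain JDK types standing in for
WALEntryBatch and the shipper loop:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class EmptyBatchSignalSketch {
      public static void main(String[] args) throws InterruptedException {
        BlockingQueue<List<String>> entryBatchQueue = new LinkedBlockingQueue<>();
        // Reader side: nothing left in the recovered queue, so enqueue an empty
        // batch instead of blocking forever; this wakes the shipper up.
        entryBatchQueue.put(new ArrayList<>());
        // Shipper side: an empty batch means recovery of this queue is complete.
        List<String> batch = entryBatchQueue.take();
        if (batch.isEmpty()) {
          System.out.println("recovered queue drained, marking worker FINISHED");
        }
      }
    }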

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1dbf07f..3d4353f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -119,12 +119,12 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   // ReplicationEndpoint which will handle the actual replication
   private ReplicationEndpoint replicationEndpoint;
   // A filter (or a chain of filters) for the WAL entries.
-  private WALEntryFilter walEntryFilter;
+  protected WALEntryFilter walEntryFilter;
   // throttler
   private ReplicationThrottler throttler;
   private long defaultBandwidth;
   private long currentBandwidth;
-  protected final ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
+  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
       new ConcurrentHashMap<>();
 
   private AtomicLong totalBufferUsed;
@@ -197,7 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         // new wal group observed after source startup, start a new worker thread to track it
         // notice: it's possible that log enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
-        tryStartNewShipperThread(logPrefix, queue);
+        tryStartNewShipper(logPrefix, queue);
       }
     }
     queue.put(log);
@@ -255,15 +255,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       throw new RuntimeException(ex);
     }
 
-    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
-    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
-      (WALEntryFilter)new SystemTableWALEntryFilter());
-    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
-    if (filterFromEndpoint != null) {
-      filters.add(filterFromEndpoint);
-    }
-    this.walEntryFilter = new ChainWALEntryFilter(filters);
-
     int sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
     while (this.isSourceActive() && this.peerClusterId == null) {
@@ -285,40 +276,50 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+
+    initializeWALEntryFilter();
     // start workers
     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
       String walGroupId = entry.getKey();
       PriorityBlockingQueue<Path> queue = entry.getValue();
-      tryStartNewShipperThread(walGroupId, queue);
+      tryStartNewShipper(walGroupId, queue);
     }
   }
 
-  protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) {
-    final ReplicationSourceShipperThread worker = new ReplicationSourceShipperThread(conf,
+  private void initializeWALEntryFilter() {
+    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
+    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
+      (WALEntryFilter)new SystemTableWALEntryFilter());
+    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
+    if (filterFromEndpoint != null) {
+      filters.add(filterFromEndpoint);
+    }
+    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
+    this.walEntryFilter = new ChainWALEntryFilter(filters);
+  }
+
+  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+    final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
         walGroupId, queue, this);
-    ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
     if (extant != null) {
       LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
     } else {
       LOG.debug("Starting up worker for wal group " + walGroupId);
       worker.startup(getUncaughtExceptionHandler());
-      worker.setWALReader(startNewWALReaderThread(worker.getName(), walGroupId, queue,
+      worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
         worker.getStartPosition()));
       workerThreads.put(walGroupId, worker);
     }
   }
 
-  protected ReplicationSourceWALReaderThread startNewWALReaderThread(String threadName,
-      String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
-    ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
-      new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
-    ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
-    ReplicationSourceWALReaderThread walReader = new ReplicationSourceWALReaderThread(manager,
-        replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, metrics);
-    Threads.setDaemonThreadRunning(walReader, threadName
-        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+  protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
+      PriorityBlockingQueue<Path> queue, long startPosition) {
+    ReplicationSourceWALReader walReader =
+        new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+    return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
+      threadName + ".replicationSource.wal-reader." + walGroupId + "," + peerClusterZnode,
       getUncaughtExceptionHandler());
-    return walReader;
   }
 
   public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
@@ -446,8 +447,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
           + " because an error occurred: " + reason, cause);
     }
     this.sourceRunning = false;
-    Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
-    for (ReplicationSourceShipperThread worker : workers) {
+    Collection<ReplicationSourceShipper> workers = workerThreads.values();
+    for (ReplicationSourceShipper worker : workers) {
       worker.stopWorker();
       worker.entryReader.interrupt();
       worker.interrupt();
@@ -457,7 +458,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       future = this.replicationEndpoint.stop();
     }
     if (join) {
-      for (ReplicationSourceShipperThread worker : workers) {
+      for (ReplicationSourceShipper worker : workers) {
         Threads.shutdown(worker, this.sleepForRetries);
         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
       }
@@ -486,7 +487,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   @Override
   public Path getCurrentPath() {
     // only for testing
-    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+    for (ReplicationSourceShipper worker : workerThreads.values()) {
       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
     }
     return null;
@@ -524,9 +525,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     StringBuilder sb = new StringBuilder();
     sb.append("Total replicated edits: ").append(totalReplicatedEdits)
         .append(", current progress: \n");
-    for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
+    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
       String walGroupId = entry.getKey();
-      ReplicationSourceShipperThread worker = entry.getValue();
+      ReplicationSourceShipper worker = entry.getValue();
       long position = worker.getCurrentPosition();
       Path currentPath = worker.getCurrentPath();
       sb.append("walGroup [").append(walGroupId).append("]: ");

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
new file mode 100644
index 0000000..3e1e50b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -0,0 +1,359 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
+ * ReplicationSourceWALReaderThread
+ */
+@InterfaceAudience.Private
+public class ReplicationSourceShipper extends Thread {
+  private static final Log LOG = LogFactory.getLog(ReplicationSourceShipper.class);
+
+  // Hold the state of a replication worker thread
+  public enum WorkerState {
+    RUNNING,
+    STOPPED,
+    FINISHED,  // The worker is done processing a recovered queue
+  }
+
+  protected final Configuration conf;
+  protected final String walGroupId;
+  protected final PriorityBlockingQueue<Path> queue;
+  protected final ReplicationSourceInterface source;
+
+  // Last position in the log that we sent to ZooKeeper
+  protected long lastLoggedPosition = -1;
+  // Path of the current log
+  protected volatile Path currentPath;
+  // Current state of the worker thread
+  private WorkerState state;
+  protected ReplicationSourceWALReader entryReader;
+
+  // How long should we sleep for each retry
+  protected final long sleepForRetries;
+  // Maximum number of retries before taking bold actions
+  protected final int maxRetriesMultiplier;
+
+  // Use guava cache to set ttl for each key
+  private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
+      .expireAfterAccess(1, TimeUnit.DAYS).build(
+      new CacheLoader<String, Boolean>() {
+        @Override
+        public Boolean load(String key) throws Exception {
+          return false;
+        }
+      }
+  );
+
+  public ReplicationSourceShipper(Configuration conf, String walGroupId,
+      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
+    this.conf = conf;
+    this.walGroupId = walGroupId;
+    this.queue = queue;
+    this.source = source;
+    this.sleepForRetries =
+        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
+    this.maxRetriesMultiplier =
+        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
+  }
+
+  @Override
+  public void run() {
+    setWorkerState(WorkerState.RUNNING);
+    // Loop until we close down
+    while (isActive()) {
+      int sleepMultiplier = 1;
+      // Sleep until replication is enabled again
+      if (!source.isPeerEnabled()) {
+        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
+
+      while (entryReader == null) {
+        if (sleepForRetries("Replication WAL entry reader thread not initialized",
+          sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+
+      try {
+        WALEntryBatch entryBatch = entryReader.take();
+        for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
+          waitingUntilCanPush(entry);
+        }
+        shipEdits(entryBatch);
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while waiting for next replication entry batch", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    // If the worker exits run loop without finishing its task, mark it as stopped.
+    if (state != WorkerState.FINISHED) {
+      setWorkerState(WorkerState.STOPPED);
+    }
+  }
+
+  /**
+   * Do the shipping logic
+   */
+  protected void shipEdits(WALEntryBatch entryBatch) {
+    List<Entry> entries = entryBatch.getWalEntries();
+    long lastReadPosition = entryBatch.getLastWalPosition();
+    currentPath = entryBatch.getLastWalPath();
+    int sleepMultiplier = 0;
+    if (entries.isEmpty()) {
+      if (lastLoggedPosition != lastReadPosition) {
+        // Save positions to meta table before zk.
+        updateSerialRepPositions(entryBatch.getLastSeqIds());
+        updateLogPosition(lastReadPosition);
+        // if there was nothing to ship and it's not an error
+        // set "ageOfLastShippedOp" to <now> to indicate that we're current
+        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+          walGroupId);
+      }
+      return;
+    }
+    int currentSize = (int) entryBatch.getHeapSize();
+    while (isActive()) {
+      try {
+        try {
+          source.tryThrottle(currentSize);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted while sleeping for throttling control");
+          Thread.currentThread().interrupt();
+          // current thread might be interrupted to terminate
+          // directly go back to while() for confirm this
+          continue;
+        }
+
+        // create replicateContext here, so the entries can be GC'd upon return from this call
+        // stack
+        ReplicationEndpoint.ReplicateContext replicateContext =
+            new ReplicationEndpoint.ReplicateContext();
+        replicateContext.setEntries(entries).setSize(currentSize);
+        replicateContext.setWalGroupId(walGroupId);
+
+        long startTimeNs = System.nanoTime();
+        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
+        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
+        long endTimeNs = System.nanoTime();
+
+        if (!replicated) {
+          continue;
+        } else {
+          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
+        }
+
+        if (this.lastLoggedPosition != lastReadPosition) {
+          //Clean up hfile references
+          int size = entries.size();
+          for (int i = 0; i < size; i++) {
+            cleanUpHFileRefs(entries.get(i).getEdit());
+          }
+
+          // Save positions to meta table before zk.
+          updateSerialRepPositions(entryBatch.getLastSeqIds());
+          //Log and clean up WAL logs
+          updateLogPosition(lastReadPosition);
+        }
+
+        source.postShipEdits(entries, currentSize);
+        // FIXME check relationship between wal group and overall
+        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
+          entryBatch.getNbHFiles());
+        source.getSourceMetrics().setAgeOfLastShippedOp(
+          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations()
+              + " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms");
+        }
+        break;
+      } catch (Exception ex) {
+        LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
+            + org.apache.hadoop.util.StringUtils.stringifyException(ex));
+        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+  }
+
+  private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
+    String key = entry.getKey();
+    long seq = entry.getValue();
+    boolean deleteKey = false;
+    if (seq <= 0) {
+      // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
+      deleteKey = true;
+      seq = -seq;
+    }
+
+    if (!canSkipWaitingSet.getUnchecked(key)) {
+      try {
+        source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId());
+      } catch (IOException e) {
+        LOG.error("waitUntilCanBePushed fail", e);
+        throw new RuntimeException("waitUntilCanBePushed fail");
+      } catch (InterruptedException e) {
+        LOG.warn("waitUntilCanBePushed interrupted", e);
+        Thread.currentThread().interrupt();
+      }
+      canSkipWaitingSet.put(key, true);
+    }
+    if (deleteKey) {
+      canSkipWaitingSet.invalidate(key);
+    }
+  }
+
+  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
+    String peerId = source.getPeerId();
+    if (peerId.contains("-")) {
+      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+      // A peerId will not have "-" in its name, see HBASE-11394
+      peerId = peerId.split("-")[0];
+    }
+    List<Cell> cells = edit.getCells();
+    int totalCells = cells.size();
+    for (int i = 0; i < totalCells; i++) {
+      Cell cell = cells.get(i);
+      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+        List<StoreDescriptor> stores = bld.getStoresList();
+        int totalStores = stores.size();
+        for (int j = 0; j < totalStores; j++) {
+          List<String> storeFileList = stores.get(j).getStoreFileList();
+          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
+          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
+        }
+      }
+    }
+  }
+
+  protected void updateLogPosition(long lastReadPosition) {
+    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
+      lastReadPosition, false, false);
+    lastLoggedPosition = lastReadPosition;
+  }
+
+  private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
+    try {
+      MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
+        source.getPeerId(), lastPositionsForSerialScope);
+    } catch (IOException e) {
+      LOG.error("updateReplicationPositions fail", e);
+      throw new RuntimeException("updateReplicationPositions fail");
+    }
+  }
+
+  public void startup(UncaughtExceptionHandler handler) {
+    String name = Thread.currentThread().getName();
+    Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
+        + source.getPeerClusterZnode(), handler);
+  }
+
+  public PriorityBlockingQueue<Path> getLogQueue() {
+    return this.queue;
+  }
+
+  public Path getCurrentPath() {
+    return this.entryReader.getCurrentPath();
+  }
+
+  public long getCurrentPosition() {
+    return this.lastLoggedPosition;
+  }
+
+  public void setWALReader(ReplicationSourceWALReader entryReader) {
+    this.entryReader = entryReader;
+  }
+
+  public long getStartPosition() {
+    return 0;
+  }
+
+  protected boolean isActive() {
+    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
+  }
+
+  public void setWorkerState(WorkerState state) {
+    this.state = state;
+  }
+
+  public WorkerState getWorkerState() {
+    return state;
+  }
+
+  public void stopWorker() {
+    setWorkerState(WorkerState.STOPPED);
+  }
+
+  public boolean isFinished() {
+    return state == WorkerState.FINISHED;
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
+   */
+  public boolean sleepForRetries(String msg, int sleepMultiplier) {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      }
+      Thread.sleep(this.sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+      Thread.currentThread().interrupt();
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
+}
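
sleepForRetries above implements a simple linear backoff: each retry sleeps
sleepForRetries * sleepMultiplier, and callers keep incrementing the multiplier while the
method returns true, i.e. while the multiplier is still below maxRetriesMultiplier
(defaults shown in the constructor: 1000 ms and 300). A quick arithmetic sketch of what
that schedule adds up to, assuming those defaults and a single uninterrupted retry loop:

    public class LinearBackoffSketch {
      public static void main(String[] args) {
        long sleepForRetries = 1000L;    // replication.source.sleepforretries, ms
        int maxRetriesMultiplier = 300;  // replication.source.maxretriesmultiplier
        long totalMs = 0;
        // Mirror the caller pattern: sleep, then bump the multiplier while below the cap.
        for (int sleepMultiplier = 1; sleepMultiplier < maxRetriesMultiplier; sleepMultiplier++) {
          totalMs += sleepForRetries * sleepMultiplier;
        }
        // Once the cap is reached the multiplier stops growing, so every further
        // retry sleeps maxRetriesMultiplier * sleepForRetries (5 minutes here).
        System.out.println("time to ramp from 1 s sleeps up to the 300 s cap: "
            + totalMs / 1000 + " s");
      }
    }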

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
deleted file mode 100644
index 6807da2..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
-/**
- * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
- * ReplicationSourceWALReaderThread
- */
-@InterfaceAudience.Private
-public class ReplicationSourceShipperThread extends Thread {
-  private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
-
-  // Hold the state of a replication worker thread
-  public enum WorkerState {
-    RUNNING,
-    STOPPED,
-    FINISHED,  // The worker is done processing a recovered queue
-  }
-
-  protected final Configuration conf;
-  protected final String walGroupId;
-  protected final PriorityBlockingQueue<Path> queue;
-  protected final ReplicationSourceInterface source;
-
-  // Last position in the log that we sent to ZooKeeper
-  protected long lastLoggedPosition = -1;
-  // Path of the current log
-  protected volatile Path currentPath;
-  // Current state of the worker thread
-  private WorkerState state;
-  protected ReplicationSourceWALReaderThread entryReader;
-
-  // How long should we sleep for each retry
-  protected final long sleepForRetries;
-  // Maximum number of retries before taking bold actions
-  protected final int maxRetriesMultiplier;
-
-  // Use guava cache to set ttl for each key
-  private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
-      .expireAfterAccess(1, TimeUnit.DAYS).build(
-      new CacheLoader<String, Boolean>() {
-        @Override
-        public Boolean load(String key) throws Exception {
-          return false;
-        }
-      }
-  );
-
-  public ReplicationSourceShipperThread(Configuration conf, String walGroupId,
-      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
-    this.conf = conf;
-    this.walGroupId = walGroupId;
-    this.queue = queue;
-    this.source = source;
-    this.sleepForRetries =
-        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
-    this.maxRetriesMultiplier =
-        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
-  }
-
-  @Override
-  public void run() {
-    setWorkerState(WorkerState.RUNNING);
-    // Loop until we close down
-    while (isActive()) {
-      int sleepMultiplier = 1;
-      // Sleep until replication is enabled again
-      if (!source.isPeerEnabled()) {
-        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-
-      while (entryReader == null) {
-        if (sleepForRetries("Replication WAL entry reader thread not initialized",
-          sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-
-      try {
-        WALEntryBatch entryBatch = entryReader.take();
-        for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
-          waitingUntilCanPush(entry);
-        }
-        shipEdits(entryBatch);
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while waiting for next replication entry batch", e);
-        Thread.currentThread().interrupt();
-      }
-    }
-    // If the worker exits run loop without finishing its task, mark it as stopped.
-    if (state != WorkerState.FINISHED) {
-      setWorkerState(WorkerState.STOPPED);
-    }
-  }
-
-  /**
-   * Do the shipping logic
-   */
-  protected void shipEdits(WALEntryBatch entryBatch) {
-    List<Entry> entries = entryBatch.getWalEntries();
-    long lastReadPosition = entryBatch.getLastWalPosition();
-    currentPath = entryBatch.getLastWalPath();
-    int sleepMultiplier = 0;
-    if (entries.isEmpty()) {
-      if (lastLoggedPosition != lastReadPosition) {
-        // Save positions to meta table before zk.
-        updateSerialRepPositions(entryBatch.getLastSeqIds());
-        updateLogPosition(lastReadPosition);
-        // if there was nothing to ship and it's not an error
-        // set "ageOfLastShippedOp" to <now> to indicate that we're current
-        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
-          walGroupId);
-      }
-      return;
-    }
-    int currentSize = (int) entryBatch.getHeapSize();
-    while (isActive()) {
-      try {
-        try {
-          source.tryThrottle(currentSize);
-        } catch (InterruptedException e) {
-          LOG.debug("Interrupted while sleeping for throttling control");
-          Thread.currentThread().interrupt();
-          // current thread might be interrupted to terminate
-          // directly go back to while() for confirm this
-          continue;
-        }
-
-        // create replicateContext here, so the entries can be GC'd upon return from this call
-        // stack
-        ReplicationEndpoint.ReplicateContext replicateContext =
-            new ReplicationEndpoint.ReplicateContext();
-        replicateContext.setEntries(entries).setSize(currentSize);
-        replicateContext.setWalGroupId(walGroupId);
-
-        long startTimeNs = System.nanoTime();
-        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
-        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
-        long endTimeNs = System.nanoTime();
-
-        if (!replicated) {
-          continue;
-        } else {
-          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
-        }
-
-        if (this.lastLoggedPosition != lastReadPosition) {
-          //Clean up hfile references
-          int size = entries.size();
-          for (int i = 0; i < size; i++) {
-            cleanUpHFileRefs(entries.get(i).getEdit());
-          }
-
-          // Save positions to meta table before zk.
-          updateSerialRepPositions(entryBatch.getLastSeqIds());
-          //Log and clean up WAL logs
-          updateLogPosition(lastReadPosition);
-        }
-
-        source.postShipEdits(entries, currentSize);
-        // FIXME check relationship between wal group and overall
-        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
-          entryBatch.getNbHFiles());
-        source.getSourceMetrics().setAgeOfLastShippedOp(
-          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations()
-              + " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms");
-        }
-        break;
-      } catch (Exception ex) {
-        LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
-            + org.apache.hadoop.util.StringUtils.stringifyException(ex));
-        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-    }
-  }
-
-  private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
-    String key = entry.getKey();
-    long seq = entry.getValue();
-    boolean deleteKey = false;
-    if (seq <= 0) {
-      // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
-      deleteKey = true;
-      seq = -seq;
-    }
-
-    if (!canSkipWaitingSet.getUnchecked(key)) {
-      try {
-        source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId());
-      } catch (IOException e) {
-        LOG.error("waitUntilCanBePushed fail", e);
-        throw new RuntimeException("waitUntilCanBePushed fail");
-      } catch (InterruptedException e) {
-        LOG.warn("waitUntilCanBePushed interrupted", e);
-        Thread.currentThread().interrupt();
-      }
-      canSkipWaitingSet.put(key, true);
-    }
-    if (deleteKey) {
-      canSkipWaitingSet.invalidate(key);
-    }
-  }
-
-  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
-    String peerId = source.getPeerId();
-    if (peerId.contains("-")) {
-      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
-      // A peerId will not have "-" in its name, see HBASE-11394
-      peerId = peerId.split("-")[0];
-    }
-    List<Cell> cells = edit.getCells();
-    int totalCells = cells.size();
-    for (int i = 0; i < totalCells; i++) {
-      Cell cell = cells.get(i);
-      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
-        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-        List<StoreDescriptor> stores = bld.getStoresList();
-        int totalStores = stores.size();
-        for (int j = 0; j < totalStores; j++) {
-          List<String> storeFileList = stores.get(j).getStoreFileList();
-          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
-          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
-        }
-      }
-    }
-  }
-
-  protected void updateLogPosition(long lastReadPosition) {
-    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
-      lastReadPosition, false, false);
-    lastLoggedPosition = lastReadPosition;
-  }
-
-  private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
-    try {
-      MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
-        source.getPeerId(), lastPositionsForSerialScope);
-    } catch (IOException e) {
-      LOG.error("updateReplicationPositions fail", e);
-      throw new RuntimeException("updateReplicationPositions fail");
-    }
-  }
-
-  public void startup(UncaughtExceptionHandler handler) {
-    String name = Thread.currentThread().getName();
-    Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
-        + source.getPeerClusterZnode(), handler);
-  }
-
-  public PriorityBlockingQueue<Path> getLogQueue() {
-    return this.queue;
-  }
-
-  public Path getCurrentPath() {
-    return this.entryReader.getCurrentPath();
-  }
-
-  public long getCurrentPosition() {
-    return this.lastLoggedPosition;
-  }
-
-  public void setWALReader(ReplicationSourceWALReaderThread entryReader) {
-    this.entryReader = entryReader;
-  }
-
-  public long getStartPosition() {
-    return 0;
-  }
-
-  protected boolean isActive() {
-    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
-  }
-
-  public void setWorkerState(WorkerState state) {
-    this.state = state;
-  }
-
-  public WorkerState getWorkerState() {
-    return state;
-  }
-
-  public void stopWorker() {
-    setWorkerState(WorkerState.STOPPED);
-  }
-
-  public boolean isFinished() {
-    return state == WorkerState.FINISHED;
-  }
-
-  /**
-   * Do the sleeping logic
-   * @param msg Why we sleep
-   * @param sleepMultiplier by how many times the default sleeping time is augmented
-   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
-   */
-  public boolean sleepForRetries(String msg, int sleepMultiplier) {
-    try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
-      }
-      Thread.sleep(this.sleepForRetries * sleepMultiplier);
-    } catch (InterruptedException e) {
-      LOG.debug("Interrupted while sleeping between retries");
-      Thread.currentThread().interrupt();
-    }
-    return sleepMultiplier < maxRetriesMultiplier;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e71f15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
new file mode 100644
index 0000000..04b596c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -0,0 +1,502 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+/**
+ * Reads and filters WAL entries, groups the filtered entries into batches, and puts the
+ * batches onto a queue.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplicationSourceWALReader extends Thread {
+  private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReader.class);
+
+  private final PriorityBlockingQueue<Path> logQueue;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final WALEntryFilter filter;
+  private final ReplicationSource source;
+
+  protected final BlockingQueue<WALEntryBatch> entryBatchQueue;
+  // max (heap) size of each batch - multiply by number of batches in queue to get total
+  private final long replicationBatchSizeCapacity;
+  // max count of each batch - multiply by number of batches in queue to get total
+  protected final int replicationBatchCountCapacity;
+  // position in the WAL to start reading at
+  private long currentPosition;
+  private final long sleepForRetries;
+  private final int maxRetriesMultiplier;
+  private final boolean eofAutoRecovery;
+
+  //Indicates whether this particular worker is running
+  private boolean isReaderRunning = true;
+
+  private AtomicLong totalBufferUsed;
+  private long totalBufferQuota;
+
+  /**
+   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
+   * entries, and puts them on a batch queue.
+   * @param fs the file system to use
+   * @param conf configuration to use
+   * @param logQueue The WAL queue to read off of
+   * @param startPosition position in the first WAL to start reading from
+   * @param filter The filter to use while reading
+   * @param source replication source
+   */
+  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
+      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+      ReplicationSource source) {
+    this.logQueue = logQueue;
+    this.currentPosition = startPosition;
+    this.fs = fs;
+    this.conf = conf;
+    this.filter = filter;
+    this.source = source;
+    this.replicationBatchSizeCapacity =
+        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
+    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
+    // memory used will be batchSizeCapacity * (nb.batches + 1)
+    // the +1 is for the current thread reading before placing onto the queue
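+    // For instance, with the defaults above (64 MB batch size, 1 queued batch), a single source
+    // may buffer roughly 64 MB * (1 + 1) = 128 MB of WAL entries at a time.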
+    int batchCount = conf.getInt("replication.source.nb.batches", 1);
+    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
+    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.sleepForRetries =
+        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
+    this.maxRetriesMultiplier =
+        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
+    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
+    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
+    LOG.info("peerClusterZnode=" + source.getPeerClusterZnode()
+        + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+        + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
+        + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
+        + ", replicationBatchQueueCapacity=" + batchCount);
+  }
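+
+  // Illustrative sketch only (the actual wiring lives in ReplicationSource and the shipper):
+  // the source constructs and starts this reader, and the shipper consumes batches from it.
+  //
+  //   ReplicationSourceWALReader reader =
+  //       new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, filter, source);
+  //   reader.start();                        // Thread; begins reading and batching WAL entries
+  //   WALEntryBatch batch = reader.take();   // shipper blocks until a batch is available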
+
+  @Override
+  public void run() {
+    int sleepMultiplier = 1;
+    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, fs, conf, currentPosition, source.getSourceMetrics())) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          if (!checkQuota()) {
+            continue;
+          }
+          WALEntryBatch batch = readWALEntries(entryStream);
+          if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace(String.format("Read %s WAL entries eligible for replication",
+                batch.getNbEntries()));
+            }
+            entryBatchQueue.put(batch);
+            sleepMultiplier = 1;
+          } else { // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
+          }
+          currentPosition = entryStream.getPosition();
+          entryStream.reset(); // reuse stream
+        }
+      } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+        if (sleepMultiplier < maxRetriesMultiplier) {
+          LOG.debug("Failed to read stream of replication entries: " + e);
+          sleepMultiplier++;
+        } else {
+          LOG.error("Failed to read stream of replication entries", e);
+          handleEofException(e);
+        }
+        Threads.sleep(sleepForRetries * sleepMultiplier);
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while sleeping between WAL reads");
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
+    WALEntryBatch batch = null;
+    while (entryStream.hasNext()) {
+      if (batch == null) {
+        batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+      }
+      Entry entry = entryStream.next();
+      if (updateSerialReplPos(batch, entry)) {
+        batch.lastWalPosition = entryStream.getPosition();
+        break;
+      }
+      entry = filterEntry(entry);
+      if (entry != null) {
+        WALEdit edit = entry.getEdit();
+        if (edit != null && !edit.isEmpty()) {
+          long entrySize = getEntrySize(entry);
+          batch.addEntry(entry);
+          updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+          boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+          // Stop if too many entries or too big
+          if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+              || batch.getNbEntries() >= replicationBatchCountCapacity) {
+            break;
+          }
+        }
+      }
+    }
+    return batch;
+  }
+
+  protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
+      throws InterruptedException {
+    LOG.trace("Didn't read any new entries from WAL");
+    Thread.sleep(sleepForRetries);
+  }
+
+  // if we get an EOF due to a zero-length log, and there are other logs in queue
+  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
+  // enabled, then dump the log
+  private void handleEofException(Exception e) {
+    if (e.getCause() instanceof EOFException && logQueue.size() > 1 && this.eofAutoRecovery) {
+      try {
+        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
+          logQueue.remove();
+          currentPosition = 0;
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+      }
+    }
+  }
+
+  public Path getCurrentPath() {
+    // if we've read some WAL entries, get the Path we read from
+    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
+    if (batchQueueHead != null) {
+      return batchQueueHead.lastWalPath;
+    }
+    // otherwise, we must be currently reading from the head of the log queue
+    return logQueue.peek();
+  }
+
+  //returns false if we've already exceeded the global quota
+  private boolean checkQuota() {
+    // try not to go over total quota
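+    // (the shipper side is expected to release quota again as batches are drained, so usage
+    // should eventually fall back under the limit and reading can resume)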
+    if (totalBufferUsed.get() > totalBufferQuota) {
+      Threads.sleep(sleepForRetries);
+      return false;
+    }
+    return true;
+  }
+
+  private Entry filterEntry(Entry entry) {
+    Entry filtered = filter.filter(entry);
+    if (entry != null && filtered == null) {
+      source.getSourceMetrics().incrLogEditsFiltered();
+    }
+    return filtered;
+  }
+
+  /**
+   * @return true if we should stop reading because we're at REGION_CLOSE
+   */
+  private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
+    if (entry.hasSerialReplicationScope()) {
+      String key = Bytes.toString(entry.getKey().getEncodedRegionName());
+      batch.setLastPosition(key, entry.getKey().getSequenceId());
+      if (!entry.getEdit().getCells().isEmpty()) {
+        WALProtos.RegionEventDescriptor maybeEvent =
+            WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+        if (maybeEvent != null && maybeEvent
+            .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
+          // In serially replication, if we move a region to another RS and move it back, we may
+          // read logs crossing two sections. We should break at REGION_CLOSE and push the first
+          // section first in case of missing the middle section belonging to the other RS.
+          // In a worker thread, if we can push the first log of a region, we can push all logs
+          // in the same region without waiting until we read a close marker because next time
+          // we read logs in this region, it must be a new section and not adjacent with this
+          // region. Mark it negative.
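+          // (Consumers of the batch interpret a negative value as "the region was closed at
+          // sequence id -value", e.g. -42 means push up to sequence id 42 and then stop
+          // skipping the wait for this region.)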
+          batch.setLastPosition(key, -entry.getKey().getSequenceId());
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
+   * batch to become available
+   * @return A batch of entries, along with the position in the log after reading the batch
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public WALEntryBatch take() throws InterruptedException {
+    return entryBatchQueue.take();
+  }
+
+  private long getEntrySize(Entry entry) {
+    WALEdit edit = entry.getEdit();
+    return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
+  }
+
+  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
+    WALEdit edit = entry.getEdit();
+    if (edit != null && !edit.isEmpty()) {
+      batch.incrementHeapSize(entrySize);
+      Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
+      batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
+      batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
+    }
+    batch.lastWalPosition = entryPosition;
+  }
+
+  /**
+   * Count the number of different row keys in the given edit (an edit may carry several rows
+   * because of mini-batching). We assume that there's at least one Cell in the WALEdit.
+   * @param edit edit to count row keys from
+   * @return number of different row keys and HFiles
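+   *         (e.g. cells touching rows {r1, r1, r2} count as 2 distinct row keys, and each
+   *         bulk-load cell adds the number of store files it references to the HFile count)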
+   */
+  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
+    List<Cell> cells = edit.getCells();
+    int distinctRowKeys = 1;
+    int totalHFileEntries = 0;
+    Cell lastCell = cells.get(0);
+
+    int totalCells = edit.size();
+    for (int i = 0; i < totalCells; i++) {
+      // Count HFiles to be replicated
+      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
+        try {
+          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
+          List<StoreDescriptor> stores = bld.getStoresList();
+          int totalStores = stores.size();
+          for (int j = 0; j < totalStores; j++) {
+            totalHFileEntries += stores.get(j).getStoreFileList().size();
+          }
+        } catch (IOException e) {
+          LOG.error("Failed to deserialize bulk load entry from wal edit. "
+              + "Then its hfiles count will not be added into metric.");
+        }
+      }
+
+      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
+        distinctRowKeys++;
+      }
+      lastCell = cells.get(i);
+    }
+
+    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
+    return result;
+  }
+
+  /**
+   * Calculate the total size of all the store files
+   * @param edit edit to calculate the store file sizes from
+   * @return the total size of the store files
+   */
+  private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
+    List<Cell> cells = edit.getCells();
+    int totalStoreFilesSize = 0;
+
+    int totalCells = edit.size();
+    for (int i = 0; i < totalCells; i++) {
+      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
+        try {
+          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
+          List<StoreDescriptor> stores = bld.getStoresList();
+          int totalStores = stores.size();
+          for (int j = 0; j < totalStores; j++) {
+            totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
+          }
+        } catch (IOException e) {
+          LOG.error("Failed to deserialize bulk load entry from wal edit. "
+              + "Size of HFiles part of cell will not be considered in replication "
+              + "request size calculation.",
+            e);
+        }
+      }
+    }
+    return totalStoreFilesSize;
+  }
+
+  /**
+   * @param size the number of bytes about to be added to the read buffer
+   * @return true if the global buffer quota has been reached, i.e. we should stop batching and
+   *         push what we have buffered
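+   *         (e.g. with a quota of 100 bytes and 90 bytes already in use, acquiring 15 bytes
+   *         brings usage to 105 and returns true, so the reader ships what it has buffered)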
+   */
+  private boolean acquireBufferQuota(long size) {
+    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
+  }
+
+  /**
+   * @return whether the reader thread is running
+   */
+  public boolean isReaderRunning() {
+    return isReaderRunning && !isInterrupted();
+  }
+
+  /**
+   * @param readerRunning the readerRunning to set
+   */
+  public void setReaderRunning(boolean readerRunning) {
+    this.isReaderRunning = readerRunning;
+  }
+
+  /**
+   * Holds a batch of WAL entries to replicate, along with some statistics
+   *
+   */
+  static class WALEntryBatch {
+    private List<Entry> walEntries;
+    // last WAL that was read
+    private Path lastWalPath;
+    // position in WAL of last entry in this batch
+    private long lastWalPosition = 0;
+    // number of distinct row keys in this batch
+    private int nbRowKeys = 0;
+    // number of HFiles
+    private int nbHFiles = 0;
+    // heap size of data we need to replicate
+    private long heapSize = 0;
+    // save the last sequenceid for each region if the table has serial-replication scope
+    private Map<String, Long> lastSeqIds = new HashMap<>();
+
+    /**
+     * @param maxNbEntries the maximum number of entries this batch is expected to hold
+     * @param lastWalPath Path of the WAL the last entry in this batch was read from
+     */
+    WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+      this.walEntries = new ArrayList<>(maxNbEntries);
+      this.lastWalPath = lastWalPath;
+    }
+
+    public void addEntry(Entry entry) {
+      walEntries.add(entry);
+    }
+
+    /**
+     * @return the WAL Entries.
+     */
+    public List<Entry> getWalEntries() {
+      return walEntries;
+    }
+
+    /**
+     * @return the path of the last WAL that was read.
+     */
+    public Path getLastWalPath() {
+      return lastWalPath;
+    }
+
+    /**
+     * @return the position in the last WAL that was read.
+     */
+    public long getLastWalPosition() {
+      return lastWalPosition;
+    }
+
+    public int getNbEntries() {
+      return walEntries.size();
+    }
+
+    /**
+     * @return the number of distinct row keys in this batch
+     */
+    public int getNbRowKeys() {
+      return nbRowKeys;
+    }
+
+    /**
+     * @return the number of HFiles in this batch
+     */
+    public int getNbHFiles() {
+      return nbHFiles;
+    }
+
+    /**
+     * @return total number of operations in this batch
+     */
+    public int getNbOperations() {
+      return getNbRowKeys() + getNbHFiles();
+    }
+
+    /**
+     * @return the heap size of this batch
+     */
+    public long getHeapSize() {
+      return heapSize;
+    }
+
+    /**
+     * @return the last sequenceid for each region if the table has serial-replication scope
+     */
+    public Map<String, Long> getLastSeqIds() {
+      return lastSeqIds;
+    }
+
+    private void incrementNbRowKeys(int increment) {
+      nbRowKeys += increment;
+    }
+
+    private void incrementNbHFiles(int increment) {
+      nbHFiles += increment;
+    }
+
+    private void incrementHeapSize(long increment) {
+      heapSize += increment;
+    }
+
+    private void setLastPosition(String region, Long sequenceId) {
+      getLastSeqIds().put(region, sequenceId);
+    }
+  }
+}