You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/09 07:22:16 UTC

[09/20] hbase git commit: HBASE-20167 Optimize the implementation of ReplicationSourceWALReader

HBASE-20167 Optimize the implementation of ReplicationSourceWALReader


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2d0d6a3b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2d0d6a3b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2d0d6a3b

Branch: refs/heads/branch-2
Commit: 2d0d6a3ba1bdeac37e898d37d41eb6b079fc9a6d
Parents: cea5199
Author: zhangduo <zh...@apache.org>
Authored: Mon Mar 12 12:21:44 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800

----------------------------------------------------------------------
 .../RecoveredReplicationSource.java             |  67 +++++------
 .../RecoveredReplicationSourceShipper.java      |  48 ++------
 .../RecoveredReplicationSourceWALReader.java    |  56 ----------
 .../regionserver/ReplicationSource.java         |  36 +++---
 .../regionserver/ReplicationSourceShipper.java  |  27 +++--
 .../ReplicationSourceWALReader.java             | 101 +++++------------
 .../SerialReplicationSourceWALReader.java       | 112 +++++++++++++++++++
 7 files changed, 218 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index d9506c0..169b469 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,31 +59,41 @@ public class RecoveredReplicationSource extends ReplicationSource {
   }
 
   @Override
-  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
-    final RecoveredReplicationSourceShipper worker =
-        new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
-            this.queueStorage);
-    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
-    if (extant != null) {
-      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
-    } else {
-      LOG.debug("Starting up worker for wal group " + walGroupId);
-      worker.startup(this::uncaughtException);
-      worker.setWALReader(
-        startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
-      workerThreads.put(walGroupId, worker);
-    }
+  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
+      PriorityBlockingQueue<Path> queue) {
+    return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
+  }
+
+  private void handleEmptyWALEntryBatch0(ReplicationSourceWALReader reader,
+      BlockingQueue<WALEntryBatch> entryBatchQueue, Path currentPath) throws InterruptedException {
+    LOG.trace("Didn't read any new entries from WAL");
+    // we're done with queue recovery, shut ourselves down
+    reader.setReaderRunning(false);
+    // shuts down shipper thread immediately
+    entryBatchQueue.put(new WALEntryBatch(0, currentPath));
   }
 
   @Override
-  protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
+  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
-    ReplicationSourceWALReader walReader =
-      new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
-    Threads.setDaemonThreadRunning(walReader,
-      threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
-      this::uncaughtException);
-    return walReader;
+    if (replicationPeer.getPeerConfig().isSerial()) {
+      return new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter,
+        this) {
+
+        @Override
+        protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+          handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+        }
+      };
+    } else {
+      return new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) {
+
+        @Override
+        protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+          handleEmptyWALEntryBatch0(this, entryBatchQueue, currentPath);
+        }
+      };
+    }
   }
 
   public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
@@ -166,21 +176,14 @@ public class RecoveredReplicationSource extends ReplicationSource {
     return path;
   }
 
-  public void tryFinish() {
+  void tryFinish() {
     // use synchronize to make sure one last thread will clean the queue
     synchronized (workerThreads) {
       Threads.sleep(100);// wait a short while for other worker thread to fully exit
-      boolean allTasksDone = true;
-      for (ReplicationSourceShipper worker : workerThreads.values()) {
-        if (!worker.isFinished()) {
-          allTasksDone = false;
-          break;
-        }
-      }
+      boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());
       if (allTasksDone) {
         manager.removeRecoveredSource(this);
-        LOG.info("Finished recovering queue " + queueId + " with the following stats: "
-            + getStats());
+        LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 9c36497..1ae5cb9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -48,46 +48,18 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
   }
 
   @Override
-  public void run() {
-    setWorkerState(WorkerState.RUNNING);
-    // Loop until we close down
-    while (isActive()) {
-      int sleepMultiplier = 1;
-      // Sleep until replication is enabled again
-      if (!source.isPeerEnabled()) {
-        if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-        continue;
-      }
-
-      while (entryReader == null) {
-        if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
-          sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-
-      try {
-        WALEntryBatch entryBatch = entryReader.take();
-        shipEdits(entryBatch);
-        if (entryBatch.getWalEntries().isEmpty()) {
-          LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-              + source.getQueueId());
-          source.getSourceMetrics().incrCompletedRecoveryQueue();
-          setWorkerState(WorkerState.FINISHED);
-          continue;
-        }
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while waiting for next replication entry batch", e);
-        Thread.currentThread().interrupt();
-      }
+  protected void postShipEdits(WALEntryBatch entryBatch) {
+    if (entryBatch.getWalEntries().isEmpty()) {
+      LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+          + source.getQueueId());
+      source.getSourceMetrics().incrCompletedRecoveryQueue();
+      setWorkerState(WorkerState.FINISHED);
     }
+  }
+
+  @Override
+  protected void postFinish() {
     source.tryFinish();
-    // If the worker exits run loop without finishing its task, mark it as stopped.
-    if (!isFinished()) {
-      setWorkerState(WorkerState.STOPPED);
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
deleted file mode 100644
index 114f139..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.util.concurrent.PriorityBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.replication.WALEntryFilter;
-
-/**
- * Used by a {@link RecoveredReplicationSourceShipper}.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
-
-  private static final Logger LOG =
-    LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
-
-  public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
-      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
-      ReplicationSource source) {
-    super(fs, conf, logQueue, startPosition, filter, source);
-  }
-
-  @Override
-  protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
-    LOG.trace("Didn't read any new entries from WAL");
-    // we're done with queue recovery, shut ourself down
-    setReaderRunning(false);
-    // shuts down shipper thread immediately
-    entryBatchQueue.put(new WALEntryBatch(replicationBatchCountCapacity, currentPath));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f5e4185..3480919 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -85,7 +85,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   // per group queue size, keep no more than this number of logs in each wal group
   protected int queueSizePerGroup;
   protected ReplicationQueueStorage queueStorage;
-  private ReplicationPeer replicationPeer;
+  protected ReplicationPeer replicationPeer;
 
   protected Configuration conf;
   protected ReplicationQueueInfo replicationQueueInfo;
@@ -294,26 +294,32 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.walEntryFilter = new ChainWALEntryFilter(filters);
   }
 
-  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
-    ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this);
+  private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+    ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
     ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
     if (extant != null) {
-      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+      LOG.debug("Someone has beat us to start a worker thread for wal group {}", walGroupId);
     } else {
-      LOG.debug("Starting up worker for wal group " + walGroupId);
+      LOG.debug("Starting up worker for wal group {}", walGroupId);
+      ReplicationSourceWALReader walReader =
+        createNewWALReader(walGroupId, queue, worker.getStartPosition());
+      Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
+        ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
+      worker.setWALReader(walReader);
       worker.startup(this::uncaughtException);
-      worker.setWALReader(
-        startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
     }
   }
 
-  protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
+  protected ReplicationSourceShipper createNewShipper(String walGroupId,
+      PriorityBlockingQueue<Path> queue) {
+    return new ReplicationSourceShipper(conf, walGroupId, queue, this);
+  }
+
+  protected ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
-    ReplicationSourceWALReader walReader =
-      new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
-    return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
-      threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
-      this::uncaughtException);
+    return replicationPeer.getPeerConfig().isSerial()
+      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
+      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
   }
 
   protected final void uncaughtException(Thread t, Throwable e) {
@@ -392,10 +398,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return replicationPeer.isPeerEnabled();
   }
 
-  public boolean isSerial() {
-    return replicationPeer.getPeerConfig().isSerial();
-  }
-
   private void initialize() {
     int sleepMultiplier = 1;
     while (this.isSourceActive()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 50aaf95..aa5251e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -83,7 +83,7 @@ public class ReplicationSourceShipper extends Thread {
   }
 
   @Override
-  public void run() {
+  public final void run() {
     setWorkerState(WorkerState.RUNNING);
     // Loop until we close down
     while (isActive()) {
@@ -95,28 +95,31 @@ public class ReplicationSourceShipper extends Thread {
         }
         continue;
       }
-
-      while (entryReader == null) {
-        if (sleepForRetries("Replication WAL entry reader thread not initialized",
-          sleepMultiplier)) {
-          sleepMultiplier++;
-        }
-      }
-
       try {
         WALEntryBatch entryBatch = entryReader.take();
         shipEdits(entryBatch);
+        postShipEdits(entryBatch);
       } catch (InterruptedException e) {
         LOG.trace("Interrupted while waiting for next replication entry batch", e);
         Thread.currentThread().interrupt();
       }
     }
     // If the worker exits run loop without finishing its task, mark it as stopped.
-    if (state != WorkerState.FINISHED) {
+    if (!isFinished()) {
       setWorkerState(WorkerState.STOPPED);
+    } else {
+      postFinish();
     }
   }
 
+  // To be implemented by recovered shipper
+  protected void postShipEdits(WALEntryBatch entryBatch) {
+  }
+
+  // To be implemented by recovered shipper
+  protected void postFinish() {
+  }
+
   /**
    * Do the shipping logic
    */
@@ -229,8 +232,8 @@ public class ReplicationSourceShipper extends Thread {
 
   public void startup(UncaughtExceptionHandler handler) {
     String name = Thread.currentThread().getName();
-    Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
-        + source.getQueueId(), handler);
+    Threads.setDaemonThreadRunning(this,
+      name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
   }
 
   public PriorityBlockingQueue<Path> getLogQueue() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index da92a09..b125133 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -51,7 +50,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class ReplicationSourceWALReader extends Thread {
+class ReplicationSourceWALReader extends Thread {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
 
   private final PriorityBlockingQueue<Path> logQueue;
@@ -64,28 +63,19 @@ public class ReplicationSourceWALReader extends Thread {
   // max (heap) size of each batch - multiply by number of batches in queue to get total
   private final long replicationBatchSizeCapacity;
   // max count of each batch - multiply by number of batches in queue to get total
-  protected final int replicationBatchCountCapacity;
+  private final int replicationBatchCountCapacity;
   // position in the WAL to start reading at
   private long currentPosition;
   private final long sleepForRetries;
   private final int maxRetriesMultiplier;
   private final boolean eofAutoRecovery;
 
-  // used to store the first cell in an entry before filtering. This is because that if serial
-  // replication is enabled, we may find out that an entry can not be pushed after filtering. And
-  // when we try the next time, the cells maybe null since the entry has already been filtered,
-  // especially for region event wal entries. And this can also used to determine whether we can
-  // skip filtering.
-  private Cell firstCellInEntryBeforeFiltering;
-
   //Indicates whether this particular worker is running
   private boolean isReaderRunning = true;
 
   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;
 
-  private final SerialReplicationChecker serialReplicationChecker;
-
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -120,7 +110,6 @@ public class ReplicationSourceWALReader extends Thread {
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
-    this.serialReplicationChecker = new SerialReplicationChecker(conf, source);
     LOG.info("peerClusterZnode=" + source.getQueueId()
         + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
         + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -169,75 +158,35 @@ public class ReplicationSourceWALReader extends Thread {
     }
   }
 
-  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
-      throws IOException {
-    entryStream.next();
-    firstCellInEntryBeforeFiltering = null;
-    batch.setLastWalPosition(entryStream.getPosition());
+  // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
+  protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
+    WALEdit edit = entry.getEdit();
+    if (edit == null || edit.isEmpty()) {
+      return false;
+    }
+    long entrySize = getEntrySize(entry);
+    batch.addEntry(entry);
+    updateBatchStats(batch, entry, entrySize);
+    boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+    // Stop if too many entries or too big
+    return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
+      batch.getNbEntries() >= replicationBatchCountCapacity;
   }
 
-  private WALEntryBatch readWALEntries(WALEntryStream entryStream)
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
     if (!entryStream.hasNext()) {
       return null;
     }
-    long positionBefore = entryStream.getPosition();
-    WALEntryBatch batch =
-      new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+    WALEntryBatch batch = createBatch(entryStream);
     do {
-      Entry entry = entryStream.peek();
-      boolean isSerial = source.isSerial();
-      boolean doFiltering = true;
-      if (isSerial) {
-        if (firstCellInEntryBeforeFiltering == null) {
-          assert !entry.getEdit().isEmpty() : "should not write empty edits";
-          // Used to locate the region record in meta table. In WAL we only have the table name and
-          // encoded region name which can not be mapping to region name without scanning all the
-          // records for a table, so we need a start key, just like what we have done at client side
-          // when locating a region. For the markers, we will use the start key of the region as the
-          // row key for the edit. And we need to do this before filtering since all the cells may
-          // be filtered out, especially that for the markers.
-          firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
-        } else {
-          // if this is not null then we know that the entry has already been filtered.
-          doFiltering = false;
-        }
-      }
-
-      if (doFiltering) {
-        entry = filterEntry(entry);
-      }
+      Entry entry = entryStream.next();
+      batch.setLastWalPosition(entryStream.getPosition());
+      entry = filterEntry(entry);
       if (entry != null) {
-        if (isSerial) {
-          if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
-            if (batch.getLastWalPosition() > positionBefore) {
-              // we have something that can push, break
-              break;
-            } else {
-              serialReplicationChecker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
-            }
-          }
-          // arrive here means we can push the entry, record the last sequence id
-          batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
-            entry.getKey().getSequenceId());
+        if (addEntryToBatch(batch, entry)) {
+          break;
         }
-        // actually remove the entry.
-        removeEntryFromStream(entryStream, batch);
-        WALEdit edit = entry.getEdit();
-        if (edit != null && !edit.isEmpty()) {
-          long entrySize = getEntrySize(entry);
-          batch.addEntry(entry);
-          updateBatchStats(batch, entry, entrySize);
-          boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
-          // Stop if too many entries or too big
-          if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
-            batch.getNbEntries() >= replicationBatchCountCapacity) {
-            break;
-          }
-        }
-      } else {
-        // actually remove the entry.
-        removeEntryFromStream(entryStream, batch);
       }
     } while (entryStream.hasNext());
     return batch;
@@ -286,7 +235,11 @@ public class ReplicationSourceWALReader extends Thread {
     return true;
   }
 
-  private Entry filterEntry(Entry entry) {
+  protected final WALEntryBatch createBatch(WALEntryStream entryStream) {
+    return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+  }
+
+  protected final Entry filterEntry(Entry entry) {
     Entry filtered = filter.filter(entry);
     if (entry != null && filtered == null) {
       source.getSourceMetrics().incrLogEditsFiltered();

http://git-wip-us.apache.org/repos/asf/hbase/blob/2d0d6a3b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
new file mode 100644
index 0000000..5e9a9f6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * WAL reader for a serial replication peer.
+ */
+@InterfaceAudience.Private
+public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader {
+
+  // used to store the first cell in an entry before filtering. This is because, if serial
+  // replication is enabled, we may find out that an entry cannot be pushed after filtering. And
+  // when we try the next time, the cells may be null since the entry has already been filtered,
+  // especially for region event wal entries. This can also be used to determine whether we can
+  // skip filtering.
+  private Cell firstCellInEntryBeforeFiltering;
+
+  private final SerialReplicationChecker checker;
+
+  public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
+      PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
+      ReplicationSource source) {
+    super(fs, conf, logQueue, startPosition, filter, source);
+    checker = new SerialReplicationChecker(conf, source);
+  }
+
+  @Override
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+      throws IOException, InterruptedException {
+    if (!entryStream.hasNext()) {
+      return null;
+    }
+    long positionBefore = entryStream.getPosition();
+    WALEntryBatch batch = createBatch(entryStream);
+    do {
+      Entry entry = entryStream.peek();
+      boolean doFiltering = true;
+      if (firstCellInEntryBeforeFiltering == null) {
+        assert !entry.getEdit().isEmpty() : "should not write empty edits";
+        // Used to locate the region record in meta table. In WAL we only have the table name and
+        // encoded region name which cannot be mapped to a region name without scanning all the
+        // records for a table, so we need a start key, just like what we have done at client side
+        // when locating a region. For the markers, we will use the start key of the region as the
+        // row key for the edit. And we need to do this before filtering since all the cells may
+        // be filtered out, especially for the markers.
+        firstCellInEntryBeforeFiltering = entry.getEdit().getCells().get(0);
+      } else {
+        // if this is not null then we know that the entry has already been filtered.
+        doFiltering = false;
+      }
+
+      if (doFiltering) {
+        entry = filterEntry(entry);
+      }
+      if (entry != null) {
+        if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
+          if (batch.getLastWalPosition() > positionBefore) {
+            // we have something that can push, break
+            break;
+          } else {
+            checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
+          }
+        }
+        // arrive here means we can push the entry, record the last sequence id
+        batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
+          entry.getKey().getSequenceId());
+        // actually remove the entry.
+        removeEntryFromStream(entryStream, batch);
+        if (addEntryToBatch(batch, entry)) {
+          break;
+        }
+      } else {
+        // actually remove the entry.
+        removeEntryFromStream(entryStream, batch);
+      }
+    } while (entryStream.hasNext());
+    return batch;
+  }
+
+  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
+      throws IOException {
+    entryStream.next();
+    firstCellInEntryBeforeFiltering = null;
+    batch.setLastWalPosition(entryStream.getPosition());
+  }
+}