You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/06/03 16:49:07 UTC

[1/2] hbase git commit: HBASE-15995 Separate replication WAL reading from shipping

Repository: hbase
Updated Branches:
  refs/heads/branch-1 b66a478e7 -> 3cf443326


http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
new file mode 100644
index 0000000..c4d552c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -0,0 +1,411 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
+ * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
+ * dequeues it and starts reading from the next.
+ * <p>
+ * The stream is its own iterator: {@link #iterator()} returns this instance rather than a fresh
+ * iterator.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
+  private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
+
+  // reader on the currently open WAL; null until a WAL has been opened and after closeReader()
+  private Reader reader;
+  // path the reader was last opened on
+  private Path currentPath;
+  // cache of next entry for hasNext()
+  private Entry currentEntry;
+  // position after reading current entry
+  private long currentPosition = 0;
+  private final PriorityBlockingQueue<Path> logQueue;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final MetricsSource metrics;
+
+  /**
+   * Create an entry stream over the given queue
+   * @param logQueue the queue of WAL paths
+   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param metrics replication metrics
+   * @throws IOException
+   */
+  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
+      MetricsSource metrics)
+      throws IOException {
+    this(logQueue, fs, conf, 0, metrics);
+  }
+
+  /**
+   * Create an entry stream over the given queue at the given start position
+   * @param logQueue the queue of WAL paths
+   * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param startPosition the position in the first WAL to start reading at
+   * @param metrics replication metrics
+   * @throws IOException
+   */
+  public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
+      long startPosition, MetricsSource metrics) throws IOException {
+    this.logQueue = logQueue;
+    this.fs = fs;
+    this.conf = conf;
+    this.currentPosition = startPosition;
+    this.metrics = metrics;
+  }
+
+  /**
+   * Reads ahead and caches the next entry when none is cached yet.
+   * @return true if there is another WAL {@link Entry}
+   * @throws WALEntryStreamRuntimeException if there was an Exception while reading
+   */
+  @Override
+  public boolean hasNext() {
+    if (currentEntry == null) {
+      try {
+        tryAdvanceEntry();
+      } catch (Exception e) {
+        throw new WALEntryStreamRuntimeException(e);
+      }
+    }
+    return currentEntry != null;
+  }
+
+  /**
+   * @return the next WAL entry in this stream
+   * @throws WALEntryStreamRuntimeException if there was an IOException
+   * @throws NoSuchElementException if no more entries in the stream.
+   */
+  @Override
+  public Entry next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    Entry save = currentEntry;
+    currentEntry = null; // gets reloaded by hasNext()
+    return save;
+  }
+
+  /**
+   * Not supported.
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    closeReader();
+  }
+
+  /**
+   * @return the iterator over WAL entries in the queue, which is this stream itself.
+   */
+  @Override
+  public Iterator<Entry> iterator() {
+    return this;
+  }
+
+  /**
+   * @return the reader position recorded after the most recent read. It is updated as soon as
+   *         {@link #hasNext()} reads ahead, and it is set back to 0 when the current WAL is
+   *         dequeued or its reading is restarted.
+   */
+  public long getPosition() {
+    return currentPosition;
+  }
+
+  /**
+   * @return the {@link Path} of the current WAL
+   */
+  public Path getCurrentPath() {
+    return currentPath;
+  }
+
+  // Describes the current path and position for use in log messages
+  private String getCurrentPathStat() {
+    StringBuilder sb = new StringBuilder();
+    if (currentPath != null) {
+      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+          .append(currentPosition).append("\n");
+    } else {
+      sb.append("no replication ongoing, waiting for new log");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
+   * false). Resets the open reader, if any, and seeks it back to the current position.
+   * @throws IOException
+   */
+  public void reset() throws IOException {
+    if (reader != null && currentPath != null) {
+      resetReader();
+    }
+  }
+
+  private void setPosition(long position) {
+    currentPosition = position;
+  }
+
+  private void setCurrentPath(Path path) {
+    this.currentPath = path;
+  }
+
+  // Tries to load the next entry into currentEntry, moving on to the next WAL in the queue once
+  // the current one is exhausted. Leaves currentEntry null when nothing could be read.
+  private void tryAdvanceEntry() throws IOException {
+    if (checkReader()) {
+      readNextEntryAndSetPosition();
+      if (currentEntry == null) { // no more entries in this log file - see if log was rolled
+        if (logQueue.size() > 1) { // log was rolled
+          // Before dequeueing, we should always get one more attempt at reading.
+          // This is in case more entries came in after we opened the reader,
+          // and a new log was enqueued while we were reading. See HBASE-6758
+          resetReader();
+          readNextEntryAndSetPosition();
+          if (currentEntry == null) {
+            if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+              dequeueCurrentLog();
+              if (openNextLog()) {
+                readNextEntryAndSetPosition();
+              }
+            }
+          }
+        } // no other logs, we've simply hit the end of the current open log. Do nothing
+      }
+    }
+    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
+  }
+
+  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file.
+  // Returns true when we are done with the current log; returns false when reading of the current
+  // log was restarted from position 0 instead.
+  private boolean checkAllBytesParsed() throws IOException {
+    // -1 means the wal wasn't closed cleanly.
+    final long trailerSize = currentTrailerSize();
+    FileStatus stat = null;
+    try {
+      stat = fs.getFileStatus(this.currentPath);
+    } catch (IOException exception) {
+      // Without a file length there is nothing to compare against; the log is treated as done.
+      LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
+          + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat());
+      metrics.incrUnknownFileLengthForClosedWAL();
+    }
+    if (stat != null) {
+      if (trailerSize < 0) {
+        if (currentPosition < stat.getLen()) {
+          final long skippedBytes = stat.getLen() - currentPosition;
+          LOG.info("Reached the end of WAL file '" + currentPath
+              + "'. It was not closed cleanly, so we did not parse " + skippedBytes
+              + " bytes of data.");
+          metrics.incrUncleanlyClosedWALs();
+          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+        }
+      } else if (currentPosition + trailerSize < stat.getLen()) {
+        LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
+            + ", which is too far away from reported file length " + stat.getLen()
+            + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
+        // Remember how far we had read before rewinding. Reporting currentPosition after
+        // setPosition(0) would always record 0 repeated bytes.
+        final long repeatedBytes = currentPosition;
+        setPosition(0);
+        resetReader();
+        metrics.incrRestartedWALReading();
+        metrics.incrRepeatedFileBytes(repeatedBytes);
+        return false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
+          + (stat == null ? "N/A" : stat.getLen()));
+    }
+    metrics.incrCompletedWAL();
+    return true;
+  }
+
+  // Closes the reader, removes the head of the queue and rewinds the position for the next log
+  private void dequeueCurrentLog() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reached the end of log " + currentPath);
+    }
+    closeReader();
+    logQueue.remove();
+    setPosition(0);
+    metrics.decrSizeOfLogQueue();
+  }
+
+  // Reads one entry (possibly null) into currentEntry and records the reader's new position.
+  // Read metrics are only updated when an entry was actually returned.
+  private void readNextEntryAndSetPosition() throws IOException {
+    Entry readEntry = reader.next();
+    long readerPos = reader.getPosition();
+    if (readEntry != null) {
+      metrics.incrLogEditsRead();
+      metrics.incrLogReadInBytes(readerPos - currentPosition);
+    }
+    currentEntry = readEntry; // could be null
+    setPosition(readerPos);
+  }
+
+  private void closeReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  // if we don't have a reader, open a reader on the next log
+  private boolean checkReader() throws IOException {
+    if (reader == null) {
+      return openNextLog();
+    }
+    return true;
+  }
+
+  // open a reader on the next log in queue; returns false if the queue is empty or no reader
+  // could be opened
+  private boolean openNextLog() throws IOException {
+    Path nextPath = logQueue.peek();
+    if (nextPath != null) {
+      openReader(nextPath);
+      return reader != null;
+    }
+    return false;
+  }
+
+  // Returns the location of the log in the old-logs directory if it exists there, otherwise the
+  // given path unchanged
+  private Path getArchivedLog(Path path) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    } else {
+      LOG.error("Couldn't locate log: " + path);
+      return path;
+    }
+  }
+
+  // Opens (or resets) the reader for the given path and seeks to the current position.
+  // On a lease or NPE problem the reader is left null so that the caller sees no open reader.
+  private void openReader(Path path) throws IOException {
+    try {
+      // Detect if this is a new file, if so get a new reader else
+      // reset the current reader so that we see the new data
+      if (reader == null || !getCurrentPath().equals(path)) {
+        closeReader();
+        reader = WALFactory.createReader(fs, path, conf);
+        seek();
+        setCurrentPath(path);
+      } else {
+        resetReader();
+      }
+    } catch (FileNotFoundException fnfe) {
+      // If the log was archived, continue reading from there
+      Path archivedLog = getArchivedLog(path);
+      if (!path.equals(archivedLog)) {
+        openReader(archivedLog);
+      } else {
+        throw fnfe;
+      }
+    } catch (LeaseNotRecoveredException lnre) {
+      // HBASE-15019 the WAL was not closed due to some hiccup.
+      // Recover the lease of the file we just tried to open. currentPath is only updated after a
+      // successful open, so here it may still be null or point at the previous WAL.
+      LOG.warn("Try to recover the WAL lease " + path, lnre);
+      recoverLease(conf, path);
+      reader = null;
+    } catch (NullPointerException npe) {
+      // Workaround for race condition in HDFS-4380
+      // which throws a NPE if we open a file before any data node has the most recent block
+      // Leave the reader unset so the open is attempted again on a later call.
+      // Will require re-reading compressed WALs for compressionContext.
+      LOG.warn("Got NPE opening reader, will retry.");
+      reader = null;
+    }
+  }
+
+  // For HBASE-15019: best-effort lease recovery; a failure is logged and not propagated
+  private void recoverLease(final Configuration conf, final Path path) {
+    try {
+      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
+      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
+        @Override
+        public boolean progress() {
+          LOG.debug("recover WAL lease: " + path);
+          return true;
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn("unable to recover lease for WAL: " + path, e);
+    }
+  }
+
+  // Resets the open reader and seeks back to the current position, following the log to the
+  // archive directory if it was moved in the meantime
+  private void resetReader() throws IOException {
+    try {
+      reader.reset();
+      seek();
+    } catch (FileNotFoundException fnfe) {
+      // If the log was archived, continue reading from there
+      Path archivedLog = getArchivedLog(currentPath);
+      if (!currentPath.equals(archivedLog)) {
+        openReader(archivedLog);
+      } else {
+        throw fnfe;
+      }
+    } catch (NullPointerException npe) {
+      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+    }
+  }
+
+  // Positions the reader at currentPosition; position 0 needs no explicit seek
+  private void seek() throws IOException {
+    if (currentPosition != 0) {
+      reader.seek(currentPosition);
+    }
+  }
+
+  // Trailer size reported by a ProtobufLogReader, or -1 for any other kind of reader
+  private long currentTrailerSize() {
+    long size = -1L;
+    if (reader instanceof ProtobufLogReader) {
+      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
+      size = pblr.trailerSize();
+    }
+    return size;
+  }
+
+  /**
+   * Unchecked wrapper for exceptions raised while reading, thrown by {@link #hasNext()}.
+   */
+  @InterfaceAudience.Private
+  public static class WALEntryStreamRuntimeException extends RuntimeException {
+    private static final long serialVersionUID = -6298201811259982568L;
+
+    public WALEntryStreamRuntimeException(Exception e) {
+      super(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
deleted file mode 100644
index 40db3eb..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-@Category({LargeTests.class})
-@RunWith(Parameterized.class)
-public class TestReplicationWALReaderManager {
-
-  private static HBaseTestingUtility TEST_UTIL;
-  private static Configuration conf;
-  private static FileSystem fs;
-  private static MiniDFSCluster cluster;
-  private static final TableName tableName = TableName.valueOf("tablename");
-  private static final byte [] family = Bytes.toBytes("column");
-  private static final byte [] qualifier = Bytes.toBytes("qualifier");
-  private static final HRegionInfo info = new HRegionInfo(tableName,
-      HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
-  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
-
-  private WAL log;
-  private ReplicationWALReaderManager logManager;
-  private PathWatcher pathWatcher;
-  private int nbRows;
-  private int walEditKVs;
-  @Rule public TestName tn = new TestName();
-  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-
-  @Parameters
-  public static Collection<Object[]> parameters() {
-    // Try out different combinations of row count and KeyValue count
-    int[] NB_ROWS = { 1500, 60000 };
-    int[] NB_KVS = { 1, 100 };
-    // whether compression is used
-    Boolean[] BOOL_VALS = { false, true };
-    List<Object[]> parameters = new ArrayList<Object[]>();
-    for (int nbRows : NB_ROWS) {
-      for (int walEditKVs : NB_KVS) {
-        for (boolean b : BOOL_VALS) {
-          Object[] arr = new Object[3];
-          arr[0] = nbRows;
-          arr[1] = walEditKVs;
-          arr[2] = b;
-          parameters.add(arr);
-        }
-      }
-    }
-    return parameters;
-  }
-
-  public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
-    this.nbRows = nbRows;
-    this.walEditKVs = walEditKVs;
-    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
-      enableCompression);
-    mvcc.advanceTo(1);
-  }
-  
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility();
-    conf = TEST_UTIL.getConfiguration();
-    TEST_UTIL.startMiniDFSCluster(3);
-
-    cluster = TEST_UTIL.getDFSCluster();
-    fs = cluster.getFileSystem();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    logManager = new ReplicationWALReaderManager(fs, conf);
-    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
-    pathWatcher = new PathWatcher();
-    listeners.add(pathWatcher);
-    final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
-    log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    log.close();
-  }
-
-  @Test
-  public void test() throws Exception {
-    // Grab the path that was generated when the log rolled as part of its creation
-    Path path = pathWatcher.currentPath;
-
-    assertEquals(0, logManager.getPosition());
-
-    appendToLog();
-
-    // There's one edit in the log, read it. Reading past it needs to return nulls
-    assertNotNull(logManager.openReader(path));
-    logManager.seek();
-    WAL.Entry entry = logManager.readNextAndSetPosition();
-    assertNotNull(entry);
-    entry = logManager.readNextAndSetPosition();
-    assertNull(entry);
-    logManager.closeReader();
-    long oldPos = logManager.getPosition();
-
-    appendToLog();
-
-    // Read the newly added entry, make sure we made progress
-    assertNotNull(logManager.openReader(path));
-    logManager.seek();
-    entry = logManager.readNextAndSetPosition();
-    assertNotEquals(oldPos, logManager.getPosition());
-    assertNotNull(entry);
-    logManager.closeReader();
-    oldPos = logManager.getPosition();
-
-    log.rollWriter();
-
-    // We rolled but we still should see the end of the first log and not get data
-    assertNotNull(logManager.openReader(path));
-    logManager.seek();
-    entry = logManager.readNextAndSetPosition();
-    assertEquals(oldPos, logManager.getPosition());
-    assertNull(entry);
-    logManager.finishCurrentFile();
-
-    path = pathWatcher.currentPath;
-
-    for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
-    log.rollWriter();
-    logManager.openReader(path);
-    logManager.seek();
-    for (int i = 0; i < nbRows; i++) {
-      WAL.Entry e = logManager.readNextAndSetPosition();
-      if (e == null) {
-        fail("Should have enough entries");
-      }
-    }
-  }
-
-  private void appendToLog() throws IOException {
-    appendToLogPlus(1);
-  }
-
-  private void appendToLogPlus(int count) throws IOException {
-    final long txid = log.append(htd, info,
-        new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
-        getWALEdits(count), true);
-    log.sync(txid);
-  }
-
-  private WALEdit getWALEdits(int count) {
-    WALEdit edit = new WALEdit();
-    for (int i = 0; i < count; i++) {
-      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
-        System.currentTimeMillis(), qualifier));
-    }
-    return edit;
-  }
-
-  class PathWatcher extends WALActionsListener.Base {
-
-    Path currentPath;
-
-    @Override
-    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-      currentPath = newPath;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
new file mode 100644
index 0000000..005e2a1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -0,0 +1,440 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestWALEntryStream {
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static MiniDFSCluster cluster;
+  private static final TableName tableName = TableName.valueOf("tablename");
+  private static final byte[] family = Bytes.toBytes("column");
+  private static final byte[] qualifier = Bytes.toBytes("qualifier");
+  private static final HRegionInfo info =
+      new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
+  private static final HTableDescriptor htd = new HTableDescriptor(tableName);
+  private static NavigableMap<byte[], Integer> scopes;
+
+  private WAL log;
+  PriorityBlockingQueue<Path> walQueue;
+  private PathWatcher pathWatcher;
+
+  @Rule
+  public TestName tn = new TestName();
+  private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+
+  /**
+   * Creates the shared testing utility, starts the mini DFS cluster and initializes the static
+   * configuration, file system and scopes map used by the tests.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniDFSCluster(3);
+    cluster = TEST_UTIL.getDFSCluster();
+    fs = cluster.getFileSystem();
+
+    // every family known to the table descriptor is registered with scope 0
+    scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (byte[] familyName : htd.getFamiliesKeys()) {
+      scopes.put(familyName, 0);
+    }
+  }
+
+  /** Shuts down the mini cluster of the shared testing utility once all tests have run. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Gives each test an empty WAL queue and a fresh WAL whose factory is named after the test
+   * method and has a {@code PathWatcher} registered as listener.
+   */
+  @Before
+  public void setUp() throws Exception {
+    walQueue = new PriorityBlockingQueue<>();
+    pathWatcher = new PathWatcher();
+    List<WALActionsListener> walListeners = new ArrayList<>();
+    walListeners.add(pathWatcher);
+    final WALFactory walFactory = new WALFactory(conf, walListeners, tn.getMethodName());
+    log = walFactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
+  }
+
+  /** Closes the WAL currently held in {@code log}. */
+  @After
+  public void tearDown() throws Exception {
+    log.close();
+  }
+
+  /**
+   * Try out different combinations of row count and KeyValue count, with and without WAL
+   * compression, and check that a stream over the rolled log returns every appended entry.
+   */
+  @Test
+  public void testDifferentCounts() throws Exception {
+    int[] NB_ROWS = { 1500, 60000 };
+    int[] NB_KVS = { 1, 100 };
+    // whether compression is used
+    boolean[] BOOL_VALS = { false, true };
+    // The configuration object is shared by every test in this class, so remember the current
+    // compression setting (false when unset) and restore it after all combinations have run.
+    final Configuration sharedConf = TEST_UTIL.getConfiguration();
+    final boolean compressionBefore =
+        sharedConf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+    try {
+      for (int nbRows : NB_ROWS) {
+        for (int walEditKVs : NB_KVS) {
+          for (boolean isCompressionEnabled : BOOL_VALS) {
+            sharedConf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
+            mvcc.advanceTo(1);
+
+            for (int i = 0; i < nbRows; i++) {
+              appendToLogPlus(walEditKVs);
+            }
+
+            log.rollWriter();
+
+            try (WALEntryStream entryStream =
+                new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+              int i = 0;
+              for (WAL.Entry e : entryStream) {
+                assertNotNull(e);
+                i++;
+              }
+              assertEquals(nbRows, i);
+
+              // should've read all entries
+              assertFalse(entryStream.hasNext());
+            }
+            // reset everything for next loop
+            log.close();
+            setUp();
+          }
+        }
+      }
+    } finally {
+      sharedConf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, compressionBefore);
+    }
+  }
+
+  /**
+   * Tests basic reading of log appends: each new stream is started at the position where the
+   * previous one stopped, and the last one reads across a log roll.
+   */
+  @Test
+  public void testAppendsWithRolls() throws Exception {
+    appendToLog();
+
+    long oldPos;
+    try (WALEntryStream firstStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      // Only one edit has been appended so far: it must be readable and nothing may follow it
+      assertTrue(firstStream.hasNext());
+      WAL.Entry onlyEntry = firstStream.next();
+      assertNotNull(onlyEntry);
+      assertFalse(firstStream.hasNext());
+      try {
+        firstStream.next();
+        fail();
+      } catch (NoSuchElementException expected) {
+        // reading past the last entry has to throw
+      }
+      oldPos = firstStream.getPosition();
+    }
+
+    appendToLog();
+
+    try (WALEntryStream resumedStream =
+        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+      // The newly appended entry is read from the old position, and the position moves forward
+      WAL.Entry appended = resumedStream.next();
+      assertNotEquals(oldPos, resumedStream.getPosition());
+      assertNotNull(appended);
+      oldPos = resumedStream.getPosition();
+    }
+
+    // One more entry goes into the first log, then the log is rolled and the new log gets an
+    // entry as well; the tail of the first log must still be returned first
+    appendToLog();
+    log.rollWriter();
+    appendToLog();
+
+    try (WALEntryStream rolledStream =
+        new WALEntryStream(walQueue, fs, conf, oldPos, new MetricsSource("1"))) {
+      WAL.Entry fromOldLog = rolledStream.next();
+      assertNotEquals(oldPos, rolledStream.getPosition());
+      assertNotNull(fromOldLog);
+
+      // next item should come from the new log
+      WAL.Entry fromNewLog = rolledStream.next();
+      assertNotEquals(oldPos, rolledStream.getPosition());
+      assertNotNull(fromNewLog);
+
+      // no more entries to read
+      assertFalse(rolledStream.hasNext());
+      oldPos = rolledStream.getPosition();
+    }
+  }
+
+  /**
+   * Tests that if after a stream is opened, more entries come in and then the log is rolled, we
+   * don't mistakenly dequeue the current log thinking we're done with it
+   */
+  @Test
+  public void testLogrollWhileStreaming() throws Exception {
+    appendToLog("1"); // 1 - written before the reader is opened
+    appendToLog("2"); // 2 - also written before the reader is opened
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      assertEquals("1", getRow(entryStream.next()));
+
+      appendToLog("3"); // 3 - comes in after reader opened
+      log.rollWriter(); // log roll happening while we're reading
+      appendToLog("4"); // 4 - this append is in the rolled log
+
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
+                                        // entry in first log
+      assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
+                                                     // and 3 would be skipped
+      assertEquals("4", getRow(entryStream.next())); // 4 - the entry appended after the roll
+      assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
+      assertFalse(entryStream.hasNext()); // all four entries have been read
+    }
+  }
+
+  /**
+   * Tests that if writes come in while we have a stream open, we shouldn't miss them
+   */
+  @Test
+  public void testNewEntriesWhileStreaming() throws Exception {
+    appendToLog("1");
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      entryStream.next(); // we've hit the end of the stream at this point
+
+      // some new entries come in while we're streaming
+      appendToLog("2");
+      appendToLog("3");
+
+      // the stream already hit the end, so it does not see them yet
+      assertFalse(entryStream.hasNext());
+
+      // but once we reset the stream, both new entries are read in order
+      entryStream.reset();
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals("3", getRow(entryStream.next()));
+      assertFalse(entryStream.hasNext());
+    }
+  }
+
+  @Test
+  public void testResumeStreamingFromPosition() throws Exception {
+    long lastPosition = 0;
+    appendToLog("1");
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      entryStream.next(); // we've hit the end of the stream at this point
+      appendToLog("2"); // "2" and "3" are appended but never read through this stream
+      appendToLog("3");
+      lastPosition = entryStream.getPosition();
+    }
+    // the next stream should pick up where we left off
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, lastPosition, new MetricsSource("1"))) {
+      assertEquals("2", getRow(entryStream.next()));
+      assertEquals("3", getRow(entryStream.next()));
+      assertFalse(entryStream.hasNext()); // done
+      assertEquals(1, walQueue.size());
+    }
+  }
+
+  /**
+   * Tests that a stream closed before reaching the end of the log can be continued by a new
+   * stream opened at the last position the first one reported
+   */
+  @Test
+  public void testPosition() throws Exception {
+    long resumePosition = 0;
+    appendEntriesToLog(3);
+    // consume a single entry, then remember how far we got
+    try (WALEntryStream firstStream =
+        new WALEntryStream(walQueue, fs, conf, resumePosition, new MetricsSource("1"))) {
+      firstStream.next();
+      resumePosition = firstStream.getPosition();
+    }
+    // the remaining two entries must be readable from the remembered position
+    try (WALEntryStream secondStream =
+        new WALEntryStream(walQueue, fs, conf, resumePosition, new MetricsSource("1"))) {
+      assertNotNull(secondStream.next());
+      assertNotNull(secondStream.next());
+      assertFalse(secondStream.hasNext());
+    }
+  }
+
+
+  @Test
+  public void testEmptyStream() throws Exception {
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
+      assertFalse(entryStream.hasNext()); // this test appends nothing, so nothing to read
+    }
+  }
+
+  @Test
+  public void testReplicationSourceWALReaderThread() throws Exception {
+    appendEntriesToLog(3);
+    // read all three entries to learn the position at the end of the log
+    long position;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    // start up a batcher on the same queue
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    ReplicationSourceWALReaderThread batcher =
+        new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue, 0, fs,
+            conf, getDummyFilter(), new MetricsSource("1"));
+    Path walPath = walQueue.peek();
+    batcher.start();
+    WALEntryBatch entryBatch = batcher.take();
+    // should've batched up our entries
+    assertNotNull(entryBatch);
+    assertEquals(3, entryBatch.getWalEntries().size());
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertEquals(3, entryBatch.getNbRowKeys());
+    // an entry appended after the first batch was taken must arrive in the next batch
+    appendToLog("foo");
+    entryBatch = batcher.take();
+    assertEquals(1, entryBatch.getNbEntries());
+    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
+  }
+
+  private String getRow(WAL.Entry entry) {
+    Cell first = entry.getEdit().getCells().get(0);
+    return Bytes.toString(first.getRowArray(), first.getRowOffset(), first.getRowLength());
+  }
+
+  private void appendToLog(String key) throws IOException {
+    WALKey walKey =
+        new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc);
+    final long txid = log.append(htd, info, walKey, getWALEdit(key), true);
+    log.sync(txid);
+  }
+
+  private void appendEntriesToLog(int count) throws IOException {
+    for (int remaining = count; remaining > 0; remaining--) {
+      appendToLogPlus(1);
+    }
+  }
+
+  private void appendToLog() throws IOException {
+    appendToLogPlus(1); // one WAL entry whose edit holds a single KeyValue
+  }
+
+  private void appendToLogPlus(int count) throws IOException {
+    WALKey walKey =
+        new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc);
+    final long txid = log.append(htd, info, walKey, getWALEdits(count), true);
+    log.sync(txid);
+  }
+
+  private WALEdit getWALEdits(int count) {
+    WALEdit result = new WALEdit();
+    for (int kv = 0; kv < count; kv++) {
+      byte[] row = Bytes.toBytes(System.currentTimeMillis());
+      result.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), qualifier));
+    }
+    return result;
+  }
+
+  private WALEdit getWALEdit(String row) {
+    WALEdit result = new WALEdit();
+    byte[] rowKey = Bytes.toBytes(row);
+    result.add(new KeyValue(rowKey, family, qualifier, System.currentTimeMillis(), qualifier));
+    return result;
+  }
+
+  private WALEntryFilter getDummyFilter() {
+    WALEntryFilter passThrough = new WALEntryFilter() {
+      @Override
+      public Entry filter(Entry entry) {
+        return entry;
+      }
+    };
+    return passThrough;
+  }
+
+  private ReplicationQueueInfo getQueueInfo() {
+    return new ReplicationQueueInfo("1"); // same literal "1" the tests give MetricsSource
+  }
+
+  class PathWatcher extends WALActionsListener.Base {
+    // newPath most recently passed to preLogRoll; stays null until the first call
+    Path currentPath;
+    // Enqueues every new WAL path onto walQueue and remembers it as currentPath
+    @Override
+    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+      walQueue.add(newPath);
+      currentPath = newPath;
+    }
+  }
+
+}


[2/2] hbase git commit: HBASE-15995 Separate replication WAL reading from shipping

Posted by te...@apache.org.
HBASE-15995 Separate replication WAL reading from shipping

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3cf44332
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3cf44332
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3cf44332

Branch: refs/heads/branch-1
Commit: 3cf4433260b60a0e0455090628cf60a9d5a180f3
Parents: b66a478
Author: Vincent <vi...@gmail.com>
Authored: Thu Jun 1 16:50:41 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Sat Jun 3 09:48:57 2017 -0700

----------------------------------------------------------------------
 .../replication/ClusterMarkingEntryFilter.java  |  70 ++
 .../regionserver/ReplicationSource.java         | 826 +++++--------------
 .../ReplicationSourceWALReaderThread.java       | 471 +++++++++++
 .../ReplicationWALReaderManager.java            | 155 ----
 .../regionserver/WALEntryStream.java            | 411 +++++++++
 .../TestReplicationWALReaderManager.java        | 228 -----
 .../regionserver/TestWALEntryStream.java        | 440 ++++++++++
 7 files changed, 1610 insertions(+), 991 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
new file mode 100644
index 0000000..7cd3fed
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+
+/**
+ * Filters out entries with our peerClusterId (i.e. already replicated)
+ * and marks all other entries with our clusterID
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+@InterfaceStability.Evolving
+public class ClusterMarkingEntryFilter implements WALEntryFilter {
+  private final UUID clusterId;
+  private final UUID peerClusterId;
+  private final ReplicationEndpoint replicationEndpoint;
+
+  /**
+   * @param clusterId id of this cluster
+   * @param peerClusterId of the other cluster
+   * @param replicationEndpoint ReplicationEndpoint which will handle the actual replication
+   */
+  public ClusterMarkingEntryFilter(UUID clusterId, UUID peerClusterId,
+      ReplicationEndpoint replicationEndpoint) {
+    this.clusterId = clusterId;
+    this.peerClusterId = peerClusterId;
+    this.replicationEndpoint = replicationEndpoint;
+  }
+  @Override
+  public Entry filter(Entry entry) {
+    // don't replicate if the log entries have already been consumed by the cluster
+    if (replicationEndpoint.canReplicateToSameCluster()
+        || !entry.getKey().getClusterIds().contains(peerClusterId)) {
+      WALEdit edit = entry.getEdit();
+      WALKey logKey = entry.getKey();
+      if (edit != null && !edit.isEmpty()) {
+        // Mark that the current cluster has the change
+        logKey.addClusterId(clusterId);
+        // We need to set the CC to null else it will be compressed when sent to the sink
+        entry.setCompressionContext(null);
+        return entry;
+      }
+    }
+    // already seen by the peer cluster, or there is no edit content to ship
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ce070d0..0d52bbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -25,8 +25,6 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
 
-import java.io.EOFException;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -55,12 +53,12 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -69,16 +67,14 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
  * Class that handles the source of a replication stream.
@@ -93,8 +89,7 @@ import org.apache.hadoop.hbase.wal.WALKey;
  *
  */
 @InterfaceAudience.Private
-public class ReplicationSource extends Thread
-    implements ReplicationSourceInterface {
+public class ReplicationSource extends Thread implements ReplicationSourceInterface {
 
   private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
   // Queues of logs to process, entry in format of walGroupId->queue,
@@ -118,10 +113,6 @@ public class ReplicationSource extends Thread
   private Stoppable stopper;
   // How long should we sleep for each retry
   private long sleepForRetries;
-  // Max size in bytes of entriesArray
-  private long replicationQueueSizeCapacity;
-  // Max number of entries in entriesArray
-  private int replicationQueueNbCapacity;
   private FileSystem fs;
   // id of this cluster
   private UUID clusterId;
@@ -149,11 +140,10 @@ public class ReplicationSource extends Thread
   private ReplicationThrottler throttler;
   private long defaultBandwidth;
   private long currentBandwidth;
-  private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
-      new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
+  private ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
+      new ConcurrentHashMap<String, ReplicationSourceShipperThread>();
 
   private AtomicLong totalBufferUsed;
-  private long totalBufferQuota;
 
   /**
    * Instantiation method used by region servers
@@ -178,10 +168,6 @@ public class ReplicationSource extends Thread
     this.stopper = stopper;
     this.conf = HBaseConfiguration.create(conf);
     decorateConf();
-    this.replicationQueueSizeCapacity =
-        this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
-    this.replicationQueueNbCapacity =
-        this.conf.getInt("replication.source.nb.capacity", 25000);
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
@@ -207,12 +193,8 @@ public class ReplicationSource extends Thread
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
-    this.totalBufferQuota = conf.getInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
-        HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
-        + " inited, replicationQueueSizeCapacity=" + replicationQueueSizeCapacity
-        + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", curerntBandwidth="
-        + this.currentBandwidth);
+        + ", currentBandwidth=" + this.currentBandwidth);
   }
 
   private void decorateConf() {
@@ -233,9 +215,9 @@ public class ReplicationSource extends Thread
         // new wal group observed after source startup, start a new worker thread to track it
         // notice: it's possible that log enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
-        final ReplicationSourceWorkerThread worker =
-            new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
-        ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
+        final ReplicationSourceShipperThread worker =
+            new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this);
+        ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker);
         if (extant != null) {
           LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
         } else {
@@ -342,9 +324,9 @@ public class ReplicationSource extends Thread
     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
       String walGroupId = entry.getKey();
       PriorityBlockingQueue<Path> queue = entry.getValue();
-      final ReplicationSourceWorkerThread worker =
-          new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
-      ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+      final ReplicationSourceShipperThread worker =
+          new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this);
+      ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
       if (extant != null) {
         LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
       } else {
@@ -415,9 +397,10 @@ public class ReplicationSource extends Thread
           + " because an error occurred: " + reason, cause);
     }
     this.sourceRunning = false;
-    Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
-    for (ReplicationSourceWorkerThread worker : workers) {
+    Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
+    for (ReplicationSourceShipperThread worker : workers) {
       worker.setWorkerRunning(false);
+      worker.entryReader.interrupt();
       worker.interrupt();
     }
     ListenableFuture<Service.State> future = null;
@@ -425,7 +408,7 @@ public class ReplicationSource extends Thread
       future = this.replicationEndpoint.stop();
     }
     if (join) {
-      for (ReplicationSourceWorkerThread worker : workers) {
+      for (ReplicationSourceShipperThread worker : workers) {
         Threads.shutdown(worker, this.sleepForRetries);
         LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
       }
@@ -454,7 +437,7 @@ public class ReplicationSource extends Thread
   @Override
   public Path getCurrentPath() {
     // only for testing
-    for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
     }
     return null;
@@ -491,9 +474,9 @@ public class ReplicationSource extends Thread
     StringBuilder sb = new StringBuilder();
     sb.append("Total replicated edits: ").append(totalReplicatedEdits)
         .append(", current progress: \n");
-    for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
+    for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
       String walGroupId = entry.getKey();
-      ReplicationSourceWorkerThread worker = entry.getValue();
+      ReplicationSourceShipperThread worker = entry.getValue();
       long position = worker.getCurrentPosition();
       Path currentPath = worker.getCurrentPath();
       sb.append("walGroup [").append(walGroupId).append("]: ");
@@ -522,28 +505,20 @@ public class ReplicationSource extends Thread
     return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
   }
 
-  public class ReplicationSourceWorkerThread extends Thread {
-    private ReplicationSource source;
-    private String walGroupId;
-    private PriorityBlockingQueue<Path> queue;
-    private ReplicationQueueInfo replicationQueueInfo;
-    // Our reader for the current log. open/close handled by repLogReader
-    private WAL.Reader reader;
+  // This thread reads entries from a queue and ships them.
+  // Entries are placed onto the queue by ReplicationSourceWALReaderThread
+  public class ReplicationSourceShipperThread extends Thread {
+    ReplicationSourceInterface source;
+    String walGroupId;
+    PriorityBlockingQueue<Path> queue;
+    ReplicationQueueInfo replicationQueueInfo;
     // Last position in the log that we sent to ZooKeeper
     private long lastLoggedPosition = -1;
     // Path of the current log
     private volatile Path currentPath;
-    // Handle on the log reader helper
-    private ReplicationWALReaderManager repLogReader;
-    // Current number of operations (Put/Delete) that we need to replicate
-    private int currentNbOperations = 0;
-    // Current size of data we need to replicate
-    private int currentSize = 0;
     // Indicates whether this particular worker is running
     private boolean workerRunning = true;
-    // Current number of hfiles that we need to replicate
-    private long currentNbHFiles = 0;
-    List<WAL.Entry> entries;
+    ReplicationSourceWALReaderThread entryReader;
     // Use guava cache to set ttl for each key
     private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
         .expireAfterAccess(1, TimeUnit.DAYS).build(
@@ -555,32 +530,17 @@ public class ReplicationSource extends Thread
             }
         );
 
-    public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
-        ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
+    public ReplicationSourceShipperThread(String walGroupId,
+        PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
+        ReplicationSourceInterface source) {
       this.walGroupId = walGroupId;
       this.queue = queue;
       this.replicationQueueInfo = replicationQueueInfo;
-      this.repLogReader = new ReplicationWALReaderManager(fs, conf);
       this.source = source;
-      this.entries = new ArrayList<>();
     }
 
     @Override
     public void run() {
-      // If this is recovered, the queue is already full and the first log
-      // normally has a position (unless the RS failed between 2 logs)
-      if (this.replicationQueueInfo.isQueueRecovered()) {
-        try {
-          this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
-            this.queue.peek().getName()));
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
-                + this.repLogReader.getPosition());
-          }
-        } catch (ReplicationException e) {
-          terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
-        }
-      }
       // Loop until we close down
       while (isWorkerActive()) {
         int sleepMultiplier = 1;
@@ -591,150 +551,43 @@ public class ReplicationSource extends Thread
           }
           continue;
         }
-        Path oldPath = getCurrentPath(); //note that in the current scenario,
-                                         //oldPath will be null when a log roll
-                                         //happens.
-        // Get a new path
-        boolean hasCurrentPath = getNextPath();
-        if (getCurrentPath() != null && oldPath == null) {
-          sleepMultiplier = 1; //reset the sleepMultiplier on a path change
-        }
-        if (!hasCurrentPath) {
-          if (sleepForRetries("No log to process", sleepMultiplier)) {
+        while (entryReader == null) {
+          if (sleepForRetries("Replication WAL entry reader thread not initialized",
+            sleepMultiplier)) {
             sleepMultiplier++;
           }
-          continue;
-        }
-        boolean currentWALisBeingWrittenTo = false;
-        //For WAL files we own (rather than recovered), take a snapshot of whether the
-        //current WAL file (this.currentPath) is in use (for writing) NOW!
-        //Since the new WAL paths are enqueued only after the prev WAL file
-        //is 'closed', presence of an element in the queue means that
-        //the previous WAL file was closed, else the file is in use (currentPath)
-        //We take the snapshot now so that we are protected against races
-        //where a new file gets enqueued while the current file is being processed
-        //(and where we just finished reading the current file).
-        if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
-          currentWALisBeingWrittenTo = true;
-        }
-        // Open a reader on it
-        if (!openReader(sleepMultiplier)) {
-          // Reset the sleep multiplier, else it'd be reused for the next file
-          sleepMultiplier = 1;
-          continue;
-        }
-
-        // If we got a null reader but didn't continue, then sleep and continue
-        if (this.reader == null) {
-          if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
-            sleepMultiplier++;
+          if (sleepMultiplier == maxRetriesMultiplier) {
+            LOG.warn("Replication WAL entry reader thread not initialized");
           }
-          continue;
         }
 
-        boolean gotIOE = false;
-        currentNbOperations = 0;
-        currentNbHFiles = 0;
-        entries.clear();
-        Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
-        currentSize = 0;
         try {
-          if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries,
-              lastPositionsForSerialScope)) {
-            for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) {
-              waitingUntilCanPush(entry);
-            }
-            try {
-              MetaTableAccessor
-                  .updateReplicationPositions(manager.getConnection(), actualPeerId,
-                      lastPositionsForSerialScope);
-            } catch (IOException e) {
-              LOG.error("updateReplicationPositions fail", e);
-              stopper.stop("updateReplicationPositions fail");
-            }
-
-            continue;
-          }
-        } catch (IOException ioe) {
-          LOG.warn(peerClusterZnode + " Got: ", ioe);
-          gotIOE = true;
-          if (ioe.getCause() instanceof EOFException) {
-
-            boolean considerDumping = false;
-            if (this.replicationQueueInfo.isQueueRecovered()) {
-              try {
-                FileStatus stat = fs.getFileStatus(this.currentPath);
-                if (stat.getLen() == 0) {
-                  LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
-                }
-                considerDumping = true;
-              } catch (IOException e) {
-                LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
-              }
-            }
-
-            if (considerDumping &&
-                sleepMultiplier == maxRetriesMultiplier &&
-                processEndOfFile()) {
-              continue;
-            }
-          }
-        } finally {
-          try {
-            this.reader = null;
-            this.repLogReader.closeReader();
-          } catch (IOException e) {
-            gotIOE = true;
-            LOG.warn("Unable to finalize the tailing of a file", e);
-          }
-        }
-        for(Map.Entry<String, Long> entry: lastPositionsForSerialScope.entrySet()) {
-          waitingUntilCanPush(entry);
-        }
-        // If we didn't get anything to replicate, or if we hit a IOE,
-        // wait a bit and retry.
-        // But if we need to stop, don't bother sleeping
-        if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
-          if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
-
-            // Save positions to meta table before zk.
-            if (!gotIOE) {
-              try {
-                MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
-                    lastPositionsForSerialScope);
-              } catch (IOException e) {
-                LOG.error("updateReplicationPositions fail", e);
-                stopper.stop("updateReplicationPositions fail");
-              }
-            }
-
-            manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
-                this.repLogReader.getPosition(),
-                this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
-
-            this.lastLoggedPosition = this.repLogReader.getPosition();
-          }
-          // Reset the sleep multiplier if nothing has actually gone wrong
-          if (!gotIOE) {
-            sleepMultiplier = 1;
-            // if there was nothing to ship and it's not an error
-            // set "ageOfLastShippedOp" to <now> to indicate that we're current
-            metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
+          WALEntryBatch entryBatch = entryReader.take();
+          for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
+            waitingUntilCanPush(entry);
           }
-          if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
-            sleepMultiplier++;
+          shipEdits(entryBatch);
+          releaseBufferQuota((int) entryBatch.getHeapSize());
+          if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
+              && entryBatch.getLastSeqIds().isEmpty()) {
+            LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+                + peerClusterZnode);
+            metrics.incrCompletedRecoveryQueue();
+            setWorkerRunning(false);
+            continue;
           }
-          continue;
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while waiting for next replication entry batch", e);
+          Thread.currentThread().interrupt();
         }
-        shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
-        releaseBufferQuota();
       }
+
       if (replicationQueueInfo.isQueueRecovered()) {
         // use synchronize to make sure one last thread will clean the queue
         synchronized (workerThreads) {
           Threads.sleep(100);// wait a short while for other worker thread to fully exit
           boolean allOtherTaskDone = true;
-          for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+          for (ReplicationSourceShipperThread worker : workerThreads.values()) {
             if (!worker.equals(this) && worker.isAlive()) {
               allOtherTaskDone = false;
               break;
@@ -776,127 +629,6 @@ public class ReplicationSource extends Thread
       }
     }
 
-    /**
-     * Read all the entries from the current log files and retain those that need to be replicated.
-     * Else, process the end of the current file.
-     * @param currentWALisBeingWrittenTo is the current WAL being written to
-     * @param entries resulting entries to be replicated
-     * @param lastPosition save the last sequenceid for each region if the table has
-     *                     serial-replication scope
-     * @return true if we got nothing and went to the next file, false if we got entries
-     * @throws IOException
-     */
-    protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
-        List<WAL.Entry> entries, Map<String, Long> lastPosition) throws IOException {
-      long seenEntries = 0;
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Seeking in " + this.currentPath + " at position "
-            + this.repLogReader.getPosition());
-      }
-      this.repLogReader.seek();
-      long positionBeforeRead = this.repLogReader.getPosition();
-      WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
-      while (entry != null) {
-        metrics.incrLogEditsRead();
-        seenEntries++;
-
-        if (entry.hasSerialReplicationScope()) {
-          String key = Bytes.toString(entry.getKey().getEncodedRegionName());
-          lastPosition.put(key, entry.getKey().getLogSeqNum());
-          if (entry.getEdit().getCells().size() > 0) {
-            WALProtos.RegionEventDescriptor maybeEvent =
-                WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
-            if (maybeEvent != null && maybeEvent.getEventType()
-                == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
-              // In serially replication, if we move a region to another RS and move it back, we may
-              // read logs crossing two sections. We should break at REGION_CLOSE and push the first
-              // section first in case of missing the middle section belonging to the other RS.
-              // In a worker thread, if we can push the first log of a region, we can push all logs
-              // in the same region without waiting until we read a close marker because next time
-              // we read logs in this region, it must be a new section and not adjacent with this
-              // region. Mark it negative.
-              lastPosition.put(key, -entry.getKey().getLogSeqNum());
-              break;
-            }
-          }
-        }
-        boolean totalBufferTooLarge = false;
-        // don't replicate if the log entries have already been consumed by the cluster
-        if (replicationEndpoint.canReplicateToSameCluster()
-            || !entry.getKey().getClusterIds().contains(peerClusterId)) {
-          // Remove all KVs that should not be replicated
-          entry = walEntryFilter.filter(entry);
-          WALEdit edit = null;
-          WALKey logKey = null;
-          if (entry != null) {
-            edit = entry.getEdit();
-            logKey = entry.getKey();
-          }
-
-          if (edit != null && edit.size() != 0) {
-            // Mark that the current cluster has the change
-            logKey.addClusterId(clusterId);
-            currentNbOperations += countDistinctRowKeys(edit);
-            entries.add(entry);
-            int delta = (int)entry.getEdit().heapSize() + calculateTotalSizeOfStoreFiles(edit);
-            currentSize += delta;
-            totalBufferTooLarge = acquireBufferQuota(delta);
-          } else {
-            metrics.incrLogEditsFiltered();
-          }
-        }
-        // Stop if too many entries or too big
-        // FIXME check the relationship between single wal group and overall
-        if (totalBufferTooLarge || currentSize >= replicationQueueSizeCapacity
-            || entries.size() >= replicationQueueNbCapacity) {
-          break;
-        }
-
-        try {
-          entry = this.repLogReader.readNextAndSetPosition();
-        } catch (IOException ie) {
-          LOG.debug("Break on IOE: " + ie.getMessage());
-          break;
-        }
-      }
-      metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
-      if (currentWALisBeingWrittenTo) {
-        return false;
-      }
-      // If we didn't get anything and the queue has an object, it means we
-      // hit the end of the file for sure
-      return seenEntries == 0 && processEndOfFile();
-    }
-
-    /**
-     * Calculate the total size of all the store files
-     * @param edit edit to count row keys from
-     * @return the total size of the store files
-     */
-    private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
-      List<Cell> cells = edit.getCells();
-      int totalStoreFilesSize = 0;
-
-      int totalCells = edit.size();
-      for (int i = 0; i < totalCells; i++) {
-        if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
-          try {
-            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
-            List<StoreDescriptor> stores = bld.getStoresList();
-            int totalStores = stores.size();
-            for (int j = 0; j < totalStores; j++) {
-              totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
-            }
-          } catch (IOException e) {
-            LOG.error("Failed to deserialize bulk load entry from wal edit. "
-                + "Size of HFiles part of cell will not be considered in replication "
-                + "request size calculation.", e);
-          }
-        }
-      }
-      return totalStoreFilesSize;
-    }
-
     private void cleanUpHFileRefs(WALEdit edit) throws IOException {
       String peerId = peerClusterZnode;
       if (peerId.contains("-")) {
@@ -921,206 +653,6 @@ public class ReplicationSource extends Thread
       }
     }
 
-    /**
-     * Poll for the next path
-     * @return true if a path was obtained, false if not
-     */
-    protected boolean getNextPath() {
-      try {
-        if (this.currentPath == null) {
-          this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
-          metrics.decrSizeOfLogQueue();
-          if (this.currentPath != null) {
-            // For recovered queue: must use peerClusterZnode since peerId is a parsed value
-            manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
-              this.replicationQueueInfo.isQueueRecovered());
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("New log: " + this.currentPath);
-            }
-          }
-        }
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted while reading edits", e);
-      }
-      return this.currentPath != null;
-    }
-
-    /**
-     * Open a reader on the current path
-     *
-     * @param sleepMultiplier by how many times the default sleeping time is augmented
-     * @return true if we should continue with that file, false if we are over with it
-     */
-    protected boolean openReader(int sleepMultiplier) {
-      try {
-        try {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Opening log " + this.currentPath);
-          }
-          this.reader = repLogReader.openReader(this.currentPath);
-        } catch (FileNotFoundException fnfe) {
-          if (this.replicationQueueInfo.isQueueRecovered()) {
-            // We didn't find the log in the archive directory, look if it still
-            // exists in the dead RS folder (there could be a chain of failures
-            // to look at)
-            List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
-            LOG.info("NB dead servers : " + deadRegionServers.size());
-            final Path walDir = FSUtils.getWALRootDir(conf);
-            for (String curDeadServerName : deadRegionServers) {
-              final Path deadRsDirectory = new Path(walDir,
-                  DefaultWALProvider.getWALDirectoryName(curDeadServerName));
-              Path[] locs = new Path[] {
-                  new Path(deadRsDirectory, currentPath.getName()),
-                  new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
-                                            currentPath.getName()),
-              };
-              for (Path possibleLogLocation : locs) {
-                LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-                if (manager.getFs().exists(possibleLogLocation)) {
-                  // We found the right new location
-                  LOG.info("Log " + this.currentPath + " still exists at " +
-                      possibleLogLocation);
-                  // Breaking here will make us sleep since reader is null
-                  // TODO why don't we need to set currentPath and call openReader here?
-                  return true;
-                }
-              }
-            }
-            // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
-            // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
-            if (stopper instanceof ReplicationSyncUp.DummyServer) {
-              // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
-              //      area rather than to the wal area for a particular region server.
-              FileStatus[] rss = fs.listStatus(manager.getLogDir());
-              for (FileStatus rs : rss) {
-                Path p = rs.getPath();
-                FileStatus[] logs = fs.listStatus(p);
-                for (FileStatus log : logs) {
-                  p = new Path(p, log.getPath().getName());
-                  if (p.getName().equals(currentPath.getName())) {
-                    currentPath = p;
-                    LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
-                    // Open the log at the new location
-                    this.openReader(sleepMultiplier);
-                    return true;
-                  }
-                }
-              }
-            }
-
-            // TODO What happens if the log was missing from every single location?
-            // Although we need to check a couple of times as the log could have
-            // been moved by the master between the checks
-            // It can also happen if a recovered queue wasn't properly cleaned,
-            // such that the znode pointing to a log exists but the log was
-            // deleted a long time ago.
-            // For the moment, we'll throw the IO and processEndOfFile
-            throw new IOException("File from recovered queue is " +
-                "nowhere to be found", fnfe);
-          } else {
-            // If the log was archived, continue reading from there
-            Path archivedLogLocation =
-                new Path(manager.getOldLogDir(), currentPath.getName());
-            if (manager.getFs().exists(archivedLogLocation)) {
-              currentPath = archivedLogLocation;
-              LOG.info("Log " + this.currentPath + " was moved to " +
-                  archivedLogLocation);
-              // Open the log at the new location
-              this.openReader(sleepMultiplier);
-
-            }
-            // TODO What happens the log is missing in both places?
-          }
-        }
-      } catch (LeaseNotRecoveredException lnre) {
-        // HBASE-15019 the WAL was not closed due to some hiccup.
-        LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
-        recoverLease(conf, currentPath);
-        this.reader = null;
-      } catch (IOException ioe) {
-        if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
-        LOG.warn(peerClusterZnode + " Got: ", ioe);
-        this.reader = null;
-        if (ioe.getCause() instanceof NullPointerException) {
-          // Workaround for race condition in HDFS-4380
-          // which throws a NPE if we open a file before any data node has the most recent block
-          // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
-          LOG.warn("Got NPE opening reader, will retry.");
-        } else if (sleepMultiplier >= maxRetriesMultiplier) {
-          // TODO Need a better way to determine if a file is really gone but
-          // TODO without scanning all logs dir
-          LOG.warn("Waited too long for this file, considering dumping");
-          return !processEndOfFile();
-        }
-      }
-      return true;
-    }
-
-    private void recoverLease(final Configuration conf, final Path path) {
-      try {
-        final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
-        FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
-        fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
-          @Override
-          public boolean progress() {
-            LOG.debug("recover WAL lease: " + path);
-            return isWorkerActive();
-          }
-        });
-      } catch (IOException e) {
-        LOG.warn("unable to recover lease for WAL: " + path, e);
-      }
-    }
-
-    /*
-     * Checks whether the current log file is empty, and it is not a recovered queue. This is to
-     * handle scenario when in an idle cluster, there is no entry in the current log and we keep on
-     * trying to read the log file and get EOFException. In case of a recovered queue the last log
-     * file may be empty, and we don't want to retry that.
-     */
-    private boolean isCurrentLogEmpty() {
-      return (this.repLogReader.getPosition() == 0 &&
-          !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
-    }
-
-    /**
-     * Count the number of different row keys in the given edit because of mini-batching. We assume
-     * that there's at least one Cell in the WALEdit.
-     * @param edit edit to count row keys from
-     * @return number of different row keys
-     */
-    private int countDistinctRowKeys(WALEdit edit) {
-      List<Cell> cells = edit.getCells();
-      int distinctRowKeys = 1;
-      int totalHFileEntries = 0;
-      Cell lastCell = cells.get(0);
-      
-      int totalCells = edit.size();
-      for (int i = 0; i < totalCells; i++) {
-        // Count HFiles to be replicated
-        if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
-          try {
-            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
-            List<StoreDescriptor> stores = bld.getStoresList();
-            int totalStores = stores.size();
-            for (int j = 0; j < totalStores; j++) {
-              totalHFileEntries += stores.get(j).getStoreFileList().size();
-            }
-          } catch (IOException e) {
-            LOG.error("Failed to deserialize bulk load entry from wal edit. "
-                + "Then its hfiles count will not be added into metric.");
-          }
-        }
-
-        if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
-          distinctRowKeys++;
-        }
-        lastCell = cells.get(i);
-      }
-      currentNbHFiles += totalHFileEntries;
-      return distinctRowKeys + totalHFileEntries;
-    }
-
     private void checkBandwidthChangeAndResetThrottler() {
       long peerBandwidth = getCurrentBandwidth();
       if (peerBandwidth != currentBandwidth) {
@@ -1133,16 +665,24 @@ public class ReplicationSource extends Thread
 
     /**
      * Do the shipping logic
-     * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
-     * written to when this method was called
      */
-    protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries,
-        Map<String, Long> lastPositionsForSerialScope) {
+    protected void shipEdits(WALEntryBatch entryBatch) {
+      List<Entry> entries = entryBatch.getWalEntries();
+      long lastReadPosition = entryBatch.getLastWalPosition();
+      currentPath = entryBatch.getLastWalPath();
       int sleepMultiplier = 0;
       if (entries.isEmpty()) {
-        LOG.warn("Was given 0 edits to ship");
+        if (lastLoggedPosition != lastReadPosition) {
+          // Save positions to meta table before zk.
+          updateSerialRepPositions(entryBatch.getLastSeqIds());
+          updateLogPosition(lastReadPosition);
+          // if there was nothing to ship and it's not an error
+          // set "ageOfLastShippedOp" to <now> to indicate that we're current
+          metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
+        }
         return;
       }
+      int currentSize = (int) entryBatch.getHeapSize();
       while (isWorkerActive()) {
         try {
           checkBandwidthChangeAndResetThrottler();
@@ -1183,7 +723,7 @@ public class ReplicationSource extends Thread
             sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
           }
 
-          if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
+          if (this.lastLoggedPosition != lastReadPosition) {
             //Clean up hfile references
             int size = entries.size();
             for (int i = 0; i < size; i++) {
@@ -1191,27 +731,18 @@ public class ReplicationSource extends Thread
             }
 
             // Save positions to meta table before zk.
-            try {
-              MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
-                  lastPositionsForSerialScope);
-            } catch (IOException e) {
-              LOG.error("updateReplicationPositions fail", e);
-              stopper.stop("updateReplicationPositions fail");
-            }
+            updateSerialRepPositions(entryBatch.getLastSeqIds());
 
             //Log and clean up WAL logs
-            manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
-              this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
-              currentWALisBeingWrittenTo);
-            this.lastLoggedPosition = this.repLogReader.getPosition();
+            updateLogPosition(lastReadPosition);
           }
           if (throttler.isEnabled()) {
             throttler.addPushSize(currentSize);
           }
           totalReplicatedEdits.addAndGet(entries.size());
-          totalReplicatedOperations.addAndGet(currentNbOperations);
+          totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
           // FIXME check relationship between wal group and overall
-          metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles);
+          metrics.shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles());
           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
             walGroupId);
           if (LOG.isTraceEnabled()) {
@@ -1230,62 +761,20 @@ public class ReplicationSource extends Thread
       }
     }
 
-    /**
-     * If the queue isn't empty, switch to the next one Else if this is a recovered queue, it means
-     * we're done! Else we'll just continue to try reading the log file
-     * @return true if we're done with the current file, false if we should continue trying to read
-     *         from it
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
-        justification = "Yeah, this is how it works")
-    protected boolean processEndOfFile() {
-      // We presume this means the file we're reading is closed.
-      if (this.queue.size() != 0) {
-        // -1 means the wal wasn't closed cleanly.
-        final long trailerSize = this.repLogReader.currentTrailerSize();
-        final long currentPosition = this.repLogReader.getPosition();
-        FileStatus stat = null;
-        try {
-          stat = fs.getFileStatus(this.currentPath);
-        } catch (IOException exception) {
-          LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
-              + ", stats: " + getStats());
-          metrics.incrUnknownFileLengthForClosedWAL();
-        }
-        if (stat != null) {
-          if (trailerSize < 0) {
-            if (currentPosition < stat.getLen()) {
-              final long skippedBytes = stat.getLen() - currentPosition;
-              LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
-              metrics.incrUncleanlyClosedWALs();
-              metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
-            }
-          } else if (currentPosition + trailerSize < stat.getLen()){
-            LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
-                ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
-            repLogReader.setPosition(0);
-            metrics.incrRestartedWALReading();
-            metrics.incrRepeatedFileBytes(currentPosition);
-            return false;
-          }
-        }
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
-              + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
-        }
-        this.currentPath = null;
-        this.repLogReader.finishCurrentFile();
-        this.reader = null;
-        metrics.incrCompletedWAL();
-        return true;
-      } else if (this.replicationQueueInfo.isQueueRecovered()) {
-        LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
-            + peerClusterZnode);
-        metrics.incrCompletedRecoveryQueue();
-        workerRunning = false;
-        return true;
+    private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
+      try { // save the given serial-replication positions for actualPeerId
+        MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
+          lastPositionsForSerialScope);
+      } catch (IOException e) { // a failed update is logged, then stopper.stop() is invoked
+        LOG.error("updateReplicationPositions fail", e);
+        stopper.stop("updateReplicationPositions fail");
       }
-      return false;
+    }
+
+    private void updateLogPosition(long lastReadPosition) { // hand the position to the manager
+      manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
+        this.replicationQueueInfo.isQueueRecovered(), false);
+      lastLoggedPosition = lastReadPosition; // lets shipEdits skip an unchanged position
     }
 
     public void startup() {
@@ -1302,6 +791,134 @@ public class ReplicationSource extends Thread
       Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
           + peerClusterZnode, handler);
       workerThreads.put(walGroupId, this);
+
+      long startPosition = 0;
+
+      if (this.replicationQueueInfo.isQueueRecovered()) {
+        startPosition = getRecoveredQueueStartPos(startPosition);
+        int numRetries = 0;
+        while (numRetries <= maxRetriesMultiplier) {
+          try {
+            locateRecoveredPaths();
+            break;
+          } catch (IOException e) {
+            LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
+            numRetries++;
+          }
+        }
+      }
+
+      startWALReaderThread(n, handler, startPosition);
+    }
+
+    // For a recovered queue the logs are already enqueued and the first one normally has a saved
+    // position (unless the RS failed between 2 logs); on lookup failure the input is returned.
+    private long getRecoveredQueueStartPos(long startPosition) {
+      try { // NOTE(review): assumes a non-empty queue; peek() would yield null otherwise - confirm
+        startPosition =
+            (replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName()));
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+              + startPosition);
+        }
+      } catch (ReplicationException e) { // terminate() is called; startPosition stays as passed in
+        terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+      }
+      return startPosition;
+    }
+
+    // launch the daemon thread that reads WAL entries and groups them into batches
+    private void startWALReaderThread(String threadName, Thread.UncaughtExceptionHandler handler,
+        long startPosition) {
+      String readerName = threadName + ".replicationSource.replicationWALReaderThread."
+          + walGroupId + "," + peerClusterZnode;
+      // the reader filters with the source's own filter followed by the cluster-marking filter
+      ArrayList<WALEntryFilter> chained = Lists.newArrayList(walEntryFilter,
+        new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
+      entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
+          startPosition, fs, conf, new ChainWALEntryFilter(chained), metrics);
+      Threads.setDaemonThreadRunning(entryReader, readerName, handler);
+    }
+
+    // Re-resolves the location of every log in the recovered queue, since logs may have moved
+    // before recovery was initiated; a log that cannot be found keeps its original path.
+    private void locateRecoveredPaths() throws IOException {
+      boolean hasPathChanged = false;
+      PriorityBlockingQueue<Path> newPaths =
+          new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
+      pathsLoop: for (Path path : queue) {
+        if (fs.exists(path)) { // still in the same location, nothing to resolve
+          newPaths.add(path);
+          continue;
+        }
+        // Path no longer exists where it was queued - try to find where it is now.
+        hasPathChanged = true;
+        if (stopper instanceof ReplicationSyncUp.DummyServer) {
+          // In the case of disaster/recovery, HMaster may be shut down/crashed before moving data
+          // from .logs to .oldlogs. Look into the .logs folders and check whether a match exists
+          Path newPath = getReplSyncUpPath(path);
+          newPaths.add(newPath);
+          continue;
+        } else {
+          // See if the path exists in a dead RS folder (there could be a chain of failures
+          // to look at); both the plain and the SPLITTING_EXT-suffixed directory are probed
+          List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+          LOG.info("NB dead servers : " + deadRegionServers.size());
+          final Path walDir = FSUtils.getWALRootDir(conf);
+          for (String curDeadServerName : deadRegionServers) {
+            final Path deadRsDirectory =
+                new Path(walDir, DefaultWALProvider.getWALDirectoryName(curDeadServerName));
+            Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
+                deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT), path.getName()) };
+            for (Path possibleLogLocation : locs) {
+              LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+              if (manager.getFs().exists(possibleLogLocation)) {
+                // Found the new location: record it and move on to the next queued path
+                LOG.info("Log " + path + " still exists at " + possibleLogLocation);
+                newPaths.add(possibleLogLocation);
+                continue pathsLoop;
+              }
+            }
+          }
+          // didn't find a new location: log it and keep the original path in the queue
+          LOG.error(
+            String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
+          newPaths.add(path);
+        }
+      }
+
+      if (hasPathChanged) {
+        if (newPaths.size() != queue.size()) { // this shouldn't happen
+          LOG.error("Recovery queue size is incorrect");
+          throw new IOException("Recovery queue size error");
+        }
+        // replace the queue contents with the resolved locations;
+        // since this is a recovered queue with no new incoming logs,
+        // there shouldn't be any concurrency issues
+        queue.clear();
+        for (Path path : newPaths) {
+          queue.add(path);
+        }
+      }
+    }
+
+    // N.B. the ReplicationSyncUp tool points manager.getLogDir() at the root of the WAL area, so
+    // scan each region server dir below it; the input path is returned if no name matches.
+    private Path getReplSyncUpPath(Path path) throws IOException {
+      FileStatus[] rss = fs.listStatus(manager.getLogDir());
+      for (FileStatus rs : rss) {
+        Path rsDir = rs.getPath();
+        FileStatus[] logs = fs.listStatus(rsDir);
+        for (FileStatus log : logs) {
+          Path p = new Path(rsDir, log.getPath().getName()); // fresh per log, never nested
+          if (p.getName().equals(path.getName())) {
+            LOG.info("Log " + p.getName() + " found at " + p);
+            return p;
+          }
+        }
+      }
+      LOG.error("Didn't find path for: " + path.getName());
+      return path;
     }
 
     public Path getCurrentPath() {
@@ -1309,7 +926,7 @@ public class ReplicationSource extends Thread
     }
 
     public long getCurrentPosition() {
-      return this.repLogReader.getPosition();
+      return this.lastLoggedPosition; // as last recorded by updateLogPosition
     }
 
     private boolean isWorkerActive() {
@@ -1324,27 +941,20 @@ public class ReplicationSource extends Thread
         LOG.error("Closing worker for wal group " + this.walGroupId
             + " because an error occurred: " + reason, cause);
       }
+      entryReader.interrupt();
+      Threads.shutdown(entryReader, sleepForRetries);
       this.interrupt();
       Threads.shutdown(this, sleepForRetries);
       LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
     }
 
     public void setWorkerRunning(boolean workerRunning) {
+      entryReader.setReaderRunning(workerRunning); // pass the same flag on to the WAL reader
       this.workerRunning = workerRunning;
     }
 
-    /**
-     * @param size delta size for grown buffer
-     * @return true if we should clear buffer and push all
-     */
-    private boolean acquireBufferQuota(int size) {
-      return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
-    }
-
-    private void releaseBufferQuota() {
-      totalBufferUsed.addAndGet(-currentSize);
-      currentSize = 0;
-      entries.clear();
+    private void releaseBufferQuota(int size) {
+      totalBufferUsed.addAndGet(-size); // subtract size from the tracked buffer usage
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
new file mode 100644
index 0000000..6f1c641
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -0,0 +1,471 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.WALEntryStreamRuntimeException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+/**
+ * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplicationSourceWALReaderThread extends Thread {
+  private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class);
+
+  private PriorityBlockingQueue<Path> logQueue;
+  private FileSystem fs;
+  private Configuration conf;
+  private BlockingQueue<WALEntryBatch> entryBatchQueue;
+  // max (heap) size of each batch - multiply by number of batches in queue to get total
+  private long replicationBatchSizeCapacity;
+  // max count of each batch - multiply by number of batches in queue to get total
+  private int replicationBatchCountCapacity;
+  // position in the WAL to start reading at
+  private long currentPosition;
+  private WALEntryFilter filter;
+  private long sleepForRetries;
+  // Keeps run() looping; volatile so setReaderRunning() from another thread is visible to run()
+  private volatile boolean isReaderRunning = true;
+  private ReplicationQueueInfo replicationQueueInfo;
+  private int maxRetriesMultiplier;
+  private MetricsSource metrics;
+
+  private AtomicLong totalBufferUsed;
+  private long totalBufferQuota;
+
+  /**
+   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
+   * entries, and puts them on a batch queue.
+   * @param manager replication manager, source of the shared total-buffer-used counter
+   * @param replicationQueueInfo queue info (peer id, peer cluster znode, whether it is recovered)
+   * @param logQueue The WAL queue to read off of
+   * @param startPosition position in the first WAL to start reading from
+   * @param fs the file system to use
+   * @param conf configuration to use
+   * @param filter The filter to use while reading
+   * @param metrics replication metrics
+   */
+  public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
+      ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
+      long startPosition,
+      FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
+    this.replicationQueueInfo = replicationQueueInfo;
+    this.logQueue = logQueue;
+    this.currentPosition = startPosition;
+    this.fs = fs;
+    this.conf = conf;
+    this.filter = filter;
+    this.replicationBatchSizeCapacity =
+        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
+    this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
+    // memory used will be batchSizeCapacity * (nb.batches + 1)
+    // the +1 is for the current thread reading before placing onto the queue
+    int batchCount = conf.getInt("replication.source.nb.batches", 1);
+    this.totalBufferUsed = manager.getTotalBufferUsed();
+    this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    this.sleepForRetries =
+        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
+    this.maxRetriesMultiplier =
+        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
+    this.metrics = metrics;
+    this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
+    LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
+        + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
+        + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
+        + ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
+        + ", replicationBatchQueueCapacity=" + batchCount);
+  }
+
+  @Override
+  public void run() {
+    int sleepMultiplier = 1;
+    while (isReaderRunning()) { // new stream per pass; we only loop back after a stream failure
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
+        while (isReaderRunning()) { // one batch per pass, reusing the stream while we can
+          if (!checkQuota()) {
+            continue; // over the global quota; checkQuota() already slept
+          }
+          WALEntryBatch batch = null;
+          while (entryStream.hasNext()) {
+            if (batch == null) {
+              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+            }
+            Entry entry = entryStream.next();
+            if (updateSerialReplPos(batch, entry)) {
+              batch.lastWalPosition = entryStream.getPosition();
+              break; // REGION_CLOSE: end this batch here
+            }
+            entry = filterEntry(entry);
+            if (entry != null) {
+              WALEdit edit = entry.getEdit();
+              if (edit != null && !edit.isEmpty()) {
+                long entrySize = getEntrySize(entry);
+                batch.addEntry(entry);
+                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+                boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
+                // Stop if over global quota, or batch too big or has too many entries
+                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
+                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                  break;
+                }
+              }
+            }
+          }
+          if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace(String.format("Read %s WAL entries eligible for replication",
+                batch.getNbEntries()));
+            }
+            entryBatchQueue.put(batch);
+            sleepMultiplier = 1;
+          } else { // got no entries and didn't advance position in WAL
+            LOG.trace("Didn't read any new entries from WAL");
+            if (replicationQueueInfo.isQueueRecovered()) {
+              // we're done with queue recovery, shut ourselves down
+              setReaderRunning(false);
+              // shuts down shipper thread immediately
+              entryBatchQueue.put(batch != null ? batch
+                  : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
+            } else {
+              Thread.sleep(sleepForRetries);
+            }
+          }
+          currentPosition = entryStream.getPosition(); // where a reopened stream would resume
+          entryStream.reset(); // reuse stream
+        }
+      } catch (IOException | WALEntryStreamRuntimeException e) { // stream related
+        if (sleepMultiplier < maxRetriesMultiplier) {
+          LOG.debug("Failed to read stream of replication entries: " + e);
+          sleepMultiplier++;
+        } else {
+          LOG.error("Failed to read stream of replication entries", e);
+        }
+        Threads.sleep(sleepForRetries * sleepMultiplier);
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while sleeping between WAL reads");
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  // Returns false (after sleeping sleepForRetries) if the global buffer quota is already exceeded
+  private boolean checkQuota() {
+    // try not to go over total quota
+    if (totalBufferUsed.get() > totalBufferQuota) {
+      Threads.sleep(sleepForRetries);
+      return false;
+    }
+    return true;
+  }
+
+  private Entry filterEntry(Entry entry) {
+    Entry filtered = filter.filter(entry); // null result means the filter rejected the entry
+    if (entry != null && filtered == null) {
+      metrics.incrLogEditsFiltered(); // count entries dropped by the filter
+    }
+    return filtered;
+  }
+
+  /**
+   * @return true if we should stop reading because we're at REGION_CLOSE
+   */
+  private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
+    if (entry.hasSerialReplicationScope()) {
+      String key = Bytes.toString(entry.getKey().getEncodedRegionName());
+      batch.setLastPosition(key, entry.getKey().getSequenceId());
+      if (!entry.getEdit().getCells().isEmpty()) {
+        WALProtos.RegionEventDescriptor maybeEvent =
+            WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+        if (maybeEvent != null && maybeEvent
+            .getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
+          // In serial replication, if we move a region to another RS and move it back, we may
+          // read logs crossing two sections. We should break at REGION_CLOSE and push the first
+          // section first in case of missing the middle section belonging to the other RS.
+          // In a worker thread, if we can push the first log of a region, we can push all logs
+          // in the same region without waiting until we read a close marker because next time
+          // we read logs in this region, it must be a new section and not adjacent with this
+          // region. Mark it negative.
+          batch.setLastPosition(key, -entry.getKey().getSequenceId());
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Retrieves the next batch of WAL entries from the queue, blocking until a batch becomes
+   * available (there is no timeout)
+   * @return A batch of entries, along with the position in the log after reading the batch
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public WALEntryBatch take() throws InterruptedException {
+    return entryBatchQueue.take();
+  }
+
+  private long getEntrySize(Entry entry) {
+    WALEdit edit = entry.getEdit();
+    return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit); // heap size + bulk load files
+  }
+
+  private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
+    WALEdit edit = entry.getEdit();
+    if (edit != null && !edit.isEmpty()) { // size and counts only change for non-empty edits
+      batch.incrementHeapSize(entrySize);
+      Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
+      batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
+      batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
+    }
+    batch.lastWalPosition = entryPosition; // always record how far in the WAL this batch reaches
+  }
+
+  /**
+   * Count the number of different row keys in the given edit because of mini-batching. We assume
+   * that there's at least one Cell in the WALEdit.
+   * @param edit edit to count row keys and bulk loaded HFiles from
+   * @return number of different row keys and HFiles
+   */
+  private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
+    List<Cell> cells = edit.getCells();
+    int distinctRowKeys = 1;
+    int totalHFileEntries = 0;
+    Cell lastCell = cells.get(0);
+
+    int totalCells = edit.size();
+    for (int i = 0; i < totalCells; i++) {
+      // Count HFiles to be replicated
+      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
+        try {
+          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
+          List<StoreDescriptor> stores = bld.getStoresList();
+          int totalStores = stores.size();
+          for (int j = 0; j < totalStores; j++) {
+            totalHFileEntries += stores.get(j).getStoreFileList().size();
+          }
+        } catch (IOException e) {
+          LOG.error("Failed to deserialize bulk load entry from wal edit. "
+              + "Then its hfiles count will not be added into metric.", e);
+        }
+      }
+
+      if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
+        distinctRowKeys++; // row changed relative to the previous cell
+      }
+      lastCell = cells.get(i);
+    }
+
+    Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
+    return result;
+  }
+
+  /**
+   * Calculate the total size of all the store files
+   * @param edit edit whose bulk load store files are summed
+   * @return the total size of the store files
+   */
+  private long calculateTotalSizeOfStoreFiles(WALEdit edit) {
+    List<Cell> cells = edit.getCells();
+    long totalStoreFilesSize = 0; // long: an int accumulator would silently narrow on +=
+
+    int totalCells = edit.size();
+    for (int i = 0; i < totalCells; i++) {
+      if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
+        try {
+          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
+          List<StoreDescriptor> stores = bld.getStoresList();
+          int totalStores = stores.size();
+          for (int j = 0; j < totalStores; j++) {
+            totalStoreFilesSize += stores.get(j).getStoreFileSizeBytes();
+          }
+        } catch (IOException e) {
+          LOG.error("Failed to deserialize bulk load entry from wal edit. "
+              + "Size of HFiles part of cell will not be considered in replication "
+              + "request size calculation.",
+            e);
+        }
+      }
+    }
+    return totalStoreFilesSize;
+  }
+
+  /**
+   * @param size size to add to the shared totalBufferUsed counter
+   * @return true if the counter has now reached totalBufferQuota, so we should stop and push
+   */
+  private boolean acquireBufferQuota(long size) {
+    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
+  }
+
+  /**
+   * @return true if the reader has not been stopped and this thread has not been interrupted
+   */
+  public boolean isReaderRunning() {
+    return isReaderRunning && !isInterrupted();
+  }
+
+  /**
+   * @param readerRunning false to make isReaderRunning() return false and end the loops in run()
+   */
+  public void setReaderRunning(boolean readerRunning) {
+    this.isReaderRunning = readerRunning;
+  }
+
+  /**
+   * Holds a batch of WAL entries to replicate, along with some statistics
+   *
+   */
+  static class WALEntryBatch {
+    private List<Entry> walEntries;
+    // last WAL that was read
+    private Path lastWalPath;
+    // position in WAL of last entry in this batch
+    private long lastWalPosition = 0;
+    // number of distinct row keys in this batch
+    private int nbRowKeys = 0;
+    // number of HFiles
+    private int nbHFiles = 0;
+    // heap size of data we need to replicate
+    private long heapSize = 0;
+    // save the last sequenceid for each region if the table has serial-replication scope
+    private Map<String, Long> lastSeqIds = new HashMap<>();
+
+    /**
+     * @param maxNbEntries initial capacity of the list that holds this batch's WAL entries
+     * @param lastWalPath Path of the WAL the last entry in this batch was read from
+     *          (the position in that WAL is not passed here; it is kept in lastWalPosition)
+     */
+    private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+      this.walEntries = new ArrayList<>(maxNbEntries);
+      this.lastWalPath = lastWalPath;
+    }
+
+    public void addEntry(Entry entry) {
+      walEntries.add(entry);
+    }
+
+    /**
+     * @return the WAL Entries.
+     */
+    public List<Entry> getWalEntries() {
+      return walEntries;
+    }
+
+    /**
+     * @return the path of the last WAL that was read.
+     */
+    public Path getLastWalPath() {
+      return lastWalPath;
+    }
+
+    /**
+     * @return the position in the last WAL that was read.
+     */
+    public long getLastWalPosition() {
+      return lastWalPosition;
+    }
+
+    public int getNbEntries() {
+      return walEntries.size();
+    }
+
+    /**
+     * @return the number of distinct row keys in this batch
+     */
+    public int getNbRowKeys() {
+      return nbRowKeys;
+    }
+
+    /**
+     * @return the number of HFiles in this batch
+     */
+    public int getNbHFiles() {
+      return nbHFiles;
+    }
+
+    /**
+     * @return total number of operations in this batch (distinct row keys plus HFiles)
+     */
+    public int getNbOperations() {
+      return getNbRowKeys() + getNbHFiles();
+    }
+
+    /**
+     * @return the heap size of this batch
+     */
+    public long getHeapSize() {
+      return heapSize;
+    }
+
+    /**
+     * @return the last sequenceid for each region if the table has serial-replication scope
+     */
+    public Map<String, Long> getLastSeqIds() {
+      return lastSeqIds;
+    }
+
+    private void incrementNbRowKeys(int increment) {
+      nbRowKeys += increment;
+    }
+
+    private void incrementNbHFiles(int increment) {
+      nbHFiles += increment;
+    }
+
+    private void incrementHeapSize(long increment) {
+      heapSize += increment;
+    }
+
+    private void setLastPosition(String region, Long sequenceId) {
+      getLastSeqIds().put(region, sequenceId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3cf44332/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
deleted file mode 100644
index 3558d08..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALFactory;
-
-import java.io.IOException;
-
-/**
- * Wrapper class around WAL to help manage the implementation details
- * such as compression.
- */
-@InterfaceAudience.Private
-public class ReplicationWALReaderManager {
-
-  private static final Log LOG = LogFactory.getLog(ReplicationWALReaderManager.class);
-  private final FileSystem fs;
-  private final Configuration conf;
-  private long position = 0;
-  private Reader reader;
-  private Path lastPath;
-
-  /**
-   * Creates the helper but doesn't open any file
-   * Use setInitialPosition after using the constructor if some content needs to be skipped
-   * @param fs
-   * @param conf
-   */
-  public ReplicationWALReaderManager(FileSystem fs, Configuration conf) {
-    this.fs = fs;
-    this.conf = conf;
-  }
-
-  /**
-   * Opens the file at the current position
-   * @param path
-   * @return an WAL reader.
-   * @throws IOException
-   */
-  public Reader openReader(Path path) throws IOException {
-    // Detect if this is a new file, if so get a new reader else
-    // reset the current reader so that we see the new data
-    if (this.reader == null || !this.lastPath.equals(path)) {
-      this.closeReader();
-      this.reader = WALFactory.createReader(this.fs, path, this.conf);
-      this.lastPath = path;
-    } else {
-      try {
-        this.reader.reset();
-      } catch (NullPointerException npe) {
-        throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
-      }
-    }
-    return this.reader;
-  }
-
-  /**
-   * Get the next entry, returned and also added in the array
-   * @return a new entry or null
-   * @throws IOException
-   */
-  public Entry readNextAndSetPosition() throws IOException {
-    Entry entry = this.reader.next();
-    // Store the position so that in the future the reader can start
-    // reading from here. If the above call to next() throws an
-    // exception, the position won't be changed and retry will happen
-    // from the last known good position
-    this.position = this.reader.getPosition();
-    // We need to set the CC to null else it will be compressed when sent to the sink
-    if (entry != null) {
-      entry.setCompressionContext(null);
-    }
-    return entry;
-  }
-
-  /**
-   * Advance the reader to the current position
-   * @throws IOException
-   */
-  public void seek() throws IOException {
-    if (this.position != 0) {
-      this.reader.seek(this.position);
-    }
-  }
-
-  /**
-   * Get the position that we stopped reading at
-   * @return current position, cannot be negative
-   */
-  public long getPosition() {
-    return this.position;
-  }
-
-  public void setPosition(long pos) {
-    this.position = pos;
-  }
-
-  public long currentTrailerSize() {
-    long size = -1L;
-    if (reader instanceof ProtobufLogReader) {
-      final ProtobufLogReader pblr = (ProtobufLogReader)reader;
-      size = pblr.trailerSize();
-    }
-    return size;
-  }
-
-  /**
-   * Close the current reader
-   * @throws IOException
-   */
-  public void closeReader() throws IOException {
-    if (this.reader != null) {
-      this.reader.close();
-      this.reader = null;
-    }
-  }
-
-  /**
-   * Tell the helper to reset internal state
-   */
-  void finishCurrentFile() {
-    this.position = 0;
-    try {
-      this.closeReader();
-    } catch (IOException e) {
-      LOG.warn("Unable to close reader", e);
-    }
-  }
-
-}