Posted to commits@hbase.apache.org by ap...@apache.org on 2021/03/02 20:23:34 UTC

[hbase] branch branch-2.4 updated: Revert "HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)"

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 65999a9  Revert "HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)"
65999a9 is described below

commit 65999a902720ca8755cced2c41d21776182078d8
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Tue Mar 2 12:20:05 2021 -0800

    Revert "HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)"
    
    This reverts commit 9590080a2132014990efd8da450e734ae99c749e.
---
 .../regionserver/ReplicationSource.java            |   8 +-
 .../regionserver/ReplicationSourceWALReader.java   | 152 ++++-------
 .../SerialReplicationSourceWALReader.java          |   4 +-
 .../replication/regionserver/WALEntryBatch.java    |   4 -
 .../replication/regionserver/WALEntryStream.java   |   6 +-
 .../hbase/replication/TestReplicationBase.java     |  67 ++---
 .../TestReplicationEmptyWALRecovery.java           | 298 ++-------------------
 .../regionserver/TestWALEntryStream.java           |  62 +----
 8 files changed, 98 insertions(+), 503 deletions(-)
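
For context: the reverted change (HBASE-25596) made the reader thread carry its
WALEntryBatch across an EOFException so that entries read before the failure could
still be shipped, while this revert restores the earlier flow in which
readWALEntries() creates the batch itself and an EOF on a zero-length log simply
drops that log from the queue. Both behaviors are gated by the same settings; a
minimal sketch, using only configuration keys that appear elsewhere in this diff:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Allow a 0-length WAL at the head of a replication queue to be dropped on EOF.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("replication.source.eof.autorecovery", true);
    // Cap the retry backoff so stream read failures surface quickly (a test setting).
    conf.setInt("replication.source.maxretriesmultiplier", 1);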

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8c7f0a6..e654a5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -29,12 +29,12 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -263,11 +264,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
 
-  @InterfaceAudience.Private
-  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
-    return logQueue.getQueues();
-  }
-
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 57c0a16..f52a83a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -41,6 +41,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
@@ -122,64 +123,44 @@ class ReplicationSourceWALReader extends Thread {
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
-            currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue has no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
+    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
           }
-        } catch (IOException e) { // stream related
-          if (handleEofException(e, batch)) {
+          WALEntryBatch batch = readWALEntries(entryStream);
+          currentPosition = entryStream.getPosition();
+          if (batch != null) {
+            // need to propagate the batch even if it has no entries since it may carry the last
+            // sequence id information for serial replication.
+            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+            entryBatchQueue.put(batch);
             sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          } else { // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
+            entryStream.reset(); // reuse stream
           }
-        } catch (InterruptedException e) {
-          LOG.trace("Interrupted while sleeping between WAL reads");
-          Thread.currentThread().interrupt();
-        } finally {
-          entryStream.close();
         }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e)) {
+          LOG.warn("Failed to read stream of replication entries", e);
+          if (sleepMultiplier < maxRetriesMultiplier) {
+            sleepMultiplier ++;
+          }
+          Threads.sleep(sleepForRetries * sleepMultiplier);
+        }
+      } catch (InterruptedException e) {
+        LOG.trace("Interrupted while sleeping between WAL reads");
+        Thread.currentThread().interrupt();
       }
-    } catch (IOException e) {
-      if (sleepMultiplier < maxRetriesMultiplier) {
-        LOG.debug("Failed to read stream of replication entries: " + e);
-        sleepMultiplier++;
-      } else {
-        LOG.error("Failed to read stream of replication entries", e);
-      }
-      Threads.sleep(sleepForRetries * sleepMultiplier);
-    } catch (InterruptedException e) {
-      LOG.trace("Interrupted while sleeping between WAL reads");
-      Thread.currentThread().interrupt();
     }
   }
 
@@ -208,19 +189,14 @@ class ReplicationSourceWALReader extends Thread {
     return newPath == null || !path.getName().equals(newPath.getName());
   }
 
-  // We need to get the WALEntryBatch from the caller so we can add entries in there
-  // This is required in case there is any exception while reading entries;
-  // we do not want to lose the existing entries in the batch
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
-      WALEntryBatch batch) throws IOException, InterruptedException {
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+      throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
       // check whether we have switched a file
       if (currentPath != null && switched(entryStream, currentPath)) {
         return WALEntryBatch.endOfFile(currentPath);
       } else {
-        // This would mean either no more files in the queue
-        // or there is no new data yet on the current wal
         return null;
       }
     }
@@ -232,7 +208,7 @@ class ReplicationSourceWALReader extends Thread {
       // when reading from the entry stream first time we will enter here
       currentPath = entryStream.getCurrentPath();
     }
-    batch.setLastWalPath(currentPath);
+    WALEntryBatch batch = createBatch(entryStream);
     for (;;) {
       Entry entry = entryStream.next();
       batch.setLastWalPosition(entryStream.getPosition());
@@ -255,12 +231,10 @@ class ReplicationSourceWALReader extends Thread {
     return batch;
   }
 
-  private void handleEmptyWALEntryBatch() throws InterruptedException {
+  private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
     LOG.trace("Didn't read any new entries from WAL");
-    if (logQueue.getQueue(walGroupId).isEmpty()) {
-      // we're done with current queue, either this is a recovered queue, or it is the special group
-      // for a sync replication peer and the peer has been transited to DA or S state.
-      LOG.debug("Stopping the replication source wal reader");
+    if (source.isRecovered()) {
+      // we're done with queue recovery, shut ourself down
       setReaderRunning(false);
       // shuts down shipper thread immediately
       entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
@@ -270,38 +244,22 @@ class ReplicationSourceWALReader extends Thread {
   }
 
   /**
-   * This is to handle the EOFException from the WAL entry stream. EOFException should
-   * be handled carefully because there are chances of data loss because of never replicating
-   * the data. Thus we should always try to ship existing batch of entries here.
-   * If there was only one log in the queue before EOF, we ship the empty batch here
-   * and since reader is still active, in the next iteration of reader we will
-   * stop the reader.
-   * If there was more than one log in the queue before EOF, we ship the existing batch
-   * and reset the wal path and position to the log with EOF, so shipper can remove
-   * logs from replication queue
+   * if we get an EOF due to a zero-length log, and there are other logs in queue
+   * (highly likely we've closed the current log), and autorecovery is
+   * enabled, then dump the log
   * @return true only if the IOE can be handled
    */
-  private boolean handleEofException(IOException e, WALEntryBatch batch)
-      throws InterruptedException {
+  private boolean handleEofException(IOException e) {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source
     // since we don't add current log to recovered source queue so it is safe to remove.
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
-      && (source.isRecovered() || queue.size() > 1)
-      && this.eofAutoRecovery) {
-      Path head = queue.peek();
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
+      (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
       try {
-        if (fs.getFileStatus(head).getLen() == 0) {
-          // head of the queue is an empty log file
-          LOG.warn("Forcing removal of 0 length log in queue: {}", head);
+        if (fs.getFileStatus(queue.peek()).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
           logQueue.remove(walGroupId);
           currentPosition = 0;
-          // After we removed the WAL from the queue, we should
-          // try shipping the existing batch of entries and set the wal position
-          // and path to the wal just dequeued to correctly remove logs from the zk
-          batch.setLastWalPath(head);
-          batch.setLastWalPosition(currentPosition);
-          addBatchToShippingQueue(batch);
           return true;
         }
       } catch (IOException ioe) {
@@ -311,20 +269,6 @@ class ReplicationSourceWALReader extends Thread {
     return false;
   }
 
-  /**
-   * Update the batch and add it to the shipping queue
-   * @param batch Batch of entries to ship
-   * @throws InterruptedException throws interrupted exception
-   * @throws IOException throws io exception from stream
-   */
-  private void addBatchToShippingQueue(WALEntryBatch batch)
-    throws InterruptedException, IOException {
-    // need to propagate the batch even if it has no entries since it may carry the last
-    // sequence id information for serial replication.
-    LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
-    entryBatchQueue.put(batch);
-  }
-
   public Path getCurrentPath() {
     // if we've read some WAL entries, get the Path we read from
     WALEntryBatch batchQueueHead = entryBatchQueue.peek();
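
The run() hunk above is the heart of the revert: the entry stream goes back to
being managed with try-with-resources, and handleEofException() no longer receives
a batch to ship. A stripped-down skeleton of the restored loop, assembled from the
'+' lines above (illustrative, not a drop-in method body):

    // Skeleton of the restored reader loop; quota/peer checks elided.
    while (isReaderRunning()) { // loop back only after a fatal stream error
      try (WALEntryStream entryStream = new WALEntryStream(logQueue, conf,
          currentPosition, source.getWALFileLengthProvider(),
          source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId)) {
        while (isReaderRunning()) { // reuse the stream while we can
          WALEntryBatch batch = readWALEntries(entryStream);
          currentPosition = entryStream.getPosition();
          if (batch != null) {
            entryBatchQueue.put(batch); // ship even an empty batch (serial replication)
          } else { // no entries and no position advance
            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
            entryStream.reset(); // reuse the stream
          }
        }
      } catch (IOException e) { // stream related
        if (!handleEofException(e)) {
          Threads.sleep(sleepForRetries); // back off, then reopen the stream
        }
      }
    }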
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 254dc4a..d0e76fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -50,7 +50,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
   }
 
   @Override
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
       throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
@@ -70,7 +70,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
       currentPath = entryStream.getCurrentPath();
     }
     long positionBefore = entryStream.getPosition();
-    batch = createBatch(entryStream);
+    WALEntryBatch batch = createBatch(entryStream);
     for (;;) {
       Entry entry = entryStream.peek();
       boolean doFiltering = true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 8301dff..4f96c96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -94,10 +94,6 @@ class WALEntryBatch {
     return lastWalPath;
   }
 
-  public void setLastWalPath(Path lastWalPath) {
-    this.lastWalPath = lastWalPath;
-  }
-
   /**
    * @return the position in the last WAL that was read.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 721a122..5b8f057 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -80,7 +80,7 @@ class WALEntryStream implements Closeable {
    * @param walFileLengthProvider provides the length of the WAL file
    * @param serverName the server name which all WALs belong to
    * @param metrics the replication metrics
-   * @throws IOException throw IO exception from stream
+   * @throws IOException
    */
   public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
       long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
@@ -368,9 +368,7 @@ class WALEntryStream implements Closeable {
       handleFileNotFound(path, fnfe);
     }  catch (RemoteException re) {
       IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) {
-        throw ioe;
-      }
+      if (!(ioe instanceof FileNotFoundException)) throw ioe;
       handleFileNotFound(path, (FileNotFoundException)ioe);
     } catch (LeaseNotRecoveredException lnre) {
       // HBASE-15019 the WAL was not closed due to some hiccup.
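
One note on the last hunk: Hadoop's RemoteException.unwrapRemoteException(Class...)
returns the original server-side exception when its type matches one of the given
classes, and the RemoteException itself otherwise, so the instanceof check rethrows
everything except a genuine FileNotFoundException. Spelled out:

    } catch (RemoteException re) {
      // Unwrap a server-side FileNotFoundException; rethrow anything else.
      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
      if (!(ioe instanceof FileNotFoundException)) {
        throw ioe;
      }
      handleFileNotFound(path, (FileNotFoundException) ioe);
    }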
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 13b8081..b017a89 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -44,10 +44,8 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -56,9 +54,6 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
-
 /**
  * This class is only a base for other integration-level replication tests.
  * Do not add tests here.
@@ -67,8 +62,7 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
  */
 public class TestReplicationBase {
   private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
-  private static Connection connection1;
-  private static Connection connection2;
+
   protected static Configuration CONF_WITH_LOCALFS;
 
   protected static ReplicationAdmin admin;
@@ -89,8 +83,6 @@ public class TestReplicationBase {
       NB_ROWS_IN_BATCH * 10;
   protected static final long SLEEP_TIME = 500;
   protected static final int NB_RETRIES = 50;
-  protected static AtomicInteger replicateCount = new AtomicInteger();
-  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();
 
   protected static final TableName tableName = TableName.valueOf("test");
   protected static final byte[] famName = Bytes.toBytes("f");
@@ -245,26 +237,26 @@ public class TestReplicationBase {
     // as a component in deciding maximum number of parallel batches to send to the peer cluster.
     UTIL2.startMiniCluster(NUM_SLAVES2);
 
-    connection1 = ConnectionFactory.createConnection(CONF1);
-    connection2 = ConnectionFactory.createConnection(CONF2);
-    hbaseAdmin = connection1.getAdmin();
+    admin = new ReplicationAdmin(CONF1);
+    hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();
 
     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
         .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
             .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
         .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
 
-    try (
-      Admin admin1 = connection1.getAdmin();
-      Admin admin2 = connection2.getAdmin()) {
+    Connection connection1 = ConnectionFactory.createConnection(CONF1);
+    Connection connection2 = ConnectionFactory.createConnection(CONF2);
+    try (Admin admin1 = connection1.getAdmin()) {
       admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
       admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-      UTIL1.waitUntilAllRegionsAssigned(tableName);
-      htable1 = connection1.getTable(tableName);
-      UTIL2.waitUntilAllRegionsAssigned(tableName);
-      htable2 = connection2.getTable(tableName);
     }
-
+    UTIL1.waitUntilAllRegionsAssigned(tableName);
+    UTIL2.waitUntilAllRegionsAssigned(tableName);
+    htable1 = connection1.getTable(tableName);
+    htable2 = connection2.getTable(tableName);
   }
 
   @BeforeClass
@@ -280,10 +272,9 @@ public class TestReplicationBase {
   @Before
   public void setUpBase() throws Exception {
     if (!peerExist(PEER_ID2)) {
-      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
-        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl(
-          ReplicationEndpointTest.class.getName());
-      hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
+      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+          .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build();
+      hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
     }
   }
 
@@ -359,33 +350,7 @@ public class TestReplicationBase {
     if (admin != null) {
       admin.close();
     }
-    if (hbaseAdmin != null) {
-      hbaseAdmin.close();
-    }
-
-    if (connection2 != null) {
-      connection2.close();
-    }
-    if (connection1 != null) {
-      connection1.close();
-    }
     UTIL2.shutdownMiniCluster();
     UTIL1.shutdownMiniCluster();
   }
-
-  /**
-   * Custom replication endpoint to keep track of replication status for tests.
-   */
-  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
-    public ReplicationEndpointTest() {
-      replicateCount.set(0);
-    }
-
-    @Override public boolean replicate(ReplicateContext replicateContext) {
-      replicateCount.incrementAndGet();
-      replicatedEntries.addAll(replicateContext.getEntries());
-
-      return super.replicate(replicateContext);
-    }
-  }
 }
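
Reverting setUpBase() also drops the setReplicationEndpointImpl(...) wiring, so the
peer falls back to the default inter-cluster endpoint and the batch/entry counters
go away with it. For reference, attaching a custom endpoint to a peer looked like
the removed lines, roughly:

    // How the removed tests wired their counting endpoint into the peer config.
    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
        .setClusterKey(UTIL2.getClusterKey())
        .setSerial(isSerialPeer())
        .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName())
        .build();
    hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);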
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 2d72618..c0f22a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -6,7 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,99 +20,56 @@ package org.apache.hadoop.hbase.replication;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category
-  ({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery
-  extends TestReplicationBase {
-  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
-  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
 
-  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
 
   @Before
   public void setUp() throws IOException, InterruptedException {
     cleanUp();
-    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
-    replicateCount.set(0);
-    replicatedEntries.clear();
   }
 
   /**
    * Waits until there is only one log(the current writing one) in the replication queue
-   *
-   * @param numRs number of region servers
+   * @param numRs number of regionservers
    */
-  private void waitForLogAdvance(int numRs) {
-    Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
+  private void waitForLogAdvance(int numRs) throws Exception {
+    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         for (int i = 0; i < numRs; i++) {
           HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
           RegionInfo regionInfo =
-            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+              UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
           WAL wal = hrs.getWAL(regionInfo);
           Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
-          Replication replicationService =
-            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
-          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
-            .getSources()) {
-            ReplicationSource source = (ReplicationSource) rsi;
-            // We are making sure that there is only one log queue and that is for the
-            // current WAL of region server
-            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
-            if (!currentFile.equals(source.getCurrentPath())
-              || source.getQueues().keySet().size() != 1
-              || source.getQueues().get(logPrefix).size() != 1) {
-              return false;
-            }
-          }
-        }
-        return true;
-      }
-    });
-  }
-
-  private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
-    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
-      @Override
-      public boolean evaluate() {
-        for (int i = 0; i < numRs; i++) {
-          Replication replicationService =
-            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+          Replication replicationService = (Replication) UTIL1.getHBaseCluster()
+              .getRegionServer(i).getReplicationSourceService();
           for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
-            .getSources()) {
+              .getSources()) {
             ReplicationSource source = (ReplicationSource) rsi;
-            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
-            if (source.getQueues().get(logPrefix).size() != numQueues) {
+            if (!currentFile.equals(source.getCurrentPath())) {
               return false;
             }
           }
@@ -123,211 +82,25 @@ import org.junit.experimental.categories.Category;
   @Test
   public void testEmptyWALRecovery() throws Exception {
     final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
-    // for each RS, create an empty wal with same walGroupId
-    final List<Path> emptyWalPaths = new ArrayList<>();
-    long ts = System.currentTimeMillis();
-    for (int i = 0; i < numRs; i++) {
-      RegionInfo regionInfo =
-        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
-      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
-      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
-      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
-      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
-      UTIL1.getTestFileSystem().create(emptyWalPath).close();
-      emptyWalPaths.add(emptyWalPath);
-    }
 
-    injectEmptyWAL(numRs, emptyWalPaths);
-
-    // ReplicationSource should advance past the empty wal, or else the test will fail
-    waitForLogAdvance(numRs);
-    verifyNumberOfLogsInQueue(1, numRs);
-    // we're now writing to the new wal
-    // if everything works, the source should've stopped reading from the empty wal, and start
-    // replicating from the new wal
-    runSimplePutDeleteTest();
-    rollWalsAndWaitForDeque(numRs);
-  }
-
-  /**
-   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure
-   * when we see the empty WAL and handle the EOF exception, we are able to ship the previous
-   * batch of entries without losing it. This test also checks the number of batches shipped
-   *
-   * @throws Exception throws any exception
-   */
-  @Test
-  public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
-    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
-    hbaseAdmin.disableReplicationPeer(PEER_ID2);
-    int numOfEntriesToReplicate = 20;
-
-    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
     // for each RS, create an empty wal with same walGroupId
     final List<Path> emptyWalPaths = new ArrayList<>();
     long ts = System.currentTimeMillis();
     for (int i = 0; i < numRs; i++) {
       RegionInfo regionInfo =
-        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+          UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
       WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
       Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
-
-      appendEntriesToWal(numOfEntriesToReplicate, wal);
-      wal.rollWriter();
-      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
-      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
-      UTIL1.getTestFileSystem().create(emptyWalPath).close();
-      emptyWalPaths.add(emptyWalPath);
-    }
-
-    injectEmptyWAL(numRs, emptyWalPaths);
-    // There should be three WALs in queue
-    // 1. empty WAL
-    // 2. non empty WAL
-    // 3. live WAL
-    //verifyNumberOfLogsInQueue(3, numRs);
-    hbaseAdmin.enableReplicationPeer(PEER_ID2);
-    // ReplicationSource should advance past the empty wal, or else the test will fail
-    waitForLogAdvance(numRs);
-
-    // Now we should expect numOfEntriesToReplicate entries
-    // replicated from each region server. This makes sure we didn't lose data
-    // from any previous batch when we encounter EOF exception for empty file.
-    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
-      replicatedEntries.size());
-
-    // We expect just one batch of replication which will
-    // be from when we handle the EOF exception.
-    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
-    verifyNumberOfLogsInQueue(1, numRs);
-    // we're now writing to the new wal
-    // if everything works, the source should've stopped reading from the empty wal, and start
-    // replicating from the new wal
-    runSimplePutDeleteTest();
-    rollWalsAndWaitForDeque(numRs);
-  }
-
-  /**
-   * Test empty WAL along with non empty WALs in the same batch. This test is to make sure
-   * when we see the empty WAL and handle the EOF exception, we are able to proceed
-   * with the next batch and replicate it properly without missing data.
-   *
-   * @throws Exception throws any exception
-   */
-  @Test
-  public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
-    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
-    hbaseAdmin.disableReplicationPeer(PEER_ID2);
-    int numOfEntriesToReplicate = 20;
-
-    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
-    // for each RS, create an empty wal with same walGroupId
-    final List<Path> emptyWalPaths = new ArrayList<>();
-
-    long ts = System.currentTimeMillis();
-    WAL wal = null;
-    for (int i = 0; i < numRs; i++) {
-      RegionInfo regionInfo =
-        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
-      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
-      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
-      appendEntriesToWal(numOfEntriesToReplicate, wal);
       String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
       Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
       UTIL1.getTestFileSystem().create(emptyWalPath).close();
       emptyWalPaths.add(emptyWalPath);
-
     }
-    injectEmptyWAL(numRs, emptyWalPaths);
-    // roll the WAL now
-    for (int i = 0; i < numRs; i++) {
-      wal.rollWriter();
-    }
-    hbaseAdmin.enableReplicationPeer(PEER_ID2);
-    // ReplicationSource should advance past the empty wal, or else the test will fail
-    waitForLogAdvance(numRs);
-
-    // Now we should expect numOfEntriesToReplicate entries
-    // replicated from each region server. This makes sure we didn't lose data
-    // from any previous batch when we encounter EOF exception for empty file.
-    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
-      replicatedEntries.size());
-
-    // We expect just one batch of replication to be shipped, which will
-    // be for the non empty WAL
-    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
-    verifyNumberOfLogsInQueue(1, numRs);
-    // we're now writing to the new wal
-    // if everything works, the source should've stopped reading from the empty wal, and start
-    // replicating from the new wal
-    runSimplePutDeleteTest();
-    rollWalsAndWaitForDeque(numRs);
-  }
-
-  /**
-   * This test makes sure we replicate all the entries from the non empty WALs which
-   * are surrounding the empty WALs
-   *
-   * @throws Exception throws exception
-   */
-  @Test
-  public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
-    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
-    hbaseAdmin.disableReplicationPeer(PEER_ID2);
-    int numOfEntriesToReplicate = 20;
-
-    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
-    // for each RS, create an empty wal with same walGroupId
-    final List<Path> emptyWalPaths = new ArrayList<>();
 
-    long ts = System.currentTimeMillis();
-    WAL wal = null;
-    for (int i = 0; i < numRs; i++) {
-      RegionInfo regionInfo =
-        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
-      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
-      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
-      appendEntriesToWal(numOfEntriesToReplicate, wal);
-      wal.rollWriter();
-      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
-      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
-      UTIL1.getTestFileSystem().create(emptyWalPath).close();
-      emptyWalPaths.add(emptyWalPath);
-    }
-    injectEmptyWAL(numRs, emptyWalPaths);
-
-    // roll the WAL again with some entries
-    for (int i = 0; i < numRs; i++) {
-      appendEntriesToWal(numOfEntriesToReplicate, wal);
-      wal.rollWriter();
-    }
-
-    hbaseAdmin.enableReplicationPeer(PEER_ID2);
-    // ReplicationSource should advance past the empty wal, or else the test will fail
-    waitForLogAdvance(numRs);
-
-    // Now we should expect numOfEntriesToReplicate entries
-    // replicated from each region server. This makes sure we didn't lose data
-    // from any previous batch when we encounter EOF exception for empty file.
-    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
-      replicatedEntries.size());
-
-    // We expect two batches of replication to be shipped, which will
-    // be for the non empty WALs
-    Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
-    verifyNumberOfLogsInQueue(1, numRs);
-    // we're now writing to the new wal
-    // if everything works, the source should've stopped reading from the empty wal, and start
-    // replicating from the new wal
-    runSimplePutDeleteTest();
-    rollWalsAndWaitForDeque(numRs);
-  }
-
-  // inject our empty wal into the replication queue, and then roll the original wal, which
-  // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
-  // determine if the file being replicated currently is still opened for write, so just injecting a
-  // new wal into the replication queue does not mean the previous file is closed.
-  private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
+    // inject our empty wal into the replication queue, and then roll the original wal, which
+    // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
+    // determine if the file being replicated currently is still opened for write, so just injecting a
+    // new wal into the replication queue does not mean the previous file is closed.
     for (int i = 0; i < numRs; i++) {
       HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
       Replication replicationService = (Replication) hrs.getReplicationSourceService();
@@ -338,32 +111,13 @@ import org.junit.experimental.categories.Category;
       WAL wal = hrs.getWAL(regionInfo);
       wal.rollWriter(true);
     }
-  }
-
-  protected WALKeyImpl getWalKeyImpl() {
-    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
-  }
 
-  // Roll the WAL and wait for it to get deque from the log queue
-  private void rollWalsAndWaitForDeque(int numRs) throws IOException {
-    RegionInfo regionInfo =
-      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
-    for (int i = 0; i < numRs; i++) {
-      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
-      wal.rollWriter();
-    }
+    // ReplicationSource should advance past the empty wal, or else the test will fail
     waitForLogAdvance(numRs);
-  }
 
-  private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
-    long txId = -1;
-    for (int i = 0; i < numEntries; i++) {
-      byte[] b = Bytes.toBytes(Integer.toString(i));
-      KeyValue kv = new KeyValue(b, famName, b);
-      WALEdit edit = new WALEdit();
-      edit.add(kv);
-      txId = wal.appendData(info, getWalKeyImpl(), edit);
-    }
-    wal.sync(txId);
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    runSimplePutDeleteTest();
   }
 }
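
All three removed scenarios shared one fixture step: fabricating a zero-length WAL
whose name reuses the live WAL's walGroupId prefix so it lands in the same
replication queue. Condensed from the removed test bodies above:

    // Create an empty WAL file in the same wal group as the live WAL.
    Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
    Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
    UTIL1.getTestFileSystem().create(emptyWalPath).close(); // zero bytes on disk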
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index d31b864..9c6fafc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -27,9 +27,9 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -84,6 +83,7 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
+
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestWALEntryStream {
 
@@ -687,7 +687,6 @@ public class TestWALEntryStream {
     // Override the max retries multiplier to fail fast.
     conf.setInt("replication.source.maxretriesmultiplier", 1);
     conf.setBoolean("replication.source.eof.autorecovery", true);
-    conf.setInt("replication.source.nb.batches", 10);
     // Create a reader thread with source as recovered source.
     ReplicationSource source = mockReplicationSource(true, conf);
     when(source.isPeerEnabled()).thenReturn(true);
@@ -706,64 +705,7 @@ public class TestWALEntryStream {
     assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
   }
 
-  @Test
-  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
-    Configuration conf = new Configuration(CONF);
-    MetricsSource metrics = mock(MetricsSource.class);
-    ReplicationSource source = mockReplicationSource(true, conf);
-    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
-    // Create a 0 length log.
-    Path emptyLog = new Path(fs.getHomeDirectory(),"log.2");
-    FSDataOutputStream fsdos = fs.create(emptyLog);
-    fsdos.close();
-    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
-    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
-
-    final Path log1 = new Path(fs.getHomeDirectory(), "log.1");
-    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
-    appendEntries(writer1, 3);
-    localLogQueue.enqueueLog(log1, fakeWalGroupId);
-
-    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
-    // Make it look like the source is from recovered source.
-    when(mockSourceManager.getOldSources())
-      .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
-    when(source.isPeerEnabled()).thenReturn(true);
-    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    // Override the max retries multiplier to fail fast.
-    conf.setInt("replication.source.maxretriesmultiplier", 1);
-    conf.setBoolean("replication.source.eof.autorecovery", true);
-    conf.setInt("replication.source.nb.batches", 10);
-    // Create a reader thread.
-    ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
-        getDummyFilter(), source, fakeWalGroupId);
-    assertEquals("Initial log queue size is not correct",
-      2, localLogQueue.getQueueSize(fakeWalGroupId));
-    reader.run();
-
-    // remove empty log from logQueue.
-    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
-    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
-  }
-
   private PriorityBlockingQueue<Path> getQueue() {
     return logQueue.getQueue(fakeWalGroupId);
   }
-
-  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
-    for (int i = 0; i < numEntries; i++) {
-      byte[] b = Bytes.toBytes(Integer.toString(i));
-      KeyValue kv = new KeyValue(b,b,b);
-      WALEdit edit = new WALEdit();
-      edit.add(kv);
-      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
-        HConstants.DEFAULT_CLUSTER_ID);
-      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
-      writer.append(new WAL.Entry(key, edit));
-      writer.sync(false);
-    }
-    writer.close();
-  }
 }
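
After this revert, the surviving EOF coverage is testEOFExceptionForRecoveredQueue,
whose tail is visible in the hunk above: a reader over a recovered queue whose only
log is zero-length should drain the queue and stop. In outline, with names taken
from the kept test code (abbreviated):

    // Outline of the surviving recovered-queue EOF check.
    ReplicationSource source = mockReplicationSource(true, conf); // recovered source
    when(source.isPeerEnabled()).thenReturn(true);
    ReplicationSourceWALReader reader = new ReplicationSourceWALReader(
        fs, conf, localLogQueue, 0, getDummyFilter(), source, fakeWalGroupId);
    reader.run();                  // hits EOF on the empty log and removes it
    assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));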