Posted to commits@hbase.apache.org by ap...@apache.org on 2021/02/25 21:40:42 UTC

[hbase] branch branch-2.4 updated: HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 9590080  HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)
9590080 is described below

commit 9590080a2132014990efd8da450e734ae99c749e
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Thu Feb 25 13:36:31 2021 -0800

    HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2990)
    
    Signed-off-by: Xu Cang <xu...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    
    Conflicts:
    	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
---
 .../regionserver/ReplicationSource.java            |   8 +-
 .../regionserver/ReplicationSourceWALReader.java   | 152 +++++++----
 .../SerialReplicationSourceWALReader.java          |   4 +-
 .../replication/regionserver/WALEntryBatch.java    |   4 +
 .../replication/regionserver/WALEntryStream.java   |   6 +-
 .../hbase/replication/TestReplicationBase.java     |  67 +++--
 .../TestReplicationEmptyWALRecovery.java           | 298 +++++++++++++++++++--
 .../regionserver/TestWALEntryStream.java           |  62 ++++-
 8 files changed, 503 insertions(+), 98 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 063b3d4..42757b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -30,12 +30,12 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -265,6 +264,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
 
+  @InterfaceAudience.Private
+  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+    return logQueue.getQueues();
+  }
+
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
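
Note on usage: the new getQueues() accessor exposes the wal-group-id to WAL-queue
mapping so tests can inspect replication state. A minimal sketch of how it can be
consulted (a sketch only, assuming "source" is a running ReplicationSource, as in
the tests later in this patch):

    // Sketch: "source" is assumed to be an active ReplicationSource.
    Map<String, PriorityBlockingQueue<Path>> queues = source.getQueues();
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      // key: wal group id; value: WALs still queued for replication, oldest first
      System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " queued WAL(s)");
    }
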
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index f52a83a..57c0a16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -41,7 +41,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
@@ -123,44 +122,64 @@ class ReplicationSourceWALReader extends Thread {
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, conf, currentPosition,
-              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
-              source.getSourceMetrics(), walGroupId)) {
-        while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!source.isPeerEnabled()) {
-            Threads.sleep(sleepForRetries);
-            continue;
-          }
-          if (!checkQuota()) {
-            continue;
+    WALEntryBatch batch = null;
+    WALEntryStream entryStream = null;
+    try {
+      // we only loop back here if something fatal happened to our stream
+      while (isReaderRunning()) {
+        try {
+          entryStream =
+            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
+              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
+          while (isReaderRunning()) { // loop here to keep reusing stream while we can
+            if (!source.isPeerEnabled()) {
+              Threads.sleep(sleepForRetries);
+              continue;
+            }
+            if (!checkQuota()) {
+              continue;
+            }
+
+            batch = createBatch(entryStream);
+            batch = readWALEntries(entryStream, batch);
+            currentPosition = entryStream.getPosition();
+            if (batch == null) {
+              // either the queue has no WAL to read
+              // or we got no new entries (didn't advance position in WAL)
+              handleEmptyWALEntryBatch();
+              entryStream.reset(); // reuse stream
+            } else {
+              addBatchToShippingQueue(batch);
+            }
           }
-          WALEntryBatch batch = readWALEntries(entryStream);
-          currentPosition = entryStream.getPosition();
-          if (batch != null) {
-            // need to propagate the batch even it has no entries since it may carry the last
-            // sequence id information for serial replication.
-            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
-            entryBatchQueue.put(batch);
+        } catch (IOException e) { // stream related
+          if (handleEofException(e, batch)) {
             sleepMultiplier = 1;
-          } else { // got no entries and didn't advance position in WAL
-            handleEmptyWALEntryBatch(entryStream.getCurrentPath());
-            entryStream.reset(); // reuse stream
+          } else {
+            LOG.warn("Failed to read stream of replication entries", e);
+            if (sleepMultiplier < maxRetriesMultiplier) {
+              sleepMultiplier++;
+            }
+            Threads.sleep(sleepForRetries * sleepMultiplier);
           }
+        } catch (InterruptedException e) {
+          LOG.trace("Interrupted while sleeping between WAL reads");
+          Thread.currentThread().interrupt();
+        } finally {
+          entryStream.close();
         }
-      } catch (IOException e) { // stream related
-        if (!handleEofException(e)) {
-          LOG.warn("Failed to read stream of replication entries", e);
-          if (sleepMultiplier < maxRetriesMultiplier) {
-            sleepMultiplier ++;
-          }
-          Threads.sleep(sleepForRetries * sleepMultiplier);
-        }
-      } catch (InterruptedException e) {
-        LOG.trace("Interrupted while sleeping between WAL reads");
-        Thread.currentThread().interrupt();
       }
+    } catch (IOException e) {
+      if (sleepMultiplier < maxRetriesMultiplier) {
+        LOG.debug("Failed to read stream of replication entries: " + e);
+        sleepMultiplier++;
+      } else {
+        LOG.error("Failed to read stream of replication entries", e);
+      }
+      Threads.sleep(sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.trace("Interrupted while sleeping between WAL reads");
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -189,14 +208,19 @@ class ReplicationSourceWALReader extends Thread {
     return newPath == null || !path.getName().equals(newPath.getName());
   }
 
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
-      throws IOException, InterruptedException {
+  // We need to get the WALEntryBatch from the caller so we can add entries to it.
+  // This is required in case there is any exception while reading entries;
+  // we do not want to lose the existing entries in the batch.
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
+      WALEntryBatch batch) throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
       // check whether we have switched a file
       if (currentPath != null && switched(entryStream, currentPath)) {
         return WALEntryBatch.endOfFile(currentPath);
       } else {
+        // This would mean either there are no more files in the queue
+        // or there is no new data yet in the current wal
         return null;
       }
     }
@@ -208,7 +232,7 @@ class ReplicationSourceWALReader extends Thread {
       // when reading from the entry stream first time we will enter here
       currentPath = entryStream.getCurrentPath();
     }
-    WALEntryBatch batch = createBatch(entryStream);
+    batch.setLastWalPath(currentPath);
     for (;;) {
       Entry entry = entryStream.next();
       batch.setLastWalPosition(entryStream.getPosition());
@@ -231,10 +255,12 @@ class ReplicationSourceWALReader extends Thread {
     return batch;
   }
 
-  private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
+  private void handleEmptyWALEntryBatch() throws InterruptedException {
     LOG.trace("Didn't read any new entries from WAL");
-    if (source.isRecovered()) {
-      // we're done with queue recovery, shut ourself down
+    if (logQueue.getQueue(walGroupId).isEmpty()) {
+      // we're done with the current queue: either this is a recovered queue, or it is the
+      // special group for a sync replication peer and the peer has been transitioned to DA
+      // or S state.
+      LOG.debug("Stopping the replication source wal reader");
       setReaderRunning(false);
       // shuts down shipper thread immediately
       entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
@@ -244,22 +270,38 @@ class ReplicationSourceWALReader extends Thread {
   }
 
   /**
-   * if we get an EOF due to a zero-length log, and there are other logs in queue
-   * (highly likely we've closed the current log), and autorecovery is
-   * enabled, then dump the log
+   * This is to handle the EOFException from the WAL entry stream. EOFException should
+   * be handled carefully because there is a chance of data loss if the data is never
+   * replicated. Thus we should always try to ship the existing batch of entries here.
+   * If there was only one log in the queue before EOF, we ship the empty batch here
+   * and, since the reader is still active, we will stop the reader in the next
+   * iteration of the reader loop.
+   * If there was more than one log in the queue before EOF, we ship the existing batch
+   * and reset the wal path and position to the log with EOF, so the shipper can remove
+   * logs from the replication queue
    * @return true only if the IOE can be handled
    */
-  private boolean handleEofException(IOException e) {
+  private boolean handleEofException(IOException e, WALEntryBatch batch)
+      throws InterruptedException {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source
     // since we don't add current log to recovered source queue so it is safe to remove.
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
+      && (source.isRecovered() || queue.size() > 1)
+      && this.eofAutoRecovery) {
+      Path head = queue.peek();
       try {
-        if (fs.getFileStatus(queue.peek()).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
+        if (fs.getFileStatus(head).getLen() == 0) {
+          // head of the queue is an empty log file
+          LOG.warn("Forcing removal of 0 length log in queue: {}", head);
           logQueue.remove(walGroupId);
           currentPosition = 0;
+          // After we removed the WAL from the queue, we should
+          // try shipping the existing batch of entries and set the wal position
+          // and path to the wal just dequeued to correctly remove logs from the zk
+          batch.setLastWalPath(head);
+          batch.setLastWalPosition(currentPosition);
+          addBatchToShippingQueue(batch);
           return true;
         }
       } catch (IOException ioe) {
@@ -269,6 +311,20 @@ class ReplicationSourceWALReader extends Thread {
     return false;
   }
 
+  /**
+   * Add the batch of entries to the shipping queue for the shipper thread to replicate
+   * @param batch the batch of entries to ship
+   * @throws InterruptedException if interrupted while putting the batch on the queue
+   * @throws IOException propagated from the entry stream
+   */
+  private void addBatchToShippingQueue(WALEntryBatch batch)
+    throws InterruptedException, IOException {
+    // need to propagate the batch even it has no entries since it may carry the last
+    // sequence id information for serial replication.
+    LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+    entryBatchQueue.put(batch);
+  }
+
   public Path getCurrentPath() {
     // if we've read some WAL entries, get the Path we read from
     WALEntryBatch batchQueueHead = entryBatchQueue.peek();
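
Note on the EOF check: the auto-recovery gate in handleEofException() keys off
whether the stream failure is, or directly wraps, an EOFException. A small
self-contained illustration of just that predicate (a standalone sketch, not
HBase code):

    import java.io.EOFException;
    import java.io.IOException;

    public class EofCheckSketch {
      // Mirrors the condition above: an IOException qualifies for
      // auto-recovery when it is, or directly wraps, an EOFException.
      static boolean isEof(IOException e) {
        return e instanceof EOFException || e.getCause() instanceof EOFException;
      }

      public static void main(String[] args) {
        System.out.println(isEof(new EOFException()));                   // true
        System.out.println(isEof(new IOException(new EOFException())));  // true
        System.out.println(isEof(new IOException("disk full")));         // false
      }
    }
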
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index d0e76fb..254dc4a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -50,7 +50,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
   }
 
   @Override
-  protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+  protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
       throws IOException, InterruptedException {
     Path currentPath = entryStream.getCurrentPath();
     if (!entryStream.hasNext()) {
@@ -70,7 +70,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
       currentPath = entryStream.getCurrentPath();
     }
     long positionBefore = entryStream.getPosition();
-    WALEntryBatch batch = createBatch(entryStream);
+    batch = createBatch(entryStream);
     for (;;) {
       Entry entry = entryStream.peek();
       boolean doFiltering = true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 4f96c96..8301dff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -94,6 +94,10 @@ class WALEntryBatch {
     return lastWalPath;
   }
 
+  public void setLastWalPath(Path lastWalPath) {
+    this.lastWalPath = lastWalPath;
+  }
+
   /**
    * @return the position in the last WAL that was read.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 5b8f057..721a122 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -80,7 +80,7 @@ class WALEntryStream implements Closeable {
    * @param walFileLengthProvider provides the length of the WAL file
    * @param serverName the server name which all WALs belong to
    * @param metrics the replication metrics
-   * @throws IOException
+   * @throws IOException if an I/O error occurs while reading the stream
    */
   public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
       long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
@@ -368,7 +368,9 @@ class WALEntryStream implements Closeable {
       handleFileNotFound(path, fnfe);
     }  catch (RemoteException re) {
       IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) throw ioe;
+      if (!(ioe instanceof FileNotFoundException)) {
+        throw ioe;
+      }
       handleFileNotFound(path, (FileNotFoundException)ioe);
     } catch (LeaseNotRecoveredException lnre) {
       // HBASE-15019 the WAL was not closed due to some hiccup.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index b017a89..13b8081 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -44,8 +44,10 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -54,6 +56,9 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 /**
  * This class is only a base for other integration-level replication tests.
  * Do not add tests here.
@@ -62,7 +67,8 @@ import org.slf4j.LoggerFactory;
  */
 public class TestReplicationBase {
   private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
-
+  private static Connection connection1;
+  private static Connection connection2;
   protected static Configuration CONF_WITH_LOCALFS;
 
   protected static ReplicationAdmin admin;
@@ -83,6 +89,8 @@ public class TestReplicationBase {
       NB_ROWS_IN_BATCH * 10;
   protected static final long SLEEP_TIME = 500;
   protected static final int NB_RETRIES = 50;
+  protected static AtomicInteger replicateCount = new AtomicInteger();
+  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();
 
   protected static final TableName tableName = TableName.valueOf("test");
   protected static final byte[] famName = Bytes.toBytes("f");
@@ -237,26 +245,26 @@ public class TestReplicationBase {
     // as a component in deciding maximum number of parallel batches to send to the peer cluster.
     UTIL2.startMiniCluster(NUM_SLAVES2);
 
-    admin = new ReplicationAdmin(CONF1);
-    hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();
+    connection1 = ConnectionFactory.createConnection(CONF1);
+    connection2 = ConnectionFactory.createConnection(CONF2);
+    hbaseAdmin = connection1.getAdmin();
 
     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
         .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
             .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
         .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
 
-    Connection connection1 = ConnectionFactory.createConnection(CONF1);
-    Connection connection2 = ConnectionFactory.createConnection(CONF2);
-    try (Admin admin1 = connection1.getAdmin()) {
+    try (
+      Admin admin1 = connection1.getAdmin();
+      Admin admin2 = connection2.getAdmin()) {
       admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
-    }
-    try (Admin admin2 = connection2.getAdmin()) {
       admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+      UTIL1.waitUntilAllRegionsAssigned(tableName);
+      htable1 = connection1.getTable(tableName);
+      UTIL2.waitUntilAllRegionsAssigned(tableName);
+      htable2 = connection2.getTable(tableName);
     }
-    UTIL1.waitUntilAllRegionsAssigned(tableName);
-    UTIL2.waitUntilAllRegionsAssigned(tableName);
-    htable1 = connection1.getTable(tableName);
-    htable2 = connection2.getTable(tableName);
+
   }
 
   @BeforeClass
@@ -272,9 +280,10 @@ public class TestReplicationBase {
   @Before
   public void setUpBase() throws Exception {
     if (!peerExist(PEER_ID2)) {
-      ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
-          .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build();
-      hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
+      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
+        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl(
+          ReplicationEndpointTest.class.getName());
+      hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
     }
   }
 
@@ -350,7 +359,33 @@ public class TestReplicationBase {
     if (admin != null) {
       admin.close();
     }
+    if (hbaseAdmin != null) {
+      hbaseAdmin.close();
+    }
+
+    if (connection2 != null) {
+      connection2.close();
+    }
+    if (connection1 != null) {
+      connection1.close();
+    }
     UTIL2.shutdownMiniCluster();
     UTIL1.shutdownMiniCluster();
   }
+
+  /**
+   * Custom replication endpoint to keep track of replication status for tests.
+   */
+  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
+    public ReplicationEndpointTest() {
+      replicateCount.set(0);
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      replicateCount.incrementAndGet();
+      replicatedEntries.addAll(replicateContext.getEntries());
+
+      return super.replicate(replicateContext);
+    }
+  }
 }
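
Note on the test hooks: ReplicationEndpointTest gives tests a count of replicate()
calls plus a list of every shipped entry. A hypothetical assertion pattern built on
them (expectedEntries and expectedBatches are illustrative names, not part of this
patch):

    // Hypothetical usage of the hooks above, after disabling the peer,
    // writing data, and re-enabling the peer.
    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() {
        return replicatedEntries.size() >= expectedEntries;
      }
    });
    Assert.assertEquals("Unexpected number of shipped batches",
      expectedBatches, replicateCount.get());
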
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index c0f22a9..2d72618 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * http://www.apache.org/licenses/LICENSE-2.0
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,56 +18,99 @@ package org.apache.hadoop.hbase.replication;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
+  MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+  static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+  NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
 
   @Before
   public void setUp() throws IOException, InterruptedException {
     cleanUp();
+    scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
+    replicateCount.set(0);
+    replicatedEntries.clear();
   }
 
   /**
    * Waits until there is only one log(the current writing one) in the replication queue
-   * @param numRs number of regionservers
+   *
+   * @param numRs number of region servers
    */
-  private void waitForLogAdvance(int numRs) throws Exception {
-    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
+  private void waitForLogAdvance(int numRs) {
+    Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         for (int i = 0; i < numRs; i++) {
           HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
           RegionInfo regionInfo =
-              UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+            UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
           WAL wal = hrs.getWAL(regionInfo);
           Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
-          Replication replicationService = (Replication) UTIL1.getHBaseCluster()
-              .getRegionServer(i).getReplicationSourceService();
+          Replication replicationService =
+            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+            .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            // We are making sure that there is only one log queue and that it is for
+            // the current WAL of the region server
+            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
+            if (!currentFile.equals(source.getCurrentPath())
+              || source.getQueues().keySet().size() != 1
+              || source.getQueues().get(logPrefix).size() != 1) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    });
+  }
+
+  private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
+    Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() {
+        for (int i = 0; i < numRs; i++) {
+          Replication replicationService =
+            (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
           for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
-              .getSources()) {
+            .getSources()) {
             ReplicationSource source = (ReplicationSource) rsi;
-            if (!currentFile.equals(source.getCurrentPath())) {
+            String logPrefix = source.getQueues().keySet().stream().findFirst().get();
+            if (source.getQueues().get(logPrefix).size() != numQueues) {
               return false;
             }
           }
@@ -82,25 +123,211 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
   @Test
   public void testEmptyWALRecovery() throws Exception {
     final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+    long ts = System.currentTimeMillis();
+    for (int i = 0; i < numRs; i++) {
+      RegionInfo regionInfo =
+        UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
+      UTIL1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
 
+    injectEmptyWAL(numRs, emptyWalPaths);
+
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs);
+    verifyNumberOfLogsInQueue(1, numRs);
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    runSimplePutDeleteTest();
+    rollWalsAndWaitForDeque(numRs);
+  }
+
+  /**
+   * Test an empty WAL along with non-empty WALs in the same batch. This test is to make sure
+   * that when we see the empty WAL and handle the EOF exception, we are able to ship the
+   * existing batch of entries without losing them. This test also verifies the number of
+   * batches shipped
+   *
+   * @throws Exception throws any exception
+   */
+  @Test
+  public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
+    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    int numOfEntriesToReplicate = 20;
+
+    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
     // for each RS, create an empty wal with same walGroupId
     final List<Path> emptyWalPaths = new ArrayList<>();
     long ts = System.currentTimeMillis();
     for (int i = 0; i < numRs; i++) {
       RegionInfo regionInfo =
-          UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
       WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
       Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+
+      appendEntriesToWal(numOfEntriesToReplicate, wal);
+      wal.rollWriter();
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
+      UTIL1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+
+    injectEmptyWAL(numRs, emptyWalPaths);
+    // There should be three WALs in queue
+    // 1. empty WAL
+    // 2. non empty WAL
+    // 3. live WAL
+    //verifyNumberOfLogsInQueue(3, numRs);
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs);
+
+    // Now we should expect numOfEntriesToReplicate entries
+    // replicated from each region server. This makes sure we didn't lose data
+    // from any previous batch when we encountered the EOF exception for the empty file.
+    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
+      replicatedEntries.size());
+
+    // We expect just one replication batch to be shipped,
+    // from when we handle the EOF exception.
+    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
+    verifyNumberOfLogsInQueue(1, numRs);
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    runSimplePutDeleteTest();
+    rollWalsAndWaitForDeque(numRs);
+  }
+
+  /**
+   * Test an empty WAL along with non-empty WALs in the same batch. This test is to make sure
+   * that when we see the empty WAL and handle the EOF exception, we are able to proceed
+   * with the next batch and replicate it properly without missing data.
+   *
+   * @throws Exception throws any exception
+   */
+  @Test
+  public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
+    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    int numOfEntriesToReplicate = 20;
+
+    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+
+    long ts = System.currentTimeMillis();
+    WAL wal = null;
+    for (int i = 0; i < numRs; i++) {
+      RegionInfo regionInfo =
+        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+      appendEntriesToWal(numOfEntriesToReplicate, wal);
       String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
       Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
       UTIL1.getTestFileSystem().create(emptyWalPath).close();
       emptyWalPaths.add(emptyWalPath);
+
     }
+    injectEmptyWAL(numRs, emptyWalPaths);
+    // roll the WAL now
+    for (int i = 0; i < numRs; i++) {
+      wal.rollWriter();
+    }
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs);
+
+    // Now we should expect numOfEntriesToReplicate entries
+    // replicated from each region server. This makes sure we didn't lose data
+    // from any previous batch when we encountered the EOF exception for the empty file.
+    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
+      replicatedEntries.size());
+
+    // We expect just one replication batch to be shipped,
+    // for the non-empty WAL.
+    Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
+    verifyNumberOfLogsInQueue(1, numRs);
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    runSimplePutDeleteTest();
+    rollWalsAndWaitForDeque(numRs);
+  }
+
+  /**
+   * This test makes sure we replicate all the entries from the non-empty WALs that
+   * surround the empty WALs
+   *
+   * @throws Exception throws exception
+   */
+  @Test
+  public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
+    // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    int numOfEntriesToReplicate = 20;
+
+    final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
 
-    // inject our empty wal into the replication queue, and then roll the original wal, which
-    // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
-    // determine if the file being replicated currently is still opened for write, so just inject a
-    // new wal to the replication queue does not mean the previous file is closed.
+    long ts = System.currentTimeMillis();
+    WAL wal = null;
+    for (int i = 0; i < numRs; i++) {
+      RegionInfo regionInfo =
+        UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+      wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+      appendEntriesToWal(numOfEntriesToReplicate, wal);
+      wal.rollWriter();
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
+      UTIL1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+    injectEmptyWAL(numRs, emptyWalPaths);
+
+    // roll the WAL again with some entries
+    for (int i = 0; i < numRs; i++) {
+      appendEntriesToWal(numOfEntriesToReplicate, wal);
+      wal.rollWriter();
+    }
+
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs);
+
+    // Now we should expect numOfEntriesToReplicate entries
+    // replicated from each region server. This makes sure we didn't lose data
+    // from any previous batch when we encountered the EOF exception for the empty file.
+    Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
+      replicatedEntries.size());
+
+    // We expect two replication batches to be shipped,
+    // one for each non-empty WAL.
+    Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
+    verifyNumberOfLogsInQueue(1, numRs);
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    runSimplePutDeleteTest();
+    rollWalsAndWaitForDeque(numRs);
+  }
+
+  // inject our empty wal into the replication queue, and then roll the original wal, which
+  // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
+  // determine if the file being replicated currently is still opened for write, so just inject a
+  // new wal to the replication queue does not mean the previous file is closed.
+  private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
     for (int i = 0; i < numRs; i++) {
       HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
       Replication replicationService = (Replication) hrs.getReplicationSourceService();
@@ -111,13 +338,32 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
       WAL wal = hrs.getWAL(regionInfo);
       wal.rollWriter(true);
     }
+  }
 
-    // ReplicationSource should advance past the empty wal, or else the test will fail
+  protected WALKeyImpl getWalKeyImpl() {
+    return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
+  }
+
+  // Roll the WAL and wait for it to be dequeued from the log queue
+  private void rollWalsAndWaitForDeque(int numRs) throws IOException {
+    RegionInfo regionInfo =
+      UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+    for (int i = 0; i < numRs; i++) {
+      WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      wal.rollWriter();
+    }
     waitForLogAdvance(numRs);
+  }
 
-    // we're now writing to the new wal
-    // if everything works, the source should've stopped reading from the empty wal, and start
-    // replicating from the new wal
-    runSimplePutDeleteTest();
+  private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
+    long txId = -1;
+    for (int i = 0; i < numEntries; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b, famName, b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      txId = wal.appendData(info, getWalKeyImpl(), edit);
+    }
+    wal.sync(txId);
   }
 }
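
Note on WAL naming: these tests rely on the WAL file naming convention to land the
injected empty file in the same wal group as the live WAL. A WAL name has the form
"<wal group id>.<timestamp>", and AbstractFSWALProvider.getWALPrefixFromWALName()
recovers the prefix. An illustrative snippet (the file name below is fabricated):

    // Illustration only; the name is a made-up example of the
    // "<wal group id>.<timestamp>" convention the tests rely on.
    String walName = "regionserver-1%2C16020%2C1614200000000.1614290000000";
    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(walName);
    // walGroupId == "regionserver-1%2C16020%2C1614200000000", so an empty file
    // created as walGroupId + "." + ts joins the same group's replication queue.
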
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9c6fafc..d31b864 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -27,9 +27,9 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -83,7 +84,6 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
-
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestWALEntryStream {
 
@@ -687,6 +687,7 @@ public class TestWALEntryStream {
     // Override the max retries multiplier to fail fast.
     conf.setInt("replication.source.maxretriesmultiplier", 1);
     conf.setBoolean("replication.source.eof.autorecovery", true);
+    conf.setInt("replication.source.nb.batches", 10);
     // Create a reader thread with source as recovered source.
     ReplicationSource source = mockReplicationSource(true, conf);
     when(source.isPeerEnabled()).thenReturn(true);
@@ -705,7 +706,64 @@ public class TestWALEntryStream {
     assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
   }
 
+  @Test
+  public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
+    Configuration conf = new Configuration(CONF);
+    MetricsSource metrics = mock(MetricsSource.class);
+    ReplicationSource source = mockReplicationSource(true, conf);
+    ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
+    // Create a 0 length log.
+    Path emptyLog = new Path(fs.getHomeDirectory(), "log.2");
+    FSDataOutputStream fsdos = fs.create(emptyLog);
+    fsdos.close();
+    assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+    localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
+
+    final Path log1 = new Path(fs.getHomeDirectory(), "log.1");
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
+    appendEntries(writer1, 3);
+    localLogQueue.enqueueLog(log1, fakeWalGroupId);
+
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    // Make it look like the source is from recovered source.
+    when(mockSourceManager.getOldSources())
+      .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    // Override the max retries multiplier to fail fast.
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+    conf.setInt("replication.source.nb.batches", 10);
+    // Create a reader thread.
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
+        getDummyFilter(), source, fakeWalGroupId);
+    assertEquals("Initial log queue size is not correct",
+      2, localLogQueue.getQueueSize(fakeWalGroupId));
+    reader.run();
+
+    // Both the empty log and the non-empty log should have been removed from the logQueue.
+    assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
+  }
+
   private PriorityBlockingQueue<Path> getQueue() {
     return logQueue.getQueue(fakeWalGroupId);
   }
+
+  private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
+    for (int i = 0; i < numEntries; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b, b, b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
+        HConstants.DEFAULT_CLUSTER_ID);
+      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+      writer.append(new WAL.Entry(key, edit));
+      writer.sync(false);
+    }
+    writer.close();
+  }
 }