You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/31 07:02:34 UTC

[36/36] hbase git commit: HBASE-20637 Polish the WAL switching when transiting from A to S

HBASE-20637 Polish the WAL switching when transiting from A to S


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1f3e50fc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1f3e50fc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1f3e50fc

Branch: refs/heads/HBASE-19064
Commit: 1f3e50fcefd049290e5b102d04b568e40e6dc91e
Parents: 46866d9
Author: zhangduo <zh...@apache.org>
Authored: Tue May 29 20:38:20 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 31 14:41:40 2018 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/wal/AsyncFSWAL.java      | 52 +++++++++++++-
 .../hbase/regionserver/wal/DualAsyncFSWAL.java  | 71 ++++++++++++++------
 .../apache/hadoop/hbase/util/FSHDFSUtils.java   | 16 +++--
 .../hbase/wal/SyncReplicationWALProvider.java   |  2 +-
 .../replication/DualAsyncFSWALForTest.java      |  4 +-
 .../replication/SyncReplicationTestBase.java    | 26 +++++--
 .../replication/TestSyncReplicationActive.java  | 42 ++++++++++--
 7 files changed, 176 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 9b4ce9c..7f3e30b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -52,12 +52,12 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -470,6 +470,44 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     // whether to issue a sync in the caller method.
   }
 
+  private void drainNonMarkerEditsAndFailSyncs() {
+    if (toWriteAppends.isEmpty()) {
+      return;
+    }
+    boolean hasNonMarkerEdits = false;
+    Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();
+    while (iter.hasNext()) {
+      FSWALEntry entry = iter.next();
+      if (!entry.getEdit().isMetaEdit()) {
+        hasNonMarkerEdits = true;
+        break;
+      }
+    }
+    if (hasNonMarkerEdits) {
+      for (;;) {
+        iter.remove();
+        if (!iter.hasNext()) {
+          break;
+        }
+        iter.next();
+      }
+      unackedAppends.clear();
+      // fail the sync futures which are under the txid of the first remaining edit, if none, fail
+      // all the sync futures.
+      long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();
+      IOException error = new IOException("WAL is closing, only marker edit is allowed");
+      for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
+        SyncFuture future = syncIter.next();
+        if (future.getTxid() < txid) {
+          future.done(future.getTxid(), error);
+          syncIter.remove();
+        } else {
+          break;
+        }
+      }
+    }
+  }
+
   private void consume() {
     consumeLock.lock();
     try {
@@ -512,6 +550,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       }
       waitingConsumePayloadsGatingSequence.set(nextCursor);
     }
+    if (markerEditOnly()) {
+      drainNonMarkerEditsAndFailSyncs();
+    }
     appendAndSync();
     if (hasConsumerTask.get()) {
       return;
@@ -553,9 +594,18 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     return consumerScheduled.compareAndSet(false, true);
   }
 
+  // This is used by sync replication, where we are going to close the wal soon after we reopen all
+  // the regions. Will be overridden by sub classes.
+  protected boolean markerEditOnly() {
+    return false;
+  }
+
   @Override
   public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
       throws IOException {
+    if (markerEditOnly() && !edits.isMetaEdit()) {
+      throw new IOException("WAL is closing, only marker edit is allowed");
+    }
     long txid =
       stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
     if (shouldScheduleConsumer()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index 3967e78..bf5b96d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -18,14 +18,19 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 
@@ -35,20 +40,24 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 @InterfaceAudience.Private
 public class DualAsyncFSWAL extends AsyncFSWAL {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DualAsyncFSWAL.class);
+
   private final FileSystem remoteFs;
 
-  private final Path remoteWalDir;
+  private final Path remoteWALDir;
+
+  private volatile boolean skipRemoteWAL = false;
 
-  private volatile boolean skipRemoteWal = false;
+  private volatile boolean markerEditOnly = false;
 
-  public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
+  public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir,
       String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
       boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
     super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
-        eventLoopGroup, channelClass);
+      eventLoopGroup, channelClass);
     this.remoteFs = remoteFs;
-    this.remoteWalDir = remoteWalDir;
+    this.remoteWALDir = remoteWALDir;
   }
 
   // will be overridden in testcase
@@ -61,20 +70,37 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
     AsyncWriter localWriter = super.createWriterInstance(path);
-    if (skipRemoteWal) {
-      return localWriter;
-    }
-    AsyncWriter remoteWriter;
-    boolean succ = false;
-    try {
-      remoteWriter = createAsyncWriter(remoteFs, new Path(remoteWalDir, path.getName()));
-      succ = true;
-    } finally {
-      if (!succ) {
-        closeWriter(localWriter);
+    // retry forever if we can not create the remote writer to prevent aborting the RS due to log
+    // rolling error, unless the skipRemoteWal is set to true.
+    // TODO: since for now we only have one thread doing log rolling, this may block the rolling for
+    // other wals
+    Path remoteWAL = new Path(remoteWALDir, path.getName());
+    for (int retry = 0;; retry++) {
+      if (skipRemoteWAL) {
+        return localWriter;
+      }
+      AsyncWriter remoteWriter;
+      try {
+        remoteWriter = createAsyncWriter(remoteFs, remoteWAL);
+      } catch (IOException e) {
+        LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);
+        try {
+          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+        } catch (InterruptedException ie) {
+          // restore the interrupt state
+          Thread.currentThread().interrupt();
+          Closeables.close(localWriter, true);
+          throw (IOException) new InterruptedIOException().initCause(ie);
+        }
+        continue;
       }
+      return createCombinedAsyncWriter(localWriter, remoteWriter);
     }
-    return createCombinedAsyncWriter(localWriter, remoteWriter);
+  }
+
+  @Override
+  protected boolean markerEditOnly() {
+    return markerEditOnly;
   }
 
   // Allow temporarily skipping the creation of remote writer. When failing to write to the remote
@@ -82,7 +108,14 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
   // need to write a close marker when closing a region, and if it fails, the whole rs will abort.
   // So here we need to skip the creation of remote writer and make it possible to write the region
   // close marker.
-  public void skipRemoteWal() {
-    this.skipRemoteWal = true;
+  // Setting markerEdit only to true is for transiting from A to S, where we need to give up writing
+  // any pending wal entries as they will be discarded. The remote cluster will replicated the
+  // correct data back later. We still need to allow writing marker edits such as close region event
+  // to allow closing a region.
+  public void skipRemoteWAL(boolean markerEditOnly) {
+    if (markerEditOnly) {
+      this.markerEditOnly = true;
+    }
+    this.skipRemoteWAL = true;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
index 301d158..a49ee02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
@@ -28,9 +28,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
@@ -155,12 +155,16 @@ public class FSHDFSUtils extends FSUtils {
    * Recover the lease from HDFS, retrying multiple times.
    */
   @Override
-  public void recoverFileLease(final FileSystem fs, final Path p,
-      Configuration conf, CancelableProgressable reporter)
-  throws IOException {
+  public void recoverFileLease(FileSystem fs, Path p, Configuration conf,
+      CancelableProgressable reporter) throws IOException {
+    if (fs instanceof FilterFileSystem) {
+      fs = ((FilterFileSystem) fs).getRawFileSystem();
+    }
     // lease recovery not needed for local file system case.
-    if (!(fs instanceof DistributedFileSystem)) return;
-    recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter);
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+    recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 82f8a89..b9fffcf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -291,7 +291,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
         try {
           Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
           if (opt != null) {
-            opt.ifPresent(DualAsyncFSWAL::skipRemoteWal);
+            opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY));
           } else {
             // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more.
             peerId2WAL.put(peerId, Optional.empty());

http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
index fb3daf2..62000b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
@@ -97,11 +97,11 @@ class DualAsyncFSWALForTest extends DualAsyncFSWAL {
     }
   }
 
-  public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
+  public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir,
       String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
       boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
-    super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists,
+    super(fs, remoteFs, rootDir, remoteWALDir, logDir, archiveDir, conf, listeners, failIfWALExists,
       prefix, suffix, eventLoopGroup, channelClass);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 095be90..a20edd3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseZKTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
@@ -127,10 +129,26 @@ public class SyncReplicationTestBase {
         .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
   }
 
+  private static void shutdown(HBaseTestingUtility util) throws Exception {
+    if (util.getHBaseCluster() == null) {
+      return;
+    }
+    Admin admin = util.getAdmin();
+    if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) {
+      if (admin
+        .getReplicationPeerSyncReplicationState(PEER_ID) != SyncReplicationState.DOWNGRADE_ACTIVE) {
+        admin.transitReplicationPeerSyncReplicationState(PEER_ID,
+          SyncReplicationState.DOWNGRADE_ACTIVE);
+      }
+      admin.removeReplicationPeer(PEER_ID);
+    }
+    util.shutdownMiniCluster();
+  }
+
   @AfterClass
   public static void tearDown() throws Exception {
-    UTIL1.shutdownMiniCluster();
-    UTIL2.shutdownMiniCluster();
+    shutdown(UTIL1);
+    shutdown(UTIL2);
     ZK_UTIL.shutdownMiniZKCluster();
   }
 
@@ -207,7 +225,7 @@ public class SyncReplicationTestBase {
   protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility)
       throws Exception {
     ReplicationPeerStorage rps = ReplicationStorageFactory
-        .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
+      .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
     try {
       rps.getPeerSyncReplicationState(peerId);
       fail("Should throw exception when get the sync replication state of a removed peer.");
@@ -233,7 +251,7 @@ public class SyncReplicationTestBase {
     Entry[] entries = new Entry[10];
     for (int i = 0; i < entries.length; i++) {
       entries[i] =
-          new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
+        new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
       ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),

http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
index fce0cdf..42adab6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -17,13 +17,28 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -66,8 +81,27 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
     // confirm that the data is there after we convert the peer to DA
     verify(UTIL2, 0, 100);
 
-    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
-      SyncReplicationState.STANDBY);
+    try (AsyncConnection conn =
+      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
+      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
+      CompletableFuture<Void> future =
+        table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000)));
+      Thread.sleep(2000);
+      // should hang on rolling
+      assertFalse(future.isDone());
+      UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+        SyncReplicationState.STANDBY);
+      try {
+        future.get();
+        fail("should fail because of the wal is closing");
+      } catch (ExecutionException e) {
+        // expected
+        assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed"));
+      }
+    }
+    // confirm that the data has not been persisted
+    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty());
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
       SyncReplicationState.ACTIVE);
 
@@ -89,8 +123,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
     FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
     Assert.assertTrue(files.length > 0);
     for (FileStatus file : files) {
-      try (Reader reader =
-          WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) {
+      try (
+        Reader reader = WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) {
         Entry entry = reader.next();
         Assert.assertTrue(entry != null);
         while (entry != null) {