You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/30 13:11:38 UTC

[32/43] hbase git commit: HBASE-20424 Allow writing WAL to local and remote cluster concurrently

HBASE-20424 Allow writing WAL to local and remote cluster concurrently


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e4300002
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e4300002
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e4300002

Branch: refs/heads/HBASE-19064
Commit: e4300002164366a8ce16863f487bb19c561b7398
Parents: 8b012e9
Author: zhangduo <zh...@apache.org>
Authored: Thu May 24 16:20:28 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed May 30 21:01:01 2018 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/MasterProcedure.proto     |   2 +-
 .../hbase/replication/ReplicationUtils.java     |  26 ++-
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   |   3 +-
 .../replication/RecoverStandbyProcedure.java    |  10 +-
 .../master/replication/RemovePeerProcedure.java |   5 +-
 .../ReplaySyncReplicationWALManager.java        | 110 ++++++-----
 ...ransitPeerSyncReplicationStateProcedure.java |   4 +-
 .../hbase/regionserver/HRegionServer.java       |   3 +-
 .../regionserver/ReplicationSourceService.java  |   6 +
 .../hbase/regionserver/SplitLogWorker.java      | 188 +++++++++++++------
 .../regionserver/wal/CombinedAsyncWriter.java   |  80 ++------
 .../hbase/regionserver/wal/DualAsyncFSWAL.java  |  11 +-
 .../replication/regionserver/Replication.java   |   5 +
 .../regionserver/ReplicationSourceManager.java  |   2 +-
 .../SyncReplicationPeerInfoProviderImpl.java    |   3 +-
 .../org/apache/hadoop/hbase/util/FSUtils.java   |   9 +
 .../hbase/wal/SyncReplicationWALProvider.java   |  43 ++++-
 .../replication/TestReplicationAdmin.java       |   2 +-
 .../wal/TestCombinedAsyncWriter.java            |  20 +-
 .../replication/DualAsyncFSWALForTest.java      | 149 +++++++++++++++
 .../replication/SyncReplicationTestBase.java    |  12 +-
 .../replication/TestSyncReplicationActive.java  |   5 +-
 ...cReplicationMoreLogsInLocalCopyToRemote.java | 108 +++++++++++
 ...plicationMoreLogsInLocalGiveUpSplitting.java | 128 +++++++++++++
 .../TestSyncReplicationRemoveRemoteWAL.java     |   7 +-
 .../replication/TestSyncReplicationStandBy.java |  20 +-
 .../master/TestRecoverStandbyProcedure.java     |   4 +-
 .../TestReplicationSourceManager.java           |   5 +-
 .../wal/TestSyncReplicationWALProvider.java     |   1 -
 29 files changed, 733 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index f58ad2e..5764a21 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -476,7 +476,7 @@ enum RecoverStandbyState {
   RENAME_SYNC_REPLICATION_WALS_DIR = 1;
   INIT_WORKERS = 2;
   DISPATCH_TASKS = 3;
-  REMOVE_SYNC_REPLICATION_WALS_DIR = 4;
+  SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4;
 }
 
 message RecoverStandbyStateData {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 069db7a..dc4217c 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -46,6 +46,16 @@ public final class ReplicationUtils {
 
   public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
 
+  public static final String SYNC_WAL_SUFFIX = ".syncrep";
+
+  public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay";
+
+  public static final String REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot";
+
+  // This is used for copying sync replication log from local to remote and overwrite the old one
+  // since some FileSystem implementation may not support atomic rename.
+  public static final String RENAME_WAL_SUFFIX = ".ren";
+
   private ReplicationUtils() {
   }
 
@@ -187,14 +197,26 @@ public final class ReplicationUtils {
     return new Path(remoteWALDir).getFileSystem(conf);
   }
 
-  public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) {
+  public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) {
     return new Path(remoteWALDir, peerId);
   }
 
-  public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) {
+  public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) {
     return new Path(remoteWALDir, peerId);
   }
 
+  public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) {
+    return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX);
+  }
+
+  public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) {
+    return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
+  }
+
+  public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) {
+    return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
+  }
+
   /**
    * Do the sleeping logic
    * @param msg Why we sleep

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 1645d68..7ffd3da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -22,9 +22,9 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
index e9e3a97..9860774 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
@@ -50,7 +50,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
     switch (state) {
       case RENAME_SYNC_REPLICATION_WALS_DIR:
         try {
-          replaySyncReplicationWALManager.renamePeerRemoteWALDir(peerId);
+          replaySyncReplicationWALManager.renameToPeerReplayWALDir(peerId);
         } catch (IOException e) {
           LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e);
           setFailure("master-recover-standby", e);
@@ -67,11 +67,11 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
             .map(wal -> new ReplaySyncReplicationWALProcedure(peerId,
                 replaySyncReplicationWALManager.removeWALRootPath(wal)))
             .toArray(ReplaySyncReplicationWALProcedure[]::new));
-        setNextState(RecoverStandbyState.REMOVE_SYNC_REPLICATION_WALS_DIR);
+        setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
         return Flow.HAS_MORE_STATE;
-      case REMOVE_SYNC_REPLICATION_WALS_DIR:
+      case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
         try {
-          replaySyncReplicationWALManager.removePeerReplayWALDir(peerId);
+          replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId);
         } catch (IOException e) {
           LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
           throw new ProcedureYieldException();
@@ -85,7 +85,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
   private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager)
       throws ProcedureYieldException {
     try {
-      return replaySyncReplicationWALManager.getReplayWALs(peerId);
+      return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
     } catch (IOException e) {
       LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
       throw new ProcedureYieldException();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 7335fe0..254448a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -67,10 +67,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
   }
 
   private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
-    ReplaySyncReplicationWALManager remoteWALManager =
-        env.getMasterServices().getReplaySyncReplicationWALManager();
-    remoteWALManager.removePeerRemoteWALs(peerId);
-    remoteWALManager.removePeerReplayWALDir(peerId);
+    env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
index eac5aa4..348c134 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.master.replication;
 
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -25,31 +29,27 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 @InterfaceAudience.Private
 public class ReplaySyncReplicationWALManager {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
 
-  private static final String REPLAY_SUFFIX = "-replay";
-
   private final MasterServices services;
 
-  private final Configuration conf;
-
   private final FileSystem fs;
 
   private final Path walRootDir;
@@ -60,69 +60,86 @@ public class ReplaySyncReplicationWALManager {
 
   public ReplaySyncReplicationWALManager(MasterServices services) {
     this.services = services;
-    this.conf = services.getConfiguration();
     this.fs = services.getMasterFileSystem().getWALFileSystem();
     this.walRootDir = services.getMasterFileSystem().getWALRootDir();
     this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
   }
 
-  public Path getPeerRemoteWALDir(String peerId) {
-    return new Path(this.remoteWALDir, peerId);
-  }
-
-  private Path getPeerReplayWALDir(String peerId) {
-    return getPeerRemoteWALDir(peerId).suffix(REPLAY_SUFFIX);
-  }
-
   public void createPeerRemoteWALDir(String peerId) throws IOException {
-    Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
+    Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
     if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
       throw new IOException("Unable to mkdir " + peerRemoteWALDir);
     }
   }
 
-  public void renamePeerRemoteWALDir(String peerId) throws IOException {
-    Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
-    Path peerReplayWALDir = peerRemoteWALDir.suffix(REPLAY_SUFFIX);
-    if (fs.exists(peerRemoteWALDir)) {
-      if (!fs.rename(peerRemoteWALDir, peerReplayWALDir)) {
-        throw new IOException("Failed rename remote wal dir from " + peerRemoteWALDir + " to "
-            + peerReplayWALDir + " for peer id=" + peerId);
+  private void rename(Path src, Path dst, String peerId) throws IOException {
+    if (fs.exists(src)) {
+      deleteDir(dst, peerId);
+      if (!fs.rename(src, dst)) {
+        throw new IOException(
+          "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
       }
-      LOG.info("Rename remote wal dir from {} to {} for peer id={}", remoteWALDir, peerReplayWALDir,
-        peerId);
-    } else if (!fs.exists(peerReplayWALDir)) {
-      throw new IOException("Remote wal dir " + peerRemoteWALDir + " and replay wal dir "
-          + peerReplayWALDir + " not exist for peer id=" + peerId);
+      LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
+    } else if (!fs.exists(dst)) {
+      throw new IOException(
+        "Want to rename from " + src + " to " + dst + ", but they both do not exist");
     }
   }
 
-  public List<Path> getReplayWALs(String peerId) throws IOException {
-    Path peerReplayWALDir = getPeerReplayWALDir(peerId);
-    List<Path> replayWals = new ArrayList<>();
-    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(peerReplayWALDir, false);
-    while (iterator.hasNext()) {
-      replayWals.add(iterator.next().getPath());
+  public void renameToPeerReplayWALDir(String peerId) throws IOException {
+    rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
+      peerId);
+  }
+
+  public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
+    rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
+      peerId);
+  }
+
+  public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
+    Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
+    for (FileStatus status : fs.listStatus(peerReplayWALDir,
+      p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) {
+      Path src = status.getPath();
+      String srcName = src.getName();
+      String dstName =
+        srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
+      FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
+    }
+    List<Path> wals = new ArrayList<>();
+    for (FileStatus status : fs.listStatus(peerReplayWALDir)) {
+      Path path = status.getPath();
+      if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
+        wals.add(path);
+      } else {
+        if (!fs.delete(path, true)) {
+          LOG.warn("Can not delete unused file: " + path);
+        }
+      }
     }
-    return replayWals;
+    return wals;
   }
 
-  public void removePeerReplayWALDir(String peerId) throws IOException {
-    Path peerReplayWALDir = getPeerReplayWALDir(peerId);
+  public void snapshotPeerReplayWALDir(String peerId) throws IOException {
+    Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
     if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
       throw new IOException(
           "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
     }
   }
 
-  public void removePeerRemoteWALs(String peerId) throws IOException {
-    Path remoteWALDir = getPeerRemoteWALDir(peerId);
-    if (fs.exists(remoteWALDir) && !fs.delete(remoteWALDir, true)) {
-      throw new IOException(
-          "Failed to remove remote WALs dir " + remoteWALDir + " for peer id=" + peerId);
+  private void deleteDir(Path dir, String peerId) throws IOException {
+    if (!fs.delete(dir, true) && fs.exists(dir)) {
+      throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);
     }
   }
 
+  public void removePeerRemoteWALs(String peerId) throws IOException {
+    deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
+    deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
+    deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
+  }
+
   public void initPeerWorkers(String peerId) {
     BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
     services.getServerManager().getOnlineServers().keySet()
@@ -144,4 +161,9 @@ public class ReplaySyncReplicationWALManager {
     // remove the "/" too.
     return pathStr.substring(walRootDir.toString().length() + 1);
   }
+
+  @VisibleForTesting
+  public Path getRemoteWALDir() {
+    return remoteWALDir;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index ebe7a93..81ee6b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -118,7 +118,7 @@ public class TransitPeerSyncReplicationStateProcedure
       env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
     if (toState == SyncReplicationState.ACTIVE) {
       Path remoteWALDirForPeer =
-        ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), peerId);
+        ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
       // check whether the remote wal directory is present
       if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration())
         .exists(remoteWALDirForPeer)) {
@@ -152,7 +152,7 @@ public class TransitPeerSyncReplicationStateProcedure
       throws ProcedureYieldException, IOException {
     MasterFileSystem mfs = env.getMasterFileSystem();
     Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
-    Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
     FileSystem walFs = mfs.getWALFileSystem();
     if (walFs.exists(remoteWALDirForPeer)) {
       LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5052a0b..90f3099 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1957,8 +1957,7 @@ public class HRegionServer extends HasThread implements
     sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
     if (this.csm != null) {
       // SplitLogWorker needs csm. If none, don't start this.
-      this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
-          this, walFactory);
+      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
       splitLogWorker.start();
     } else {
       LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 4529943..09ec477 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -37,4 +38,9 @@ public interface ReplicationSourceService extends ReplicationService {
    * Returns a Handler to handle peer procedures.
    */
   PeerProcedureHandler getPeerProcedureHandler();
+
+  /**
+   * Return the replication peers.
+   */
+  ReplicationPeers getReplicationPeers();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index a1c2030..4a9712c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -23,22 +23,31 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
@@ -67,67 +76,133 @@ public class SplitLogWorker implements Runnable {
   Thread worker;
   // thread pool which executes recovery work
   private SplitLogWorkerCoordination coordination;
-  private Configuration conf;
   private RegionServerServices server;
 
   public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
       TaskExecutor splitTaskExecutor) {
     this.server = server;
-    this.conf = conf;
     this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
     coordination.init(server, conf, splitTaskExecutor, this);
   }
 
-  public SplitLogWorker(final Server hserver, final Configuration conf,
-      final RegionServerServices server, final LastSequenceId sequenceIdChecker,
-      final WALFactory factory) {
-    this(hserver, conf, server, new TaskExecutor() {
-      @Override
-      public Status exec(String filename, CancelableProgressable p) {
-        Path walDir;
-        FileSystem fs;
-        try {
-          walDir = FSUtils.getWALRootDir(conf);
-          fs = walDir.getFileSystem(conf);
-        } catch (IOException e) {
-          LOG.warn("could not find root dir or fs", e);
-          return Status.RESIGNED;
-        }
-        // TODO have to correctly figure out when log splitting has been
-        // interrupted or has encountered a transient error and when it has
-        // encountered a bad non-retry-able persistent error.
-        try {
-          if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
-            fs, conf, p, sequenceIdChecker,
-              server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) {
-            return Status.PREEMPTED;
-          }
-        } catch (InterruptedIOException iioe) {
-          LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
-          return Status.RESIGNED;
-        } catch (IOException e) {
-          if (e instanceof FileNotFoundException) {
-            // A wal file may not exist anymore. Nothing can be recovered so move on
-            LOG.warn("WAL {} does not exist anymore", filename, e);
-            return Status.DONE;
-          }
-          Throwable cause = e.getCause();
-          if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
-                  || cause instanceof ConnectException
-                  || cause instanceof SocketTimeoutException)) {
-            LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
-                + "resigning", e);
-            return Status.RESIGNED;
-          } else if (cause instanceof InterruptedException) {
-            LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
-            return Status.RESIGNED;
-          }
-          LOG.warn("log splitting of " + filename + " failed, returning error", e);
-          return Status.ERR;
-        }
+  public SplitLogWorker(Configuration conf, RegionServerServices server,
+      LastSequenceId sequenceIdChecker, WALFactory factory) {
+    this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory));
+  }
+
+  // returns whether we need to continue the split work
+  private static boolean processSyncReplicationWAL(String name, Configuration conf,
+      RegionServerServices server, FileSystem fs, Path walDir) throws IOException {
+    Path walFile = new Path(walDir, name);
+    String filename = walFile.getName();
+    Optional<String> optSyncPeerId =
+      SyncReplicationWALProvider.getSyncReplicationPeerIdFromWALName(filename);
+    if (!optSyncPeerId.isPresent()) {
+      return true;
+    }
+    String peerId = optSyncPeerId.get();
+    ReplicationPeerImpl peer =
+      server.getReplicationSourceService().getReplicationPeers().getPeer(peerId);
+    if (peer == null || !peer.getPeerConfig().isSyncReplication()) {
+      return true;
+    }
+    Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
+      peer.getSyncReplicationStateAndNewState();
+    if (stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
+      stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) {
+      // copy the file to remote and overwrite the previous one
+      String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
+      Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+      Path tmpRemoteWAL = new Path(remoteWALDirForPeer, filename + ".tmp");
+      FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+      try (FSDataInputStream in = fs.open(walFile); @SuppressWarnings("deprecation")
+      FSDataOutputStream out = remoteFs.createNonRecursive(tmpRemoteWAL, true,
+        FSUtils.getDefaultBufferSize(remoteFs), remoteFs.getDefaultReplication(tmpRemoteWAL),
+        remoteFs.getDefaultBlockSize(tmpRemoteWAL), null)) {
+        IOUtils.copy(in, out);
+      }
+      Path toCommitRemoteWAL =
+        new Path(remoteWALDirForPeer, filename + ReplicationUtils.RENAME_WAL_SUFFIX);
+      // Some FileSystem implementations may not support atomic rename so we need to do it in two
+      // phases
+      FSUtils.renameFile(remoteFs, tmpRemoteWAL, toCommitRemoteWAL);
+      FSUtils.renameFile(remoteFs, toCommitRemoteWAL, new Path(remoteWALDirForPeer, filename));
+    } else if ((stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
+      stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) ||
+      stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)) {
+      // check whether we still need to process this file
+      // actually we only write wal files whose names end with .syncrep in state A, and after
+      // transiting to a state other than A, we will reopen all the regions so the data in the wal
+      // will be flushed and the wal file will be archived soon. But it is still possible that
+      // there is a server crash while we are transiting from A to S, so to simplify the logic of
+      // the transit procedure, here we also check the remote snapshot directory in state S, so
+      // that we do not need to wait until all the wal files with the .syncrep suffix are archived
+      // before finishing the procedure.
+      String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
+      Path remoteSnapshotDirForPeer = ReplicationUtils.getPeerSnapshotWALDir(remoteWALDir, peerId);
+      FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+      if (remoteFs.exists(new Path(remoteSnapshotDirForPeer, filename))) {
+        // the file has already been replayed when the remote cluster transited from S to DA; the
+        // content will be replicated back to us, so give up splitting it.
+        LOG.warn("Giveup splitting {} since it has been replayed in the remote cluster and " +
+          "the content will be replicated back", filename);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static Status splitLog(String name, CancelableProgressable p, Configuration conf,
+      RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) {
+    Path walDir;
+    FileSystem fs;
+    try {
+      walDir = FSUtils.getWALRootDir(conf);
+      fs = walDir.getFileSystem(conf);
+    } catch (IOException e) {
+      LOG.warn("could not find root dir or fs", e);
+      return Status.RESIGNED;
+    }
+    try {
+      if (!processSyncReplicationWAL(name, conf, server, fs, walDir)) {
         return Status.DONE;
       }
-    });
+    } catch (IOException e) {
+      LOG.warn("failed to process sync replication wal {}", name, e);
+      return Status.RESIGNED;
+    }
+    // TODO have to correctly figure out when log splitting has been
+    // interrupted or has encountered a transient error and when it has
+    // encountered a bad non-retry-able persistent error.
+    try {
+      if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf,
+        p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(),
+        factory)) {
+        return Status.PREEMPTED;
+      }
+    } catch (InterruptedIOException iioe) {
+      LOG.warn("log splitting of " + name + " interrupted, resigning", iioe);
+      return Status.RESIGNED;
+    } catch (IOException e) {
+      if (e instanceof FileNotFoundException) {
+        // A wal file may not exist anymore. Nothing can be recovered so move on
+        LOG.warn("WAL {} does not exist anymore", name, e);
+        return Status.DONE;
+      }
+      Throwable cause = e.getCause();
+      if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException ||
+        cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
+        LOG.warn("log replaying of " + name + " can't connect to the target regionserver, " +
+          "resigning", e);
+        return Status.RESIGNED;
+      } else if (cause instanceof InterruptedException) {
+        LOG.warn("log splitting of " + name + " interrupted, resigning", e);
+        return Status.RESIGNED;
+      }
+      LOG.warn("log splitting of " + name + " failed, returning error", e);
+      return Status.ERR;
+    }
+    return Status.DONE;
   }
 
   @Override
@@ -191,6 +266,7 @@ public class SplitLogWorker implements Runnable {
    * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
    * SplitLogManager.TaskFinisher
    */
+  @FunctionalInterface
   public interface TaskExecutor {
     enum Status {
       DONE(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
index 8ecfede..4301ae7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
@@ -32,13 +32,13 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
  * An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances.
  */
 @InterfaceAudience.Private
-public abstract class CombinedAsyncWriter implements AsyncWriter {
+public final class CombinedAsyncWriter implements AsyncWriter {
 
   private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class);
 
-  protected final ImmutableList<AsyncWriter> writers;
+  private final ImmutableList<AsyncWriter> writers;
 
-  protected CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) {
+  private CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) {
     this.writers = writers;
   }
 
@@ -66,69 +66,29 @@ public abstract class CombinedAsyncWriter implements AsyncWriter {
     }
   }
 
-  protected abstract void doSync(CompletableFuture<Long> future);
-
-  @Override
-  public CompletableFuture<Long> sync() {
-    CompletableFuture<Long> future = new CompletableFuture<>();
-    doSync(future);
-    return future;
-  }
-
   @Override
   public void append(Entry entry) {
     writers.forEach(w -> w.append(entry));
   }
 
-  public enum Mode {
-    SEQUENTIAL, PARALLEL
+  @Override
+  public CompletableFuture<Long> sync() {
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    AtomicInteger remaining = new AtomicInteger(writers.size());
+    writers.forEach(w -> w.sync().whenComplete((length, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      if (remaining.decrementAndGet() == 0) {
+        future.complete(length);
+      }
+    }));
+    return future;
   }
 
-  public static CombinedAsyncWriter create(Mode mode, AsyncWriter writer, AsyncWriter... writers) {
-    ImmutableList<AsyncWriter> ws =
-        ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build();
-    switch (mode) {
-      case SEQUENTIAL:
-        return new CombinedAsyncWriter(ws) {
-
-          private void doSync(CompletableFuture<Long> future, Long length, int index) {
-            if (index == writers.size()) {
-              future.complete(length);
-              return;
-            }
-            writers.get(index).sync().whenComplete((len, error) -> {
-              if (error != null) {
-                future.completeExceptionally(error);
-                return;
-              }
-              doSync(future, len, index + 1);
-            });
-          }
-
-          @Override
-          protected void doSync(CompletableFuture<Long> future) {
-            doSync(future, null, 0);
-          }
-        };
-      case PARALLEL:
-        return new CombinedAsyncWriter(ws) {
-
-          @Override
-          protected void doSync(CompletableFuture<Long> future) {
-            AtomicInteger remaining = new AtomicInteger(writers.size());
-            writers.forEach(w -> w.sync().whenComplete((length, error) -> {
-              if (error != null) {
-                future.completeExceptionally(error);
-                return;
-              }
-              if (remaining.decrementAndGet() == 0) {
-                future.complete(length);
-              }
-            }));
-          }
-        };
-      default:
-        throw new IllegalArgumentException("Unknown mode: " + mode);
-    }
+  public static CombinedAsyncWriter create(AsyncWriter writer, AsyncWriter... writers) {
+    return new CombinedAsyncWriter(
+      ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index a98567a..3967e78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 
@@ -50,6 +51,13 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
     this.remoteWalDir = remoteWalDir;
   }
 
+  // will be overridden in testcase
+  @VisibleForTesting
+  protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
+      AsyncWriter remoteWriter) {
+    return CombinedAsyncWriter.create(remoteWriter, localWriter);
+  }
+
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
     AsyncWriter localWriter = super.createWriterInstance(path);
@@ -66,8 +74,7 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
         closeWriter(localWriter);
       }
     }
-    return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
-      localWriter);
+    return createCombinedAsyncWriter(localWriter, remoteWriter);
   }
 
   // Allow temporarily skipping the creation of remote writer. When failing to write to the remote

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 2199415..b04f0cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -288,4 +288,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
     return syncReplicationPeerInfoProvider;
   }
+
+  @Override
+  public ReplicationPeers getReplicationPeers() {
+    return replicationPeers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index f25b073..827cfa9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -652,7 +652,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
       throws IOException {
-    Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
     FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
     for (String wal : wals) {
       Path walFile = new Path(remoteWALDirForPeer, wal);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index 75274ea..170441b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -77,8 +77,7 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
       return false;
     }
     Pair<SyncReplicationState, SyncReplicationState> states =
-        peer.getSyncReplicationStateAndNewState();
+      peer.getSyncReplicationStateAndNewState();
     return checker.test(states.getFirst(), states.getSecond());
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 8a1f948..5b968db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -843,6 +843,15 @@ public abstract class FSUtils extends CommonFSUtils {
     return frags;
   }
 
+  public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
+    if (fs.exists(dst) && !fs.delete(dst, false)) {
+      throw new IOException("Can not delete " + dst);
+    }
+    if (!fs.rename(src, dst)) {
+      throw new IOException("Can not rename from " + src + " to " + dst);
+    }
+  }
+
   /**
    * A {@link PathFilter} that returns only regular files.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 8e82d8b..82f8a89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDir
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -51,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -67,8 +70,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
 
   private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
 
+  // only for injecting errors for testcase, do not use it for other purpose.
   @VisibleForTesting
-  public static final String LOG_SUFFIX = ".syncrep";
+  public static final String DUAL_WAL_IMPL = "hbase.wal.sync.impl";
 
   private final WALProvider provider;
 
@@ -126,12 +130,35 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
   }
 
   private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
-    return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf),
-      ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
-      CommonFSUtils.getWALRootDir(conf),
-      ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId),
-      getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
-      conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
+    Class<? extends DualAsyncFSWAL> clazz =
+      conf.getClass(DUAL_WAL_IMPL, DualAsyncFSWAL.class, DualAsyncFSWAL.class);
+    try {
+      Constructor<?> constructor = null;
+      for (Constructor<?> c : clazz.getDeclaredConstructors()) {
+        if (c.getParameterCount() > 0) {
+          constructor = c;
+          break;
+        }
+      }
+      if (constructor == null) {
+        throw new IllegalArgumentException("No valid constructor provided for class " + clazz);
+      }
+      constructor.setAccessible(true);
+      return (DualAsyncFSWAL) constructor.newInstance(
+        CommonFSUtils.getWALFileSystem(conf),
+        ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
+        CommonFSUtils.getWALRootDir(conf),
+        ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId),
+        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+        conf, listeners, true, getLogPrefix(peerId), ReplicationUtils.SYNC_WAL_SUFFIX,
+        eventLoopGroup, channelClass);
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      Throwable cause = e.getTargetException();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new RuntimeException(cause);
+    }
   }
 
   private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
@@ -304,7 +331,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
    * </p>
    */
   public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
-    if (!name.endsWith(LOG_SUFFIX)) {
+    if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
       // fast path to return earlier if the name is not for a sync replication peer.
       return Optional.empty();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c6ffeea..6462234 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -1141,7 +1141,7 @@ public class TestReplicationAdmin {
       LOG.info("Expected error:", e);
     }
     TEST_UTIL.getTestFileSystem()
-      .mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND));
+      .mkdirs(ReplicationUtils.getPeerRemoteWALDir(rootDir, ID_SECOND));
     hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
     assertEquals(SyncReplicationState.ACTIVE,
       hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
index 07aa6a8..f73b4f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,23 +37,18 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
 
-@RunWith(Parameterized.class)
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestCombinedAsyncWriter {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
+    HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
 
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
@@ -68,15 +61,6 @@ public class TestCombinedAsyncWriter {
   @Rule
   public final TestName name = new TestName();
 
-  @Parameter
-  public CombinedAsyncWriter.Mode mode;
-
-  @Parameters(name = "{index}: mode={0}")
-  public static List<Object[]> params() {
-    return Arrays.asList(new Object[] { CombinedAsyncWriter.Mode.SEQUENTIAL },
-      new Object[] { CombinedAsyncWriter.Mode.PARALLEL });
-  }
-
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
@@ -125,7 +109,7 @@ public class TestCombinedAsyncWriter {
         EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
       AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false,
         EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
-      CombinedAsyncWriter writer = CombinedAsyncWriter.create(mode, writer1, writer2)) {
+      CombinedAsyncWriter writer = CombinedAsyncWriter.create(writer1, writer2)) {
       ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName,
         columnCount, recordCount, row, timestamp);
       try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
new file mode 100644
index 0000000..fb3daf2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+class DualAsyncFSWALForTest extends DualAsyncFSWAL {
+
+  private boolean localBroken;
+
+  private boolean remoteBroken;
+
+  private CountDownLatch arrive;
+
+  private CountDownLatch resume;
+
+  private final class MyCombinedAsyncWriter implements AsyncWriter {
+
+    private final AsyncWriter localWriter;
+
+    private final AsyncWriter remoteWriter;
+
+    public MyCombinedAsyncWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) {
+      this.localWriter = localWriter;
+      this.remoteWriter = remoteWriter;
+    }
+
+    @Override
+    public long getLength() {
+      return localWriter.getLength();
+    }
+
+    @Override
+    public void close() throws IOException {
+      Closeables.close(localWriter, true);
+      Closeables.close(remoteWriter, true);
+    }
+
+    @Override
+    public CompletableFuture<Long> sync() {
+      CompletableFuture<Long> localFuture;
+      CompletableFuture<Long> remoteFuture;
+      if (!localBroken) {
+        localFuture = localWriter.sync();
+      } else {
+        localFuture = new CompletableFuture<>();
+        localFuture.completeExceptionally(new IOException("Inject error"));
+      }
+      if (!remoteBroken) {
+        remoteFuture = remoteWriter.sync();
+      } else {
+        remoteFuture = new CompletableFuture<>();
+        remoteFuture.completeExceptionally(new IOException("Inject error"));
+      }
+      return CompletableFuture.allOf(localFuture, remoteFuture).thenApply(v -> {
+        return localFuture.getNow(0L);
+      });
+    }
+
+    @Override
+    public void append(Entry entry) {
+      if (!localBroken) {
+        localWriter.append(entry);
+      }
+      if (!remoteBroken) {
+        remoteWriter.append(entry);
+      }
+    }
+  }
+
+  public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
+      String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
+      boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
+      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+    super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists,
+      prefix, suffix, eventLoopGroup, channelClass);
+  }
+
+  @Override
+  protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
+      AsyncWriter remoteWriter) {
+    return new MyCombinedAsyncWriter(localWriter, remoteWriter);
+  }
+
+  @Override
+  protected AsyncWriter createWriterInstance(Path path) throws IOException {
+    if (arrive != null) {
+      arrive.countDown();
+      try {
+        resume.await();
+      } catch (InterruptedException e) {
+      }
+    }
+    if (localBroken || remoteBroken) {
+      throw new IOException("WAL broken");
+    }
+    return super.createWriterInstance(path);
+  }
+
+  public void setLocalBroken() {
+    this.localBroken = true;
+  }
+
+  public void setRemoteBroken() {
+    this.remoteBroken = true;
+  }
+
+  public void suspendLogRoll() {
+    arrive = new CountDownLatch(1);
+    resume = new CountDownLatch(1);
+  }
+
+  public void waitUntilArrive() throws InterruptedException {
+    arrive.await();
+  }
+
+  public void resumeLogRoll() {
+    resume.countDown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index de679be..095be90 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -72,9 +72,9 @@ public class SyncReplicationTestBase {
 
   protected static String PEER_ID = "1";
 
-  protected static Path remoteWALDir1;
+  protected static Path REMOTE_WAL_DIR1;
 
-  protected static Path remoteWALDir2;
+  protected static Path REMOTE_WAL_DIR2;
 
   private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
     util.setZkCluster(ZK_UTIL.getZkCluster());
@@ -109,22 +109,22 @@ public class SyncReplicationTestBase {
     UTIL2.getAdmin().createTable(td);
     FileSystem fs1 = UTIL1.getTestFileSystem();
     FileSystem fs2 = UTIL2.getTestFileSystem();
-    remoteWALDir1 =
+    REMOTE_WAL_DIR1 =
       new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
         "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
-    remoteWALDir2 =
+    REMOTE_WAL_DIR2 =
       new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
         "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
     UTIL1.getAdmin().addReplicationPeer(PEER_ID,
       ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
         .setReplicateAllUserTables(false)
         .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
-        .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
+        .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build());
     UTIL2.getAdmin().addReplicationPeer(PEER_ID,
       ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
         .setReplicateAllUserTables(false)
         .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
-        .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
+        .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
index b663c44..fce0cdf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -37,8 +37,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
-
+    HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
 
   @Test
   public void testActive() throws Exception {
@@ -58,7 +57,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
     verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
 
     // Ensure that there's no cluster id in remote log entries.
-    verifyNoClusterIdInRemoteLog(UTIL2, remoteWALDir2, PEER_ID);
+    verifyNoClusterIdInRemoteLog(UTIL2, REMOTE_WAL_DIR2, PEER_ID);
 
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
       SyncReplicationState.DOWNGRADE_ACTIVE);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java
new file mode 100644
index 0000000..cf8993b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationMoreLogsInLocalCopyToRemote extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalCopyToRemote.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalCopyToRemote.class);
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL1.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+    UTIL2.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+    SyncReplicationTestBase.setUp();
+  }
+
+  @Test
+  public void testSplitLog() throws Exception {
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+    DualAsyncFSWALForTest wal =
+      (DualAsyncFSWALForTest) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+    wal.setRemoteBroken();
+    try (AsyncConnection conn =
+      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
+      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
+      try {
+        table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0))).get();
+        fail("Should fail since the rs will crash and we will not retry");
+      } catch (ExecutionException e) {
+        // expected
+        LOG.info("Expected error:", e);
+      }
+    }
+    UTIL1.waitFor(60000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+          return table.exists(new Get(Bytes.toBytes(0)));
+        }
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "The row is still not available";
+      }
+    });
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    // We should have copied the local log to remote, so we should be able to get the value
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java
new file mode 100644
index 0000000..9a6d242
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationMoreLogsInLocalGiveUpSplitting extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL1.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+    UTIL2.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+      DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+    SyncReplicationTestBase.setUp();
+  }
+
+  @Test
+  public void testSplitLog() throws Exception {
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+      table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0)));
+    }
+    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+    DualAsyncFSWALForTest wal =
+      (DualAsyncFSWALForTest) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+    wal.setRemoteBroken();
+    wal.suspendLogRoll();
+    try (AsyncConnection conn =
+      ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
+      AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1)
+        .setWriteRpcTimeout(5, TimeUnit.SECONDS).build();
+      try {
+        table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1))).get();
+        fail("Should fail since the rs will hang and we will get a rpc timeout");
+      } catch (ExecutionException e) {
+        // expected
+        LOG.info("Expected error:", e);
+      }
+    }
+    wal.waitUntilArrive();
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    wal.resumeLogRoll();
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
+      // we failed to write this entry to remote so it should not exist
+      assertFalse(table.exists(new Get(Bytes.toBytes(1))));
+    }
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    // make sure that the region is online. We can not use waitTableAvailable since the table in
+    // stand by state can not be read from client.
+    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+      try {
+        table.exists(new Get(Bytes.toBytes(0)));
+      } catch (DoNotRetryIOException | RetriesExhaustedException e) {
+        // expected
+        assertThat(e.getMessage(), containsString("STANDBY"));
+      }
+    }
+    HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    // we give up splitting the whole wal file so this record will also be gone.
+    assertTrue(region.get(new Get(Bytes.toBytes(0))).isEmpty());
+    UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
+    // finally it should be replicated back
+    waitUntilReplicationDone(UTIL1, 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
index 7d380c1..0cd1846 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -66,12 +65,12 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
       SyncReplicationState.ACTIVE);
 
     MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
-    Path remoteWALDir = ReplicationUtils.getRemoteWALDirForPeer(
+    Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir(
       new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
     FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
     assertEquals(1, remoteWALStatus.length);
     Path remoteWAL = remoteWALStatus[0].getPath();
-    assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+    assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
     writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
 
     HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
@@ -81,7 +80,7 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
     remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
     assertEquals(1, remoteWALStatus.length);
     remoteWAL = remoteWALStatus[0].getPath();
-    assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+    assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
 
     UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
     write(UTIL1, 100, 200);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
index 8526af8..de409fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
@@ -97,25 +97,25 @@ public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
     writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
 
     // Remove the peers in ACTIVE & STANDBY cluster.
-    FileSystem fs2 = remoteWALDir2.getFileSystem(UTIL2.getConfiguration());
-    Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
+    FileSystem fs2 = REMOTE_WAL_DIR2.getFileSystem(UTIL2.getConfiguration());
+    Assert.assertTrue(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)));
 
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
       SyncReplicationState.DOWNGRADE_ACTIVE);
-    Assert.assertFalse(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
-    Assert.assertFalse(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID)));
+    Assert.assertFalse(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)));
+    Assert.assertFalse(fs2.exists(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID)));
 
     UTIL1.getAdmin().removeReplicationPeer(PEER_ID);
-    verifyRemovedPeer(PEER_ID, remoteWALDir1, UTIL1);
+    verifyRemovedPeer(PEER_ID, REMOTE_WAL_DIR1, UTIL1);
 
     // Peer remoteWAL dir will be renamed to replay WAL dir when transit from S to DA, and the
     // replay WAL dir will be removed after replaying all WALs, so create a emtpy dir here to test
     // whether the removeReplicationPeer would remove the remoteWAL dir.
-    fs2.create(getRemoteWALDir(remoteWALDir2, PEER_ID));
-    fs2.create(getReplayRemoteWALs(remoteWALDir2, PEER_ID));
-    Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
-    Assert.assertTrue(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID)));
+    fs2.create(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID));
+    fs2.create(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID));
+    Assert.assertTrue(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)));
+    Assert.assertTrue(fs2.exists(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID)));
     UTIL2.getAdmin().removeReplicationPeer(PEER_ID);
-    verifyRemovedPeer(PEER_ID, remoteWALDir2, UTIL2);
+    verifyRemovedPeer(PEER_ID, REMOTE_WAL_DIR2, UTIL2);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
index ebb21a4..2563669 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -151,7 +152,8 @@ public class TestRecoverStandbyProcedure {
   }
 
   private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
-    Path peerRemoteWALDir = replaySyncReplicationWALManager.getPeerRemoteWALDir(PEER_ID);
+    Path peerRemoteWALDir = ReplicationUtils
+      .getPeerRemoteWALDir(replaySyncReplicationWALManager.getRemoteWALDir(), PEER_ID);
     if (!fs.exists(peerRemoteWALDir)) {
       fs.mkdirs(peerRemoteWALDir);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index d98b7f85..febe764 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -84,7 +84,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -618,7 +617,7 @@ public abstract class TestReplicationSourceManager {
     try {
       // make sure that we can deal with files which does not exist
       String walNameNotExists =
-        "remoteWAL-12345-" + slaveId + ".12345" + SyncReplicationWALProvider.LOG_SUFFIX;
+        "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
       Path wal = new Path(logDir, walNameNotExists);
       manager.preLogRoll(wal);
       manager.postLogRoll(wal);
@@ -626,7 +625,7 @@ public abstract class TestReplicationSourceManager {
       Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
       fs.mkdirs(remoteLogDirForPeer);
       String walName =
-        "remoteWAL-12345-" + slaveId + ".23456" + SyncReplicationWALProvider.LOG_SUFFIX;
+        "remoteWAL-12345-" + slaveId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
       Path remoteWAL =
         new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
       fs.create(remoteWAL).close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4300002/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
index 69ed44d..8189cef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -86,7 +86,6 @@ public class TestSyncReplicationWALProvider {
     @Override
     public boolean checkState(TableName table,
         BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
-      // TODO Implement SyncReplicationPeerInfoProvider.isInState
       return false;
     }
   }