Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/24 03:27:56 UTC

[28/37] hbase git commit: HBASE-20426 Give up replicating anything in S state

HBASE-20426 Give up replicating anything in S state


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca7f0e3a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca7f0e3a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca7f0e3a

Branch: refs/heads/HBASE-19064
Commit: ca7f0e3ab2d382ea73f92ff79db3d43b2632e078
Parents: cfc980f
Author: zhangduo <zh...@apache.org>
Authored: Thu May 3 15:51:35 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 24 11:13:58 2018 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/MasterProcedure.proto     |  13 +-
 .../replication/AbstractPeerProcedure.java      |   4 +
 .../master/replication/ModifyPeerProcedure.java |   6 -
 .../replication/ReplicationPeerManager.java     |  13 +-
 ...ransitPeerSyncReplicationStateProcedure.java |  94 +++++++++++----
 .../hadoop/hbase/regionserver/LogRoller.java    |  11 +-
 .../regionserver/PeerProcedureHandlerImpl.java  |  63 ++++++++--
 .../regionserver/ReplicationSource.java         |   1 +
 .../regionserver/ReplicationSourceManager.java  | 118 ++++++++++++++++---
 .../TestDrainReplicationQueuesForStandBy.java   | 118 +++++++++++++++++++
 10 files changed, 379 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 01e4dae..f15cb04 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -394,11 +394,14 @@ enum PeerSyncReplicationStateTransitionState {
   SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
   REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
   REPLAY_REMOTE_WAL_IN_PEER = 4;
-  REOPEN_ALL_REGIONS_IN_PEER = 5;
-  TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6;
-  REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7;
-  CREATE_DIR_FOR_REMOTE_WAL = 8;
-  POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 9;
+  REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5;
+  REOPEN_ALL_REGIONS_IN_PEER = 6;
+  TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7;
+  REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8;
+  SYNC_REPLICATION_SET_PEER_ENABLED = 9;
+  SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 10;
+  CREATE_DIR_FOR_REMOTE_WAL = 11;
+  POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 12;
 }
 
 message PeerModificationStateData {

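For context, the state machine above is driven by a single client-side Admin call. A
minimal sketch of triggering the transition, assuming a configured Configuration
"conf" and an existing sync replication peer "1" (both illustrative):

    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.replication.SyncReplicationState;

    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      // With this change, transiting to STANDBY also disables the peer, removes
      // its replication queues and re-enables it, per the new states above.
      admin.transitReplicationPeerSyncReplicationState("1",
        SyncReplicationState.STANDBY);
    }
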
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index 6679d78..458e073 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -106,4 +106,8 @@ public abstract class AbstractPeerProcedure<TState>
     throw new UnsupportedOperationException();
   }
 
+  protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
+    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
+      .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 32b8ea1..56462ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -108,12 +108,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     throw new UnsupportedOperationException();
   }
 
-  private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
-    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
-      .map(sn -> new RefreshPeerProcedure(peerId, type, sn))
-      .toArray(RefreshPeerProcedure[]::new));
-  }
-
   protected ReplicationPeerConfig getOldPeerConfig() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 229549e..e1d8b51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -192,9 +193,9 @@ public class ReplicationPeerManager {
   }
 
   /**
-   * @return the old state.
+   * @return the old state, and whether the peer is enabled.
    */
-  public SyncReplicationState preTransitPeerSyncReplicationState(String peerId,
+  Pair<SyncReplicationState, Boolean> preTransitPeerSyncReplicationState(String peerId,
       SyncReplicationState state) throws DoNotRetryIOException {
     ReplicationPeerDescription desc = checkPeerExists(peerId);
     SyncReplicationState fromState = desc.getSyncReplicationState();
@@ -203,7 +204,7 @@ public class ReplicationPeerManager {
       throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
         " to " + state + " for peer id=" + peerId);
     }
-    return fromState;
+    return Pair.newPair(fromState, desc.isEnabled());
   }
 
   public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@@ -303,7 +304,7 @@ public class ReplicationPeerManager {
     }
   }
 
-  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+  public void removeAllQueues(String peerId) throws ReplicationException {
     // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
     // on-going when the refresh peer config procedure is done, if a RS which has already been
     // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
@@ -317,6 +318,10 @@ public class ReplicationPeerManager {
     // unless it has already been removed by others.
     ReplicationUtils.removeAllQueues(queueStorage, peerId);
     ReplicationUtils.removeAllQueues(queueStorage, peerId);
+  }
+
+  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+    removeAllQueues(peerId);
     queueStorage.removePeerFromHFileRefs(peerId);
   }
 

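To make the two-pass comment above concrete, here is a rough sketch of what a
single pass does, simplified from ReplicationUtils.removeAllQueues (queue storage
API as used elsewhere in this patch):

    // One pass: scan every replicator and drop this peer's queues.
    for (ServerName replicator : queueStorage.getListOfReplicators()) {
      for (String queueId : queueStorage.getAllQueues(replicator)) {
        if (new ReplicationQueueInfo(queueId).getPeerId().equals(peerId)) {
          queueStorage.removeQueue(replicator, queueId);
        }
      }
    }
    // The second, identical pass catches a queue that a concurrent claimQueue
    // moved onto an already-scanned RS while the first pass was running.
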
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 99fd615..0175296 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +55,8 @@ public class TransitPeerSyncReplicationStateProcedure
 
   private SyncReplicationState toState;
 
+  private boolean enabled;
+
   public TransitPeerSyncReplicationStateProcedure() {
   }
 
@@ -110,7 +113,10 @@ public class TransitPeerSyncReplicationStateProcedure
     if (cpHost != null) {
       cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
     }
-    fromState = env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
+    Pair<SyncReplicationState, Boolean> pair =
+      env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
+    fromState = pair.getFirst();
+    enabled = pair.getSecond();
   }
 
   private void postTransit(MasterProcedureEnv env) throws IOException {
@@ -131,6 +137,21 @@ public class TransitPeerSyncReplicationStateProcedure
       .collect(Collectors.toList());
   }
 
+  private void createDirForRemoteWAL(MasterProcedureEnv env)
+      throws ProcedureYieldException, IOException {
+    MasterFileSystem mfs = env.getMasterFileSystem();
+    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
+    Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+    FileSystem walFs = mfs.getWALFileSystem();
+    if (walFs.exists(remoteWALDirForPeer)) {
+      LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
+        remoteWALDirForPeer);
+    } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
+      LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
+      throw new ProcedureYieldException();
+    }
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env,
       PeerSyncReplicationStateTransitionState state)
@@ -151,6 +172,13 @@ public class TransitPeerSyncReplicationStateProcedure
       case SET_PEER_NEW_SYNC_REPLICATION_STATE:
         try {
           env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
+          if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
+            // disable the peer if we are going to transit to STANDBY state, as we need to remove
+            // all the pending replication files. If we do not disable the peer and delete the wal
+            // queues on zk directly, RS will get NoNode exception when updating the wal position
+            // and crash.
+            env.getReplicationPeerManager().disablePeer(peerId);
+          }
         } catch (ReplicationException e) {
           LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " +
             "replication peer state from {} to {}, retry", peerId, fromState, toState, e);
@@ -163,16 +191,35 @@ public class TransitPeerSyncReplicationStateProcedure
         addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
           .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
           .toArray(RefreshPeerProcedure[]::new));
-        if (fromState == SyncReplicationState.STANDBY &&
-          toState == SyncReplicationState.DOWNGRADE_ACTIVE) {
-          setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+        if (fromState.equals(SyncReplicationState.ACTIVE)) {
+          setNextState(toState.equals(SyncReplicationState.STANDBY)
+            ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
+            : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+        } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
+          setNextState(toState.equals(SyncReplicationState.STANDBY)
+            ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
+            : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
         } else {
-          setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+          assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
+          setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
         }
         return Flow.HAS_MORE_STATE;
       case REPLAY_REMOTE_WAL_IN_PEER:
         addChildProcedure(new RecoverStandbyProcedure(peerId));
-        setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+        setNextState(
+          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
+        return Flow.HAS_MORE_STATE;
+      case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
+        try {
+          env.getReplicationPeerManager().removeAllQueues(peerId);
+        } catch (ReplicationException e) {
+          LOG.warn("Failed to remove all replication queues peer {} when starting transiting" +
+            " sync replication peer state from {} to {}, retry", peerId, fromState, toState, e);
+          throw new ProcedureYieldException();
+        }
+        setNextState(fromState.equals(SyncReplicationState.ACTIVE)
+          ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
+          : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
         return Flow.HAS_MORE_STATE;
       case REOPEN_ALL_REGIONS_IN_PEER:
         try {
@@ -202,27 +249,35 @@ public class TransitPeerSyncReplicationStateProcedure
           .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
           .toArray(RefreshPeerProcedure[]::new));
         if (toState == SyncReplicationState.STANDBY) {
-          setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+          setNextState(
+            enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
+              : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
         } else {
           setNextState(
             PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
         }
         return Flow.HAS_MORE_STATE;
+      case SYNC_REPLICATION_SET_PEER_ENABLED:
+        try {
+          env.getReplicationPeerManager().enablePeer(peerId);
+        } catch (ReplicationException e) {
+          LOG.warn("Failed to set peer enabled for peer {} when transiting sync replication peer " +
+            "state from {} to {}, retry", peerId, fromState, toState, e);
+          throw new ProcedureYieldException();
+        }
+        setNextState(
+          PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
+        return Flow.HAS_MORE_STATE;
+      case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
+        refreshPeer(env, PeerOperationType.ENABLE);
+        setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+        return Flow.HAS_MORE_STATE;
       case CREATE_DIR_FOR_REMOTE_WAL:
-        MasterFileSystem mfs = env.getMasterFileSystem();
-        Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
-        Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
-        FileSystem walFs = mfs.getWALFileSystem();
         try {
-          if (walFs.exists(remoteWALDirForPeer)) {
-            LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
-              remoteWALDirForPeer);
-          } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
-            LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
-            throw new ProcedureYieldException();
-          }
+          createDirForRemoteWAL(env);
         } catch (IOException e) {
-          LOG.warn("Failed to create remote wal dir {}", remoteWALDirForPeer, e);
+          LOG.warn("Failed to create remote wal dir for peer {} when transiting sync replication " +
+            "peer state from {} to {}, retry", peerId, fromState, toState, e);
           throw new ProcedureYieldException();
         }
         setNextState(
@@ -242,5 +297,4 @@ public class TransitPeerSyncReplicationStateProcedure
         throw new UnsupportedOperationException("unhandled state=" + state);
     }
   }
-
 }

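Taken together, the switch above walks these states for an enabled peer transiting
to STANDBY (reconstructed from the hunks, so a reading aid rather than an
authoritative trace; for DA -> S the REOPEN_ALL_REGIONS_IN_PEER step is skipped):

    PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION
    SET_PEER_NEW_SYNC_REPLICATION_STATE              // also disables the peer
    REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN
    REMOVE_ALL_REPLICATION_QUEUES_IN_PEER            // new in this change
    REOPEN_ALL_REGIONS_IN_PEER
    TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
    REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END
    SYNC_REPLICATION_SET_PEER_ENABLED                // re-enable the peer
    SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS
    CREATE_DIR_FOR_REMOTE_WAL
    POST_PEER_SYNC_REPLICATION_STATE_TRANSITION
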
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index ab0083f..05a8fdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -244,10 +244,8 @@ public class LogRoller extends HasThread implements Closeable {
   }
 
   /**
-   * For testing only
    * @return true if all WAL roll finished
    */
-  @VisibleForTesting
   public boolean walRollFinished() {
     for (boolean needRoll : walNeedsRoll.values()) {
       if (needRoll) {
@@ -257,6 +255,15 @@ public class LogRoller extends HasThread implements Closeable {
     return true;
   }
 
+  /**
+   * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
+   */
+  public void waitUntilWalRollFinished() throws InterruptedException {
+    while (!walRollFinished()) {
+      Thread.sleep(100);
+    }
+  }
+
   @Override
   public void close() {
     running = false;

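The new helper is a plain polling loop, sleeping 100ms between checks of
walNeedsRoll. The intended caller pattern, the same one PeerProcedureHandlerImpl
adopts below (rs being the enclosing HRegionServer):

    LogRoller roller = rs.getWalRoller();
    roller.requestRollAll();           // mark every registered WAL as needing a roll
    roller.waitUntilWalRollFinished(); // block until no WAL still needs rolling;
                                       // propagates InterruptedException to the caller
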
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 7fc9f53..d01b130 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.concurrent.locks.Lock;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -154,24 +156,65 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
       if (!peer.getPeerConfig().isSyncReplication()) {
         throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
       }
-      SyncReplicationState newState = peer.getNewSyncReplicationState();
+      SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState();
       if (stage == 0) {
-        if (newState != SyncReplicationState.NONE) {
+        if (newSyncReplicationState != SyncReplicationState.NONE) {
           LOG.warn("The new sync replication state for peer {} has already been set to {}, " +
-            "this should be a retry, give up", peerId, newState);
+            "this should be a retry, give up", peerId, newSyncReplicationState);
           return;
         }
-        newState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
-        SyncReplicationState oldState = peer.getSyncReplicationState();
-        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
+        // refresh the peer state first, as when we transit to STANDBY, we may need to disable the
+        // peer before processing the sync replication state.
+        PeerState oldState = peer.getPeerState();
+        boolean success = false;
+        try {
+          PeerState newState = replicationPeers.refreshPeerState(peerId);
+          if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
+            replicationSourceManager.refreshSources(peerId);
+          }
+          success = true;
+        } finally {
+          if (!success) {
+            peer.setPeerState(oldState.equals(PeerState.ENABLED));
+          }
+        }
+        newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
+        SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState();
+        peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState,
+          newSyncReplicationState, stage);
       } else {
-        if (newState == SyncReplicationState.NONE) {
-          LOG.warn("The new sync replication state for peer {} has already been clear, and the " +
-            "current state is {}, this should be a retry, give up", peerId, newState);
+        if (newSyncReplicationState == SyncReplicationState.NONE) {
+          LOG.warn(
+            "The new sync replication state for peer {} has already been clear, and the " +
+              "current state is {}, this should be a retry, give up",
+            peerId, newSyncReplicationState);
           return;
         }
+        if (newSyncReplicationState == SyncReplicationState.STANDBY) {
+          replicationSourceManager.drainSources(peerId);
+          // Need to roll the wals and make the ReplicationSource for this peer track the new file.
+          // If we do not do this, there are two problems that can not be addressed at the same
+          // time. First, if we just throw away the current wal file, and later transit the peer to
+          // DA before the wal has been rolled, then the new data written to that wal file will not
+          // be replicated, causing data inconsistency. But if we just track the current wal file
+          // without rolling, it may contain data written before we transit the peer to S, and if
+          // we later transit the peer to DA, that data will also be replicated, again causing
+          // data inconsistency. So here we need to roll the wal, let the ReplicationSource track
+          // the new wal file, and throw the old wal files away.
+          LogRoller roller = rs.getWalRoller();
+          roller.requestRollAll();
+          try {
+            roller.waitUntilWalRollFinished();
+          } catch (InterruptedException e) {
+            // reset the interrupted flag
+            Thread.currentThread().interrupt();
+            throw (IOException) new InterruptedIOException(
+              "Interrupted while waiting for wal roll finish").initCause(e);
+          }
+        }
         SyncReplicationState oldState = peer.getSyncReplicationState();
-        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
+        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState,
+          stage);
         peer.transitSyncReplicationState();
       }
     } finally {

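Condensed, the RS-side handling of a sync replication state transition is now
two-staged (a reading aid reconstructed from the hunks above):

    stage 0: refresh the peer enabled/disabled state first (for a transition to
             S the master has already disabled the peer), rolling the in-memory
             state back on failure, then refresh the new sync replication state
             and notify the PeerActionListener
    stage 1: if the target state is S, drainSources(peerId) to drop the pending
             wals, then requestRollAll() + waitUntilWalRollFinished() so the new
             source tracks only freshly rolled wal files; finally
             transitSyncReplicationState()
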
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c669622..1cac0c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -500,6 +500,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     if (this.replicationEndpoint != null) {
       this.replicationEndpoint.stop();
     }
+    metrics.clear();
     if (join) {
       for (ReplicationSourceShipper worker : workers) {
         Threads.shutdown(worker, this.sleepForRetries);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5015129..f25b073 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -391,11 +392,83 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * <p>
+   * This is used when we transit a sync replication peer to {@link SyncReplicationState#STANDBY}.
+   * </p>
+   * <p>
+   * When transiting to {@link SyncReplicationState#STANDBY}, we can remove all the pending wal
+   * files for a replication peer as we do not need to replicate them any more. And this is
+   * necessary, otherwise when we transit back to {@link SyncReplicationState#DOWNGRADE_ACTIVE}
+   * later, the stale data will be replicated again and cause inconsistency.
+   * </p>
+   * <p>
+   * See HBASE-20426 for more details.
+   * </p>
+   * @param peerId the id of the sync replication peer
+   */
+  public void drainSources(String peerId) throws IOException, ReplicationException {
+    String terminateMessage = "Sync replication peer " + peerId +
+      " is transiting to STANDBY. Will close the previous replication source and open a new one";
+    ReplicationPeer peer = replicationPeers.getPeer(peerId);
+    assert peer.getPeerConfig().isSyncReplication();
+    ReplicationSourceInterface src = createSource(peerId, peer);
+    // synchronized here to avoid race with preLogRoll, where we add a new log to the source and
+    // also to walsById.
+    ReplicationSourceInterface toRemove;
+    Map<String, NavigableSet<String>> wals = new HashMap<>();
+    synchronized (latestPaths) {
+      toRemove = sources.put(peerId, src);
+      if (toRemove != null) {
+        LOG.info("Terminate replication source for " + toRemove.getPeerId());
+        toRemove.terminate(terminateMessage);
+        toRemove.getSourceMetrics().clear();
+      }
+      // Here we make a copy of all the remaining wal files and then delete them from the
+      // replication queue storage after releasing the lock. It is not safe to just remove the old
+      // map from walsById since we may later fail to delete them from the replication queue
+      // storage, and then when we retry we will not know which wal files need to be deleted
+      // from the replication queue storage.
+      walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+    }
+    LOG.info("Startup replication source for " + src.getPeerId());
+    src.startup();
+    for (NavigableSet<String> walsByGroup : wals.values()) {
+      for (String wal : walsByGroup) {
+        queueStorage.removeWAL(server.getServerName(), peerId, wal);
+      }
+    }
+    synchronized (walsById) {
+      Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
+      wals.forEach((k, v) -> {
+        NavigableSet<String> walsByGroup = oldWals.get(k);
+        if (walsByGroup != null) {
+          walsByGroup.removeAll(v);
+        }
+      });
+    }
+    // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
+    // a background task, we will delete the file from replication queue storage under the lock to
+    // simplify the logic.
+    synchronized (this.oldsources) {
+      for (Iterator<ReplicationSourceInterface> iter = oldsources.iterator(); iter.hasNext();) {
+        ReplicationSourceInterface oldSource = iter.next();
+        if (oldSource.getPeerId().equals(peerId)) {
+          String queueId = oldSource.getQueueId();
+          oldSource.terminate(terminateMessage);
+          oldSource.getSourceMetrics().clear();
+          queueStorage.removeQueue(server.getServerName(), queueId);
+          walsByIdRecoveredQueues.remove(queueId);
+          iter.remove();
+        }
+      }
+    }
+  }
+
+  /**
    * Close the previous replication sources of this peer id and open new sources to trigger the new
    * replication state changes or new replication config changes. Here we don't need to change
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
-   * @throws IOException
    */
   public void refreshSources(String peerId) throws IOException {
     String terminateMessage = "Peer " + peerId +
@@ -409,7 +482,7 @@ public class ReplicationSourceManager implements ReplicationListener {
         LOG.info("Terminate replication source for " + toRemove.getPeerId());
         toRemove.terminate(terminateMessage);
       }
-      for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
+      for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
         walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
       }
     }
@@ -830,18 +903,6 @@ public class ReplicationSourceManager implements ReplicationListener {
               actualPeerId);
             continue;
           }
-          // track sources in walsByIdRecoveredQueues
-          Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-          walsByIdRecoveredQueues.put(queueId, walsByGroup);
-          for (String wal : walsSet) {
-            String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-            NavigableSet<String> wals = walsByGroup.get(walPrefix);
-            if (wals == null) {
-              wals = new TreeSet<>();
-              walsByGroup.put(walPrefix, wals);
-            }
-            wals.add(wal);
-          }
 
           ReplicationSourceInterface src = createSource(queueId, peer);
           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
@@ -849,9 +910,36 @@ public class ReplicationSourceManager implements ReplicationListener {
             peer = replicationPeers.getPeer(src.getPeerId());
             if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
               src.terminate("Recovered queue doesn't belong to any current peer");
-              removeRecoveredSource(src);
+              deleteQueue(queueId);
               continue;
             }
+            // Do not set up a recovered queue if a sync replication peer is in STANDBY state, or
+            // is transiting to STANDBY state. The only exception is when we are in STANDBY state
+            // and transiting to DA, in which case we will replay the remote WALs and they need to
+            // be replicated back.
+            if (peer.getPeerConfig().isSyncReplication()) {
+              Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
+                peer.getSyncReplicationStateAndNewState();
+              if ((stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) &&
+                stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) ||
+                stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) {
+                src.terminate("Sync replication peer is in STANDBY state");
+                deleteQueue(queueId);
+                continue;
+              }
+            }
+            // track sources in walsByIdRecoveredQueues
+            Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+            walsByIdRecoveredQueues.put(queueId, walsByGroup);
+            for (String wal : walsSet) {
+              String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+              NavigableSet<String> wals = walsByGroup.get(walPrefix);
+              if (wals == null) {
+                wals = new TreeSet<>();
+                walsByGroup.put(walPrefix, wals);
+              }
+              wals.add(wal);
+            }
             oldsources.add(src);
             for (String wal : walsSet) {
               src.enqueueLog(new Path(oldLogDir, wal));

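The skip condition for recovered queues is easier to read as a standalone
predicate. A hedged restatement (the helper name is hypothetical, not part of
this patch):

    // Hypothetical helper: true when a recovered queue should be dropped rather
    // than claimed. "first" is the current sync replication state, "second" the
    // in-flight target state (NONE when no transition is running).
    static boolean shouldDropRecoveredQueue(
        Pair<SyncReplicationState, SyncReplicationState> stateAndNewState) {
      // Settled STANDBY, or transiting to STANDBY. The S -> DA case deliberately
      // falls through, because the replayed remote WALs must be shipped back.
      return (stateAndNewState.getFirst() == SyncReplicationState.STANDBY &&
        stateAndNewState.getSecond() == SyncReplicationState.NONE) ||
        stateAndNewState.getSecond() == SyncReplicationState.STANDBY;
    }
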
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca7f0e3a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
new file mode 100644
index 0000000..5da7870
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestDrainReplicationQueuesForStandBy.class);
+
+  @Test
+  public void test() throws Exception {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 0, 100);
+
+    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(
+      ((AbstractFSWAL<?>) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build()))
+        .getCurrentFileName().getName());
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    // transit cluster2 to DA and cluster 1 to S
+    verify(UTIL2, 0, 100);
+
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    // delete the original value, and then major compact
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      for (int i = 0; i < 100; i++) {
+        table.delete(new Delete(Bytes.toBytes(i)));
+      }
+    }
+    UTIL2.flush(TABLE_NAME);
+    UTIL2.compact(TABLE_NAME, true);
+    // wait until the new values are replicated back to cluster1
+    HRegion region = rs.getRegions(TABLE_NAME).get(0);
+    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return region.get(new Get(Bytes.toBytes(99))).isEmpty();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication has not been catched up yet";
+      }
+    });
+    // transit cluster1 to DA and cluster2 to S, then we will start replicating from cluster1 to
+    // cluster2
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().enableReplicationPeer(PEER_ID);
+
+    // confirm that we will not replicate the old data which causes inconsistency
+    ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService())
+      .getReplicationManager().getSource(PEER_ID);
+    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !source.workerThreads.containsKey(walGroupId);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication has not been catched up yet";
+      }
+    });
+    HRegion region2 = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    for (int i = 0; i < 100; i++) {
+      assertTrue(region2.get(new Get(Bytes.toBytes(i))).isEmpty());
+    }
+  }
+}
\ No newline at end of file