You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/07 22:29:18 UTC

[35/51] [abbrv] hbase git commit: HBASE-21486 The current replication implementation for peer in STANDBY state breaks serial replication

HBASE-21486 The current replication implementation for peer in STANDBY state breaks serial replication


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/766aa1bf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/766aa1bf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/766aa1bf

Branch: refs/heads/HBASE-20952
Commit: 766aa1bfccb48b4d228dd86c100fb48e9c9d61fa
Parents: dfeab9f
Author: Duo Zhang <zh...@apache.org>
Authored: Wed Nov 28 18:00:18 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Dec 1 12:15:18 2018 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/MasterProcedure.proto     |  19 ++--
 .../replication/AbstractPeerProcedure.java      |  97 ++++++++++++++++-
 .../master/replication/ModifyPeerProcedure.java |  81 --------------
 ...ransitPeerSyncReplicationStateProcedure.java |  73 +++++++++----
 .../replication/SyncReplicationTestBase.java    |  30 ++++--
 .../replication/TestSerialSyncReplication.java  | 106 +++++++++++++++++++
 .../TestSyncReplicationRemoveRemoteWAL.java     |  21 +---
 7 files changed, 291 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 44ac952..cc0c6ba 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -398,15 +398,16 @@ enum PeerSyncReplicationStateTransitionState {
   PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1;
   SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
   REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
-  REPLAY_REMOTE_WAL_IN_PEER = 4;
-  REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5;
-  REOPEN_ALL_REGIONS_IN_PEER = 6;
-  TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7;
-  REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8;
-  SYNC_REPLICATION_SET_PEER_ENABLED = 9;
-  SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 10;
-  CREATE_DIR_FOR_REMOTE_WAL = 11;
-  POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 12;
+  REOPEN_ALL_REGIONS_IN_PEER = 4;
+  SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER = 5;
+  REPLAY_REMOTE_WAL_IN_PEER = 6;
+  REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 7;
+  TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 8;
+  REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 9;
+  SYNC_REPLICATION_SET_PEER_ENABLED = 10;
+  SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 11;
+  CREATE_DIR_FOR_REMOTE_WAL = 12;
+  POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 13;
 }
 
 message PeerModificationStateData {

http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index 882a050..755e0a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -17,11 +17,27 @@
  */
 package org.apache.hadoop.hbase.master.replication;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
@@ -29,8 +45,15 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  * The base class for all replication peer related procedure.
  */
 @InterfaceAudience.Private
-public abstract class AbstractPeerProcedure<TState>
-    extends AbstractPeerNoLockProcedure<TState> implements PeerProcedureInterface {
+public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockProcedure<TState>
+    implements PeerProcedureInterface {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerProcedure.class);
+
+  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
+
+  // The sleep interval when waiting table to be enabled or disabled.
+  protected static final int SLEEP_INTERVAL_MS = 1000;
 
   // used to keep compatible with old client where we can only returns after updateStorage.
   protected ProcedurePrepareLatch latch;
@@ -75,4 +98,74 @@ public abstract class AbstractPeerProcedure<TState>
   protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
     env.getReplicationPeerManager().enablePeer(peerId);
   }
+
+  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
+      ReplicationQueueStorage queueStorage) throws ReplicationException {
+    if (barrier >= 0) {
+      lastSeqIds.put(encodedRegionName, barrier);
+      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
+        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+        lastSeqIds.clear();
+      }
+    }
+  }
+
+  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
+      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
+    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+      if (!td.hasGlobalReplicationScope()) {
+        continue;
+      }
+      TableName tn = td.getTableName();
+      if (!ReplicationUtils.contains(peerConfig, tn)) {
+        continue;
+      }
+      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+    }
+    if (!lastSeqIds.isEmpty()) {
+      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
+    }
+  }
+
+  // If the table is currently disabling, then we need to wait until it is disabled.We will write
+  // replication barrier for a disabled table. And return whether we need to update the last pushed
+  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
+  // then we do not need to update last pushed sequence id for this table.
+  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
+      throws IOException {
+    for (;;) {
+      try {
+        if (!tsm.getTableState(tn).isDisabling()) {
+          return true;
+        }
+        Thread.sleep(SLEEP_INTERVAL_MS);
+      } catch (TableStateNotFoundException e) {
+        return false;
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
+      }
+    }
+  }
+
+  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
+  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
+  // should not forget to check whether the map is empty at last, if not you should call
+  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
+  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
+      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
+    TableStateManager tsm = env.getMasterServices().getTableStateManager();
+    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+    Connection conn = env.getMasterServices().getConnection();
+    if (!needSetLastPushedSequenceId(tsm, tableName)) {
+      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
+      return;
+    }
+    for (Pair<String, Long> name2Barrier : MetaTableAccessor
+      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
+      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+        queueStorage);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 9550fb0..d5d2779 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -19,11 +19,7 @@ package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.TableStateManager;
@@ -35,9 +31,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,11 +49,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
 
   private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
 
-  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
-
-  // The sleep interval when waiting table to be enabled or disabled.
-  protected static final int SLEEP_INTERVAL_MS = 1000;
-
   protected ModifyPeerProcedure() {
   }
 
@@ -169,76 +158,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     }
   }
 
-  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
-      ReplicationQueueStorage queueStorage) throws ReplicationException {
-    if (barrier >= 0) {
-      lastSeqIds.put(encodedRegionName, barrier);
-      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
-        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
-        lastSeqIds.clear();
-      }
-    }
-  }
-
-  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
-      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
-    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
-    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
-      if (!td.hasGlobalReplicationScope()) {
-        continue;
-      }
-      TableName tn = td.getTableName();
-      if (!ReplicationUtils.contains(peerConfig, tn)) {
-        continue;
-      }
-      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
-    }
-    if (!lastSeqIds.isEmpty()) {
-      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
-    }
-  }
-
-  // If the table is currently disabling, then we need to wait until it is disabled.We will write
-  // replication barrier for a disabled table. And return whether we need to update the last pushed
-  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
-  // then we do not need to update last pushed sequence id for this table.
-  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
-      throws IOException {
-    for (;;) {
-      try {
-        if (!tsm.getTableState(tn).isDisabling()) {
-          return true;
-        }
-        Thread.sleep(SLEEP_INTERVAL_MS);
-      } catch (TableStateNotFoundException e) {
-        return false;
-      } catch (InterruptedException e) {
-        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
-      }
-    }
-  }
-
-  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
-  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
-  // should not forget to check whether the map is empty at last, if not you should call
-  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
-  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
-      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
-    TableStateManager tsm = env.getMasterServices().getTableStateManager();
-    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
-    Connection conn = env.getMasterServices().getConnection();
-    if (!needSetLastPushedSequenceId(tsm, tableName)) {
-      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
-      return;
-    }
-    for (Pair<String, Long> name2Barrier : MetaTableAccessor
-      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
-      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
-      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
-        queueStorage);
-    }
-  }
-
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
       throws ProcedureSuspendedException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 8c6232f..fcf41be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -50,7 +50,7 @@ public class TransitPeerSyncReplicationStateProcedure
     extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
+    LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
 
   protected SyncReplicationState fromState;
 
@@ -58,6 +58,8 @@ public class TransitPeerSyncReplicationStateProcedure
 
   private boolean enabled;
 
+  private boolean serial;
+
   public TransitPeerSyncReplicationStateProcedure() {
   }
 
@@ -75,8 +77,8 @@ public class TransitPeerSyncReplicationStateProcedure
   protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.serializeStateData(serializer);
     TransitPeerSyncReplicationStateStateData.Builder builder =
-        TransitPeerSyncReplicationStateStateData.newBuilder()
-          .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
+      TransitPeerSyncReplicationStateStateData.newBuilder()
+        .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
     if (fromState != null) {
       builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
     }
@@ -87,7 +89,7 @@ public class TransitPeerSyncReplicationStateProcedure
   protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
     super.deserializeStateData(serializer);
     TransitPeerSyncReplicationStateStateData data =
-        serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
+      serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
     toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
     if (data.hasFromState()) {
       fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
@@ -129,6 +131,7 @@ public class TransitPeerSyncReplicationStateProcedure
     }
     fromState = desc.getSyncReplicationState();
     enabled = desc.isEnabled();
+    serial = desc.getPeerConfig().isSerial();
   }
 
   private void postTransit(MasterProcedureEnv env) throws IOException {
@@ -174,7 +177,11 @@ public class TransitPeerSyncReplicationStateProcedure
         : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
     } else {
       assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
-      setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+      // for serial peer, we need to reopen all the regions and then update the last pushed sequence
+      // id, before replaying any remote wals, so that the serial replication will not be stuck, and
+      // also guarantee the order when replicating the remote wal back.
+      setNextState(serial ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
+        : PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
     }
   }
 
@@ -183,6 +190,11 @@ public class TransitPeerSyncReplicationStateProcedure
       setNextState(
         enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
           : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+    } else if (fromState == SyncReplicationState.STANDBY) {
+      assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
+      setNextState(serial && enabled
+        ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
+        : PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
     } else {
       setNextState(
         PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
@@ -196,14 +208,20 @@ public class TransitPeerSyncReplicationStateProcedure
   @VisibleForTesting
   protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
       throws ReplicationException {
-    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
-    if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
-      // disable the peer if we are going to transit to STANDBY state, as we need to remove
+    if (toState.equals(SyncReplicationState.STANDBY) ||
+      (fromState.equals(SyncReplicationState.STANDBY) && serial) && enabled) {
+      // Disable the peer if we are going to transit to STANDBY state, as we need to remove
       // all the pending replication files. If we do not disable the peer and delete the wal
       // queues on zk directly, RS will get NoNode exception when updating the wal position
       // and crash.
+      // Disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE, and the
+      // replication is serial, as we need to update the lastPushedSequence id after we reopen all
+      // the regions, and for performance reason here we will update in batch, without using CAS, if
+      // we are still replicating at RS side, we may accidentally update the last pushed sequence id
+      // to a less value and cause the replication to be stuck.
       env.getReplicationPeerManager().disablePeer(peerId);
     }
+    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
   }
 
   @VisibleForTesting
@@ -240,7 +258,7 @@ public class TransitPeerSyncReplicationStateProcedure
           long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn(
             "Failed to update peer storage for peer {} when starting transiting sync " +
-                "replication peer state from {} to {}, sleep {} secs and retry",
+              "replication peer state from {} to {}, sleep {} secs and retry",
             peerId, fromState, toState, backoff / 1000, e);
           throw suspend(backoff);
         }
@@ -254,6 +272,30 @@ public class TransitPeerSyncReplicationStateProcedure
           .toArray(RefreshPeerProcedure[]::new));
         setNextStateAfterRefreshBegin();
         return Flow.HAS_MORE_STATE;
+      case REOPEN_ALL_REGIONS_IN_PEER:
+        reopenRegions(env);
+        if (fromState.equals(SyncReplicationState.STANDBY)) {
+          assert serial;
+          setNextState(
+            PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
+        } else {
+          setNextState(
+            PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
+        }
+        return Flow.HAS_MORE_STATE;
+      case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
+        try {
+          setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
+        } catch (Exception e) {
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to update last pushed sequence id for peer {} when transiting sync " +
+              "replication peer state from {} to {}, sleep {} secs and retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
+        }
+        setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+        return Flow.HAS_MORE_STATE;
       case REPLAY_REMOTE_WAL_IN_PEER:
         replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
         setNextState(
@@ -266,7 +308,7 @@ public class TransitPeerSyncReplicationStateProcedure
           long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn(
             "Failed to remove all replication queues peer {} when starting transiting" +
-                " sync replication peer state from {} to {}, sleep {} secs and retry",
+              " sync replication peer state from {} to {}, sleep {} secs and retry",
             peerId, fromState, toState, backoff / 1000, e);
           throw suspend(backoff);
         }
@@ -275,11 +317,6 @@ public class TransitPeerSyncReplicationStateProcedure
           ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
           : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
         return Flow.HAS_MORE_STATE;
-      case REOPEN_ALL_REGIONS_IN_PEER:
-        reopenRegions(env);
-        setNextState(
-          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
-        return Flow.HAS_MORE_STATE;
       case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
         try {
           transitPeerSyncReplicationState(env);
@@ -287,7 +324,7 @@ public class TransitPeerSyncReplicationStateProcedure
           long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn(
             "Failed to update peer storage for peer {} when ending transiting sync " +
-                "replication peer state from {} to {}, sleep {} secs and retry",
+              "replication peer state from {} to {}, sleep {} secs and retry",
             peerId, fromState, toState, backoff / 1000, e);
           throw suspend(backoff);
         }
@@ -308,7 +345,7 @@ public class TransitPeerSyncReplicationStateProcedure
           long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn(
             "Failed to set peer enabled for peer {} when transiting sync replication peer " +
-                "state from {} to {}, sleep {} secs and retry",
+              "state from {} to {}, sleep {} secs and retry",
             peerId, fromState, toState, backoff / 1000, e);
           throw suspend(backoff);
         }
@@ -327,7 +364,7 @@ public class TransitPeerSyncReplicationStateProcedure
           long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn(
             "Failed to create remote wal dir for peer {} when transiting sync replication " +
-                "peer state from {} to {}, sleep {} secs and retry",
+              "peer state from {} to {}, sleep {} secs and retry",
             peerId, fromState, toState, backoff / 1000, e);
           throw suspend(backoff);
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 1b52354..f373590 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -103,8 +103,8 @@ public class SyncReplicationTestBase {
     ZK_UTIL.startMiniZKCluster();
     initTestingUtility(UTIL1, "/cluster1");
     initTestingUtility(UTIL2, "/cluster2");
-    StartMiniClusterOption option = StartMiniClusterOption.builder()
-        .numMasters(2).numRegionServers(3).numDataNodes(3).build();
+    StartMiniClusterOption option =
+      StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
     UTIL1.startMiniCluster(option);
     UTIL2.startMiniCluster(option);
     TableDescriptor td =
@@ -217,16 +217,16 @@ public class SyncReplicationTestBase {
     return getRemoteWALDir(remoteWALDir, peerId);
   }
 
-  protected Path getRemoteWALDir(Path remoteWALDir, String peerId) {
+  protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) {
     return new Path(remoteWALDir, peerId);
   }
 
-  protected Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
+  protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
     return new Path(remoteWALDir, peerId + "-replay");
   }
 
-  protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility)
-      throws Exception {
+  protected final void verifyRemovedPeer(String peerId, Path remoteWALDir,
+      HBaseTestingUtility utility) throws Exception {
     ReplicationPeerStorage rps = ReplicationStorageFactory
       .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
     try {
@@ -247,7 +247,7 @@ public class SyncReplicationTestBase {
     }
   }
 
-  protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,
+  protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
       boolean expectedRejection) throws Exception {
     HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
     ClusterConnection connection = regionServer.getClusterConnection();
@@ -270,4 +270,20 @@ public class SyncReplicationTestBase {
       }
     }
   }
+
+  protected final void waitUntilDeleted(HBaseTestingUtility util, Path remoteWAL) throws Exception {
+    MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !mfs.getWALFileSystem().exists(remoteWAL);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return remoteWAL + " has not been deleted yet";
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java
new file mode 100644
index 0000000..6725649
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+/**
+ * Testcase to confirm that serial replication will not be stuck when using along with synchronous
+ * replication. See HBASE-21486 for more details.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSerialSyncReplication extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSerialSyncReplication.class);
+
+  @Test
+  public void test() throws Exception {
+    // change to serial
+    UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+      .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+    UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+      .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
+
+    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+
+    MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir(
+      new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
+    FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
+    assertEquals(1, remoteWALStatus.length);
+    Path remoteWAL = remoteWALStatus[0].getPath();
+    assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
+    // roll the wal writer, so that we will delete the remore wal. This is used to make sure that we
+    // will not replay this wal when transiting to DA.
+    for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) {
+      LogRoller roller = t.getRegionServer().getWalRoller();
+      roller.requestRollAll();
+      roller.waitUntilWalRollFinished();
+    }
+    waitUntilDeleted(UTIL2, remoteWAL);
+
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    // let's reopen the region
+    RegionInfo region = Iterables.getOnlyElement(UTIL2.getAdmin().getRegions(TABLE_NAME));
+    HRegionServer target = UTIL2.getOtherRegionServer(UTIL2.getRSForFirstRegionInTable(TABLE_NAME));
+    UTIL2.getAdmin().move(region.getEncodedNameAsBytes(),
+      Bytes.toBytes(target.getServerName().getServerName()));
+    // here we will remove all the pending wals. This is not a normal operation sequence but anyway,
+    // user could do this.
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    // transit back to DA
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+
+    UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
+    // make sure that the async replication still works
+    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
index 0cd1846..9f89826 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -41,22 +40,6 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestSyncReplicationRemoveRemoteWAL.class);
 
-  private void waitUntilDeleted(Path remoteWAL) throws Exception {
-    MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
-    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
-
-      @Override
-      public boolean evaluate() throws Exception {
-        return !mfs.getWALFileSystem().exists(remoteWAL);
-      }
-
-      @Override
-      public String explainFailure() throws Exception {
-        return remoteWAL + " has not been deleted yet";
-      }
-    });
-  }
-
   @Test
   public void testRemoveRemoteWAL() throws Exception {
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
@@ -76,7 +59,7 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
     HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
     rs.getWalRoller().requestRollAll();
     // The replicated wal file should be deleted finally
-    waitUntilDeleted(remoteWAL);
+    waitUntilDeleted(UTIL2, remoteWAL);
     remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
     assertEquals(1, remoteWALStatus.length);
     remoteWAL = remoteWALStatus[0].getPath();
@@ -95,6 +78,6 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
     verifyThroughRegion(UTIL2, 100, 200);
 
     // Confirm that we will also remove the remote wal files in DA state
-    waitUntilDeleted(remoteWAL);
+    waitUntilDeleted(UTIL2, remoteWAL);
   }
 }