You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/15 10:42:04 UTC

[22/27] hbase git commit: HBASE-19782 Reject the replication request when peer is DA or A state

HBASE-19782 Reject the replication request when peer is DA or A state


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7307d16f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7307d16f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7307d16f

Branch: refs/heads/master
Commit: 7307d16f01c05085fe56350e75a747ade6a109d3
Parents: 4233532
Author: huzheng <op...@gmail.com>
Authored: Fri Mar 2 18:05:29 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue May 15 08:39:04 2018 +0800

----------------------------------------------------------------------
 .../hbase/protobuf/ReplicationProtbufUtil.java  |  2 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  2 +-
 .../hbase/regionserver/HRegionServer.java       |  5 +--
 .../hbase/regionserver/RSRpcServices.java       | 25 +++++++++--
 .../RejectReplicationRequestStateChecker.java   | 45 ++++++++++++++++++++
 .../ReplaySyncReplicationWALCallable.java       | 24 ++++++-----
 .../replication/regionserver/Replication.java   |  2 +-
 .../regionserver/ReplicationSink.java           | 16 +++----
 .../SyncReplicationPeerInfoProvider.java        | 11 ++---
 .../SyncReplicationPeerInfoProviderImpl.java    | 13 +++---
 .../SyncReplicationPeerMappingManager.java      |  5 +--
 .../hbase/wal/SyncReplicationWALProvider.java   |  7 +--
 .../replication/SyncReplicationTestBase.java    | 32 ++++++++++++++
 .../replication/TestSyncReplicationActive.java  | 13 +++++-
 .../regionserver/TestReplicationSink.java       |  5 +--
 .../regionserver/TestWALEntrySinkFilter.java    |  3 +-
 .../wal/TestSyncReplicationWALProvider.java     |  6 +--
 17 files changed, 163 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 81dd59e..e01f881 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
@@ -45,7 +46,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminServic
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 @InterfaceAudience.Private
 public class ReplicationProtbufUtil {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6aa4b27..ba487c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1984,7 +1984,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private boolean shouldForbidMajorCompaction() {
     if (rsServices != null && rsServices.getReplicationSourceService() != null) {
       return rsServices.getReplicationSourceService().getSyncReplicationPeerInfoProvider()
-          .checkState(getRegionInfo(), ForbidMajorCompactionChecker.get());
+          .checkState(getRegionInfo().getTable(), ForbidMajorCompactionChecker.get());
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 440a838..ab571c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2478,10 +2478,9 @@ public class HRegionServer extends HasThread implements
   }
 
   /**
-   * @return Return the object that implements the replication
-   * sink executorService.
+   * @return Return the object that implements the replication sink executorService.
    */
-  ReplicationSinkService getReplicationSinkService() {
+  public ReplicationSinkService getReplicationSinkService() {
     return replicationSinkHandler;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5316ac5..bdb86d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
 import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
@@ -2204,9 +2205,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
+  private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
+    ReplicationSourceService replicationSource = regionServer.getReplicationSourceService();
+    if (replicationSource == null || entries.isEmpty()) {
+      return;
+    }
+    // We can ensure that all entries are for one peer, so we only need to check one entry's
+    // table name. If the table is synchronously replicated on the peer side and the peer
+    // cluster is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should refuse to
+    // apply those entries, according to the design doc.
+    TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
+    if (replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
+      RejectReplicationRequestStateChecker.get())) {
+      throw new DoNotRetryIOException(
+          "Reject to apply to sink cluster because sync replication state of sink cluster "
+              + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
+    }
+  }
+
   /**
    * Replicate WAL entries on the region server.
-   *
    * @param controller the RPC controller
    * @param request the request
    * @throws ServiceException
@@ -2220,7 +2238,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       if (regionServer.replicationSinkHandler != null) {
         requestCount.increment();
         List<WALEntry> entries = request.getEntryList();
-        CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner();
+        checkShouldRejectReplicationRequest(entries);
+        CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
         regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
           request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
@@ -2435,7 +2454,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
   private boolean shouldRejectRequestsFromClient(HRegion region) {
     return regionServer.getReplicationSourceService().getSyncReplicationPeerInfoProvider()
-      .checkState(region.getRegionInfo(), RejectRequestsFromClientStateChecker.get());
+      .checkState(region.getRegionInfo().getTable(), RejectRequestsFromClientStateChecker.get());
   }
 
   private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java
new file mode 100644
index 0000000..9ad0af2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.function.BiPredicate;
+
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Check whether we need to reject the replication request from source cluster.
+ */
+@InterfaceAudience.Private
+public class RejectReplicationRequestStateChecker
+    implements BiPredicate<SyncReplicationState, SyncReplicationState> {
+
+  private static final RejectReplicationRequestStateChecker INST =
+      new RejectReplicationRequestStateChecker();
+
+  @Override
+  public boolean test(SyncReplicationState state, SyncReplicationState newState) {
+    return state == SyncReplicationState.ACTIVE || state == SyncReplicationState.DOWNGRADE_ACTIVE
+        || newState == SyncReplicationState.ACTIVE
+        || newState == SyncReplicationState.DOWNGRADE_ACTIVE;
+  }
+
+  public static RejectReplicationRequestStateChecker get() {
+    return INST;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
index c9c5ef6..3cf065c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -46,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
 
 /**
@@ -81,14 +80,19 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
       throw initError;
     }
     LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId);
-    try (Reader reader = getReader()) {
-      List<Entry> entries = readWALEntries(reader);
-      while (!entries.isEmpty()) {
-        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
-            .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
-        HBaseRpcController controller = new HBaseRpcControllerImpl(pair.getSecond());
-        rs.getRSRpcServices().replicateWALEntry(controller, pair.getFirst());
-        entries = readWALEntries(reader);
+    if (rs.getReplicationSinkService() != null) {
+      try (Reader reader = getReader()) {
+        List<Entry> entries = readWALEntries(reader);
+        while (!entries.isEmpty()) {
+          Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
+              .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
+          ReplicateWALEntryRequest request = pair.getFirst();
+          rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
+            pair.getSecond(), request.getReplicationClusterId(),
+            request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath());
+          // Read next entries.
+          entries = readWALEntries(reader);
+        }
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 2846d2c..2199415 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -275,7 +275,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
     List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
     for (ReplicationSourceInterface source : oldSources) {
       if (source instanceof ReplicationSource) {
-        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
+        sourceMetricsList.add(source.getSourceMetrics());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index eb09a3a..a334b16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -93,9 +94,8 @@ public class ReplicationSink {
 
   /**
    * Create a sink for replication
-   *
-   * @param conf                conf object
-   * @param stopper             boolean to tell this thread to stop
+   * @param conf conf object
+   * @param stopper boolean to tell this thread to stop
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
   public ReplicationSink(Configuration conf, Stoppable stopper)
@@ -104,16 +104,15 @@ public class ReplicationSink {
     decorateConf();
     this.metrics = new MetricsSink();
     this.walEntrySinkFilter = setupWALEntrySinkFilter();
-    String className =
-        conf.get("hbase.replication.source.fs.conf.provider",
-          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
+    String className = conf.get("hbase.replication.source.fs.conf.provider",
+      DefaultSourceFSConfigurationProvider.class.getCanonicalName());
     try {
       Class<? extends SourceFSConfigurationProvider> c =
           Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
       this.provider = c.getDeclaredConstructor().newInstance();
     } catch (Exception e) {
       throw new IllegalArgumentException(
-        "Configured source fs configuration provider class " + className + " throws error.", e);
+          "Configured source fs configuration provider class " + className + " throws error.", e);
     }
   }
 
@@ -178,8 +177,7 @@ public class ReplicationSink {
       Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
 
       for (WALEntry entry : entries) {
-        TableName table =
-            TableName.valueOf(entry.getKey().getTableName().toByteArray());
+        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
         if (this.walEntrySinkFilter != null) {
           if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
             // Skip Cells in CellScanner associated with this entry.

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
index 66fe3be..cfe525a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
@@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.Optional;
 import java.util.function.BiPredicate;
-import org.apache.hadoop.hbase.client.RegionInfo;
+
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -31,17 +32,17 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface SyncReplicationPeerInfoProvider {
 
   /**
-   * Return the peer id and remote WAL directory if the region is synchronously replicated and the
+   * Return the peer id and remote WAL directory if the table is synchronously replicated and the
    * state is {@link SyncReplicationState#ACTIVE}.
    */
-  Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
+  Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table);
 
   /**
-   * Check whether the give region is contained in a sync replication peer which can pass the state
+   * Check whether the given table is contained in a sync replication peer which can pass the state
    * checker.
    * <p>
    * Will call the checker with current sync replication state and new sync replication state.
    */
-  boolean checkState(RegionInfo info,
+  boolean checkState(TableName table,
       BiPredicate<SyncReplicationState, SyncReplicationState> checker);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index cb33dab..75274ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.Optional;
 import java.util.function.BiPredicate;
-import org.apache.hadoop.hbase.client.RegionInfo;
+
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
@@ -40,11 +41,11 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
   }
 
   @Override
-  public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
-    if (info == null) {
+  public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
+    if (table == null) {
       return Optional.empty();
     }
-    String peerId = mapping.getPeerId(info);
+    String peerId = mapping.getPeerId(table);
     if (peerId == null) {
       return Optional.empty();
     }
@@ -65,9 +66,9 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
   }
 
   @Override
-  public boolean checkState(RegionInfo info,
+  public boolean checkState(TableName table,
       BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
-    String peerId = mapping.getPeerId(info);
+    String peerId = mapping.getPeerId(table);
     if (peerId == null) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
index 64216cb..5d19f72 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -42,7 +41,7 @@ class SyncReplicationPeerMappingManager {
     peerConfig.getTableCFsMap().keySet().forEach(table2PeerId::remove);
   }
 
-  String getPeerId(RegionInfo info) {
-    return table2PeerId.get(info.getTable());
+  String getPeerId(TableName tableName) {
+    return table2PeerId.get(tableName);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 3cd356d42..3b56aa2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -33,6 +33,7 @@ import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -160,7 +161,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
     }
     WAL wal = null;
     Optional<Pair<String, String>> peerIdAndRemoteWALDir =
-      peerInfoProvider.getPeerIdAndRemoteWALDir(region);
+        peerInfoProvider.getPeerIdAndRemoteWALDir(region.getTable());
     if (peerIdAndRemoteWALDir.isPresent()) {
       Pair<String, String> pair = peerIdAndRemoteWALDir.get();
       wal = getWAL(pair.getFirst(), pair.getSecond());
@@ -273,12 +274,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
       implements SyncReplicationPeerInfoProvider {
 
     @Override
-    public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
+    public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
       return Optional.empty();
     }
 
     @Override
-    public boolean checkState(RegionInfo info,
+    public boolean checkState(TableName table,
         BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 30dbdb5..0d5fce8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -25,11 +25,13 @@ import java.util.ArrayList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -37,9 +39,15 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -182,4 +190,28 @@ public class SyncReplicationTestBase {
     Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
     return new Path(remoteWALDir, PEER_ID);
   }
+
+  protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,
+      boolean expectedRejection) throws Exception {
+    HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
+    ClusterConnection connection = regionServer.getClusterConnection();
+    Entry[] entries = new Entry[10];
+    for (int i = 0; i < entries.length; i++) {
+      entries[i] =
+          new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
+    }
+    if (!expectedRejection) {
+      ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
+        entries, null, null, null);
+    } else {
+      try {
+        ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
+          entries, null, null, null);
+        Assert.fail("Should throw IOException when sync-replication state is in A or DA");
+      } catch (DoNotRetryIOException e) {
+        Assert.assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
+        Assert.assertTrue(e.getMessage().contains(TABLE_NAME.toString()));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
index f4fb5fe..bff4572 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -29,7 +29,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
+      HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
+
 
   @Test
   public void testActive() throws Exception {
@@ -37,13 +38,21 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
       SyncReplicationState.STANDBY);
     UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
       SyncReplicationState.ACTIVE);
+
+    // confirm that a peer in state A (ACTIVE) rejects replication requests.
+    verifyReplicationRequestRejection(UTIL1, true);
+    verifyReplicationRequestRejection(UTIL2, false);
+
     UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
     write(UTIL1, 0, 100);
     Thread.sleep(2000);
     // peer is disabled so no data have been replicated
     verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
+
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
       SyncReplicationState.DOWNGRADE_ACTIVE);
+    // confirm that a peer in state DA (DOWNGRADE_ACTIVE) rejects replication requests.
+    verifyReplicationRequestRejection(UTIL2, true);
     // confirm that the data is there after we convert the peer to DA
     verify(UTIL2, 0, 100);
 
@@ -59,6 +68,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
     // confirm that we can convert to DA even if the remote slave cluster is down
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
       SyncReplicationState.DOWNGRADE_ACTIVE);
+    // confirm that a peer in state DA (DOWNGRADE_ACTIVE) rejects replication requests.
+    verifyReplicationRequestRejection(UTIL2, true);
     write(UTIL2, 200, 300);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index aa6c39c..2d6c28f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -129,8 +129,7 @@ public class TestReplicationSink {
       TestSourceFSConfigurationProvider.class.getCanonicalName());
 
     TEST_UTIL.startMiniCluster(3);
-    SINK =
-      new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
     Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -419,7 +418,7 @@ public class TestReplicationSink {
     return builder.build();
   }
 
-  private WALEntry.Builder createWALEntryBuilder(TableName table) {
+  public static WALEntry.Builder createWALEntryBuilder(TableName table) {
     WALEntry.Builder builder = WALEntry.newBuilder();
     builder.setAssociatedCellCount(1);
     WALKey.Builder keyBuilder = WALKey.newBuilder();

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index 6299065..fd9ff29 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilder;
@@ -127,7 +128,7 @@ public class TestWALEntrySinkFilter {
     conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
         IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
     conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
-        Connection.class);
+      Connection.class);
     ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
     // Create some dumb walentries.
     List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =

http://git-wip-us.apache.org/repos/asf/hbase/blob/7307d16f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
index 3263fe8..69ed44d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -75,8 +75,8 @@ public class TestSyncReplicationWALProvider {
   public static final class InfoProvider implements SyncReplicationPeerInfoProvider {
 
     @Override
-    public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
-      if (info.getTable().equals(TABLE)) {
+    public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
+      if (table != null && table.equals(TABLE)) {
         return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
       } else {
         return Optional.empty();
@@ -84,7 +84,7 @@ public class TestSyncReplicationWALProvider {
     }
 
     @Override
-    public boolean checkState(RegionInfo info,
+    public boolean checkState(TableName table,
         BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
       // TODO Implement SyncReplicationPeerInfoProvider.isInState
       return false;