You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/02 02:59:31 UTC
[41/43] hbase git commit: HBASE-19782 Reject the replication request
when peer is DA or A state
HBASE-19782 Reject the replication request when peer is DA or A state
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e15cf0b5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e15cf0b5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e15cf0b5
Branch: refs/heads/HBASE-19064
Commit: e15cf0b57ccbabb3804068c3bc8114bff50fe29a
Parents: b8d7704
Author: huzheng <op...@gmail.com>
Authored: Fri Mar 2 18:05:29 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed May 2 10:51:10 2018 +0800
----------------------------------------------------------------------
.../hbase/protobuf/ReplicationProtbufUtil.java | 2 +-
.../hadoop/hbase/regionserver/HRegion.java | 2 +-
.../hbase/regionserver/HRegionServer.java | 5 +--
.../hbase/regionserver/RSRpcServices.java | 25 +++++++++--
.../RejectReplicationRequestStateChecker.java | 45 ++++++++++++++++++++
.../ReplaySyncReplicationWALCallable.java | 24 ++++++-----
.../replication/regionserver/Replication.java | 2 +-
.../regionserver/ReplicationSink.java | 16 +++----
.../SyncReplicationPeerInfoProvider.java | 11 ++---
.../SyncReplicationPeerInfoProviderImpl.java | 13 +++---
.../SyncReplicationPeerMappingManager.java | 5 +--
.../hbase/wal/SyncReplicationWALProvider.java | 7 +--
.../replication/SyncReplicationTestBase.java | 32 ++++++++++++++
.../replication/TestSyncReplicationActive.java | 13 +++++-
.../regionserver/TestReplicationSink.java | 5 +--
.../regionserver/TestWALEntrySinkFilter.java | 3 +-
.../wal/TestSyncReplicationWALProvider.java | 6 +--
17 files changed, 163 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 81dd59e..e01f881 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.SizedCellScanner;
@@ -45,7 +46,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminServic
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
@InterfaceAudience.Private
public class ReplicationProtbufUtil {
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1865144..cb7ba6e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1981,7 +1981,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private boolean shouldForbidMajorCompaction() {
if (rsServices != null && rsServices.getReplicationSourceService() != null) {
return rsServices.getReplicationSourceService().getSyncReplicationPeerInfoProvider()
- .checkState(getRegionInfo(), ForbidMajorCompactionChecker.get());
+ .checkState(getRegionInfo().getTable(), ForbidMajorCompactionChecker.get());
}
return false;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4dd8f09..5963cd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2478,10 +2478,9 @@ public class HRegionServer extends HasThread implements
}
/**
- * @return Return the object that implements the replication
- * sink executorService.
+ * @return Return the object that implements the replication sink executorService.
*/
- ReplicationSinkService getReplicationSinkService() {
+ public ReplicationSinkService getReplicationSinkService() {
return replicationSinkHandler;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5316ac5..bdb86d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
@@ -2204,9 +2205,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
+ ReplicationSourceService replicationSource = regionServer.getReplicationSourceService();
+ if (replicationSource == null || entries.isEmpty()) {
+ return;
+ }
+ // We can ensure that all entries are for one peer, so only need to check one entry's
+ // table name. if the table hit sync replication at peer side and the peer cluster
+ // is (or is transiting to) state ACTIVE or DOWNGRADE_ACTIVE, we should reject to apply
+ // those entries according to the design doc.
+ TableName table = TableName.valueOf(entries.get(0).getKey().getTableName().toByteArray());
+ if (replicationSource.getSyncReplicationPeerInfoProvider().checkState(table,
+ RejectReplicationRequestStateChecker.get())) {
+ throw new DoNotRetryIOException(
+ "Reject to apply to sink cluster because sync replication state of sink cluster "
+ + "is ACTIVE or DOWNGRADE_ACTIVE, table: " + table);
+ }
+ }
+
/**
* Replicate WAL entries on the region server.
- *
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
@@ -2220,7 +2238,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (regionServer.replicationSinkHandler != null) {
requestCount.increment();
List<WALEntry> entries = request.getEntryList();
- CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner();
+ checkShouldRejectReplicationRequest(entries);
+ CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
@@ -2435,7 +2454,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private boolean shouldRejectRequestsFromClient(HRegion region) {
return regionServer.getReplicationSourceService().getSyncReplicationPeerInfoProvider()
- .checkState(region.getRegionInfo(), RejectRequestsFromClientStateChecker.get());
+ .checkState(region.getRegionInfo().getTable(), RejectRequestsFromClientStateChecker.get());
}
private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java
new file mode 100644
index 0000000..9ad0af2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectReplicationRequestStateChecker.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.function.BiPredicate;
+
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Check whether we need to reject the replication request from source cluster.
+ */
+@InterfaceAudience.Private
+public class RejectReplicationRequestStateChecker
+ implements BiPredicate<SyncReplicationState, SyncReplicationState> {
+
+ private static final RejectReplicationRequestStateChecker INST =
+ new RejectReplicationRequestStateChecker();
+
+ @Override
+ public boolean test(SyncReplicationState state, SyncReplicationState newState) {
+ return state == SyncReplicationState.ACTIVE || state == SyncReplicationState.DOWNGRADE_ACTIVE
+ || newState == SyncReplicationState.ACTIVE
+ || newState == SyncReplicationState.DOWNGRADE_ACTIVE;
+ }
+
+ public static RejectReplicationRequestStateChecker get() {
+ return INST;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
index c9c5ef6..3cf065c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -46,6 +44,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
/**
@@ -81,14 +80,19 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
throw initError;
}
LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId);
- try (Reader reader = getReader()) {
- List<Entry> entries = readWALEntries(reader);
- while (!entries.isEmpty()) {
- Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
- .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
- HBaseRpcController controller = new HBaseRpcControllerImpl(pair.getSecond());
- rs.getRSRpcServices().replicateWALEntry(controller, pair.getFirst());
- entries = readWALEntries(reader);
+ if (rs.getReplicationSinkService() != null) {
+ try (Reader reader = getReader()) {
+ List<Entry> entries = readWALEntries(reader);
+ while (!entries.isEmpty()) {
+ Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
+ .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
+ ReplicateWALEntryRequest request = pair.getFirst();
+ rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
+ pair.getSecond(), request.getReplicationClusterId(),
+ request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath());
+ // Read next entries.
+ entries = readWALEntries(reader);
+ }
}
}
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 2846d2c..2199415 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -275,7 +275,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
for (ReplicationSourceInterface source : oldSources) {
if (source instanceof ReplicationSource) {
- sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
+ sourceMetricsList.add(source.getSourceMetrics());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index eb09a3a..a334b16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -93,9 +94,8 @@ public class ReplicationSink {
/**
* Create a sink for replication
- *
- * @param conf conf object
- * @param stopper boolean to tell this thread to stop
+ * @param conf conf object
+ * @param stopper boolean to tell this thread to stop
* @throws IOException thrown when HDFS goes bad or bad file name
*/
public ReplicationSink(Configuration conf, Stoppable stopper)
@@ -104,16 +104,15 @@ public class ReplicationSink {
decorateConf();
this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter();
- String className =
- conf.get("hbase.replication.source.fs.conf.provider",
- DefaultSourceFSConfigurationProvider.class.getCanonicalName());
+ String className = conf.get("hbase.replication.source.fs.conf.provider",
+ DefaultSourceFSConfigurationProvider.class.getCanonicalName());
try {
Class<? extends SourceFSConfigurationProvider> c =
Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
this.provider = c.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new IllegalArgumentException(
- "Configured source fs configuration provider class " + className + " throws error.", e);
+ "Configured source fs configuration provider class " + className + " throws error.", e);
}
}
@@ -178,8 +177,7 @@ public class ReplicationSink {
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
for (WALEntry entry : entries) {
- TableName table =
- TableName.valueOf(entry.getKey().getTableName().toByteArray());
+ TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
if (this.walEntrySinkFilter != null) {
if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
// Skip Cells in CellScanner associated with this entry.
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
index 66fe3be..cfe525a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
@@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Optional;
import java.util.function.BiPredicate;
-import org.apache.hadoop.hbase.client.RegionInfo;
+
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@@ -31,17 +32,17 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface SyncReplicationPeerInfoProvider {
/**
- * Return the peer id and remote WAL directory if the region is synchronously replicated and the
+ * Return the peer id and remote WAL directory if the table is synchronously replicated and the
* state is {@link SyncReplicationState#ACTIVE}.
*/
- Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
+ Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table);
/**
- * Check whether the give region is contained in a sync replication peer which can pass the state
+ * Check whether the given table is contained in a sync replication peer which can pass the state
* checker.
* <p>
* Will call the checker with current sync replication state and new sync replication state.
*/
- boolean checkState(RegionInfo info,
+ boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index cb33dab..75274ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Optional;
import java.util.function.BiPredicate;
-import org.apache.hadoop.hbase.client.RegionInfo;
+
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
@@ -40,11 +41,11 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
}
@Override
- public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
- if (info == null) {
+ public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
+ if (table == null) {
return Optional.empty();
}
- String peerId = mapping.getPeerId(info);
+ String peerId = mapping.getPeerId(table);
if (peerId == null) {
return Optional.empty();
}
@@ -65,9 +66,9 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
}
@Override
- public boolean checkState(RegionInfo info,
+ public boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
- String peerId = mapping.getPeerId(info);
+ String peerId = mapping.getPeerId(table);
if (peerId == null) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
index 64216cb..5d19f72 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
@@ -42,7 +41,7 @@ class SyncReplicationPeerMappingManager {
peerConfig.getTableCFsMap().keySet().forEach(table2PeerId::remove);
}
- String getPeerId(RegionInfo info) {
- return table2PeerId.get(info.getTable());
+ String getPeerId(TableName tableName) {
+ return table2PeerId.get(tableName);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 3cd356d42..3b56aa2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -33,6 +33,7 @@ import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -160,7 +161,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
WAL wal = null;
Optional<Pair<String, String>> peerIdAndRemoteWALDir =
- peerInfoProvider.getPeerIdAndRemoteWALDir(region);
+ peerInfoProvider.getPeerIdAndRemoteWALDir(region.getTable());
if (peerIdAndRemoteWALDir.isPresent()) {
Pair<String, String> pair = peerIdAndRemoteWALDir.get();
wal = getWAL(pair.getFirst(), pair.getSecond());
@@ -273,12 +274,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
implements SyncReplicationPeerInfoProvider {
@Override
- public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
+ public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
return Optional.empty();
}
@Override
- public boolean checkState(RegionInfo info,
+ public boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 30dbdb5..0d5fce8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -25,11 +25,13 @@ import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -37,9 +39,15 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -182,4 +190,28 @@ public class SyncReplicationTestBase {
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
return new Path(remoteWALDir, PEER_ID);
}
+
+ protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,
+ boolean expectedRejection) throws Exception {
+ HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
+ ClusterConnection connection = regionServer.getClusterConnection();
+ Entry[] entries = new Entry[10];
+ for (int i = 0; i < entries.length; i++) {
+ entries[i] =
+ new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
+ }
+ if (!expectedRejection) {
+ ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
+ entries, null, null, null);
+ } else {
+ try {
+ ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
+ entries, null, null, null);
+ Assert.fail("Should throw IOException when sync-replication state is in A or DA");
+ } catch (DoNotRetryIOException e) {
+ Assert.assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
+ Assert.assertTrue(e.getMessage().contains(TABLE_NAME.toString()));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
index f4fb5fe..bff4572 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -29,7 +29,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
+ HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
+
@Test
public void testActive() throws Exception {
@@ -37,13 +38,21 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
+
+ // confirm that peer with state A will reject replication request.
+ verifyReplicationRequestRejection(UTIL1, true);
+ verifyReplicationRequestRejection(UTIL2, false);
+
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
write(UTIL1, 0, 100);
Thread.sleep(2000);
// peer is disabled so no data have been replicated
verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
+
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
+ // confirm that peer with state DA will reject replication request.
+ verifyReplicationRequestRejection(UTIL2, true);
// confirm that the data is there after we convert the peer to DA
verify(UTIL2, 0, 100);
@@ -59,6 +68,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
// confirm that we can convert to DA even if the remote slave cluster is down
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
+ // confirm that peer with state DA will reject replication request.
+ verifyReplicationRequestRejection(UTIL2, true);
write(UTIL2, 200, 300);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index aa6c39c..2d6c28f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -129,8 +129,7 @@ public class TestReplicationSink {
TestSourceFSConfigurationProvider.class.getCanonicalName());
TEST_UTIL.startMiniCluster(3);
- SINK =
- new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+ SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -419,7 +418,7 @@ public class TestReplicationSink {
return builder.build();
}
- private WALEntry.Builder createWALEntryBuilder(TableName table) {
+ public static WALEntry.Builder createWALEntryBuilder(TableName table) {
WALEntry.Builder builder = WALEntry.newBuilder();
builder.setAssociatedCellCount(1);
WALKey.Builder keyBuilder = WALKey.newBuilder();
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index 6299065..fd9ff29 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
@@ -127,7 +128,7 @@ public class TestWALEntrySinkFilter {
conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
- Connection.class);
+ Connection.class);
ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
// Create some dumb walentries.
List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
http://git-wip-us.apache.org/repos/asf/hbase/blob/e15cf0b5/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
index 3263fe8..69ed44d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -75,8 +75,8 @@ public class TestSyncReplicationWALProvider {
public static final class InfoProvider implements SyncReplicationPeerInfoProvider {
@Override
- public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
- if (info.getTable().equals(TABLE)) {
+ public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
+ if (table != null && table.equals(TABLE)) {
return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
} else {
return Optional.empty();
@@ -84,7 +84,7 @@ public class TestSyncReplicationWALProvider {
}
@Override
- public boolean checkState(RegionInfo info,
+ public boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
// TODO Implement SyncReplicationPeerInfoProvider.isInState
return false;