You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/24 03:28:02 UTC
[34/37] hbase git commit: HBASE-19082 Reject read/write from client but accept write from replication in state S
HBASE-19082 Reject read/write from client but accept write from replication in state S
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1c82bc58
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1c82bc58
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1c82bc58
Branch: refs/heads/HBASE-19064
Commit: 1c82bc58818a4bbfa07c1820ebba35b6d23953df
Parents: 0196bec
Author: zhangduo <zh...@apache.org>
Authored: Mon Feb 12 18:20:18 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 24 11:13:58 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HConstants.java | 3 -
.../src/main/protobuf/MasterProcedure.proto | 3 +-
.../hbase/replication/ReplicationUtils.java | 4 +
...ransitPeerSyncReplicationStateProcedure.java | 10 +
.../hadoop/hbase/regionserver/HRegion.java | 5 +-
.../hbase/regionserver/HRegionServer.java | 2 +-
.../hbase/regionserver/RSRpcServices.java | 88 ++++++--
.../RejectRequestsFromClientStateChecker.java | 44 ++++
.../regionserver/ReplicationSink.java | 72 ++++---
.../SyncReplicationPeerInfoProvider.java | 10 +-
.../SyncReplicationPeerInfoProviderImpl.java | 19 +-
.../hbase/wal/SyncReplicationWALProvider.java | 3 +
.../org/apache/hadoop/hbase/wal/WALFactory.java | 4 +-
.../hbase/replication/TestSyncReplication.java | 200 +++++++++++++++++++
.../wal/TestSyncReplicationWALProvider.java | 8 +-
15 files changed, 401 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 522c2cf..9241682 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1355,9 +1355,6 @@ public final class HConstants {
public static final String NOT_IMPLEMENTED = "Not implemented";
- // TODO: need to find a better place to hold it.
- public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled";
-
private HConstants() {
// Can't be instantiated with this ctor.
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 67c1b43..e8b940e 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -397,7 +397,8 @@ enum PeerSyncReplicationStateTransitionState {
REOPEN_ALL_REGIONS_IN_PEER = 5;
TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7;
- POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 8;
+ CREATE_DIR_FOR_REMOTE_WAL = 8;
+ POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 9;
}
message PeerModificationStateData {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index e4dea83..d94cb00 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -37,6 +37,10 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ReplicationUtils {
+ public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled";
+
+ public static final String REPLICATION_ATTR_NAME = "__rep__";
+
private ReplicationUtils() {
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 8fc932f..69404a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -197,8 +197,18 @@ public class TransitPeerSyncReplicationStateProcedure
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
.toArray(RefreshPeerProcedure[]::new));
+ if (toState == SyncReplicationState.STANDBY) {
+ setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+ } else {
+ setNextState(
+ PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
+ }
+ return Flow.HAS_MORE_STATE;
+ case CREATE_DIR_FOR_REMOTE_WAL:
+ // TODO: create wal for write remote wal
setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
+ return Flow.HAS_MORE_STATE;
case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
try {
postTransit(env);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c0203a4..0e585c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4325,12 +4325,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Add updates first to the wal and then add values to memstore.
+ * <p>
* Warning: Assumption is caller has lock on passed in row.
* @param edits Cell updates by column
- * @throws IOException
*/
- void put(final byte [] row, byte [] family, List<Cell> edits)
- throws IOException {
+ void put(final byte[] row, byte[] family, List<Cell> edits) throws IOException {
NavigableMap<byte[], List<Cell>> familyMap;
familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7fa9eb6..682f743 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1805,7 +1805,7 @@ public class HRegionServer extends HasThread implements
boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster &&
(!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf));
if (isMasterNoTableOrSystemTableOnly) {
- conf.setBoolean(HConstants.SYNC_REPLICATION_ENABLED, false);
+ conf.setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false);
}
WALFactory factory = new WALFactory(conf, serverName.toString());
if (!isMasterNoTableOrSystemTableOnly) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 8828a22..5316ac5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -120,6 +120,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
@@ -2431,6 +2433,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return region.execService(execController, serviceCall);
}
+ private boolean shouldRejectRequestsFromClient(HRegion region) {
+ return regionServer.getReplicationSourceService().getSyncReplicationPeerInfoProvider()
+ .checkState(region.getRegionInfo(), RejectRequestsFromClientStateChecker.get());
+ }
+
+ private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException {
+ if (shouldRejectRequestsFromClient(region)) {
+ throw new DoNotRetryIOException(
+ region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state.");
+ }
+ }
+
/**
* Get data from a table.
*
@@ -2439,8 +2453,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws ServiceException
*/
@Override
- public GetResponse get(final RpcController controller,
- final GetRequest request) throws ServiceException {
+ public GetResponse get(final RpcController controller, final GetRequest request)
+ throws ServiceException {
long before = EnvironmentEdgeManager.currentTime();
OperationQuota quota = null;
HRegion region = null;
@@ -2449,6 +2463,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
requestCount.increment();
rpcGetRequestCount.increment();
region = getRegion(request.getRegion());
+ rejectIfInStandByState(region);
GetResponse.Builder builder = GetResponse.newBuilder();
ClientProtos.Get get = request.getGet();
@@ -2587,16 +2602,45 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ private void failRegionAction(MultiResponse.Builder responseBuilder,
+ RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction,
+ CellScanner cellScanner, Throwable error) {
+ rpcServer.getMetrics().exception(error);
+ regionActionResultBuilder.setException(ResponseConverter.buildException(error));
+ responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+ // All Mutations in this RegionAction not executed as we can not see the Region online here
+ // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
+ // corresponding to these Mutations.
+ if (cellScanner != null) {
+ skipCellsForMutations(regionAction.getActionList(), cellScanner);
+ }
+ }
+
+ private boolean isReplicationRequest(Action action) {
+ // replication request can only be put or delete.
+ if (!action.hasMutation()) {
+ return false;
+ }
+ MutationProto mutation = action.getMutation();
+ MutationType type = mutation.getMutateType();
+ if (type != MutationType.PUT && type != MutationType.DELETE) {
+ return false;
+ }
+ // replication will set a special attribute so we can make use of it to decide whether a request
+ // is for replication.
+ return mutation.getAttributeList().stream().map(p -> p.getName())
+ .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent();
+ }
+
/**
* Execute multiple actions on a table: get, mutate, and/or execCoprocessor
- *
* @param rpcc the RPC controller
* @param request the multi request
* @throws ServiceException
*/
@Override
public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
- throws ServiceException {
+ throws ServiceException {
try {
checkOpen();
} catch (IOException ie) {
@@ -2636,17 +2680,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
} catch (IOException e) {
- rpcServer.getMetrics().exception(e);
- regionActionResultBuilder.setException(ResponseConverter.buildException(e));
- responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
- // All Mutations in this RegionAction not executed as we can not see the Region online here
- // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
- // corresponding to these Mutations.
- skipCellsForMutations(regionAction.getActionList(), cellScanner);
+ failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
continue; // For this region it's a failure.
}
-
+ boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
if (regionAction.hasAtomic() && regionAction.getAtomic()) {
+ // We only allow replication in standby state and it will not set the atomic flag.
+ if (rejectIfFromClient) {
+ failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
+ new DoNotRetryIOException(
+ region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
+ quota.close();
+ continue;
+ }
// How does this call happen? It may need some work to play well w/ the surroundings.
// Need to return an item per Action along w/ Action index. TODO.
try {
@@ -2677,6 +2723,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
}
} else {
+ if (rejectIfFromClient && regionAction.getActionCount() > 0 &&
+ !isReplicationRequest(regionAction.getAction(0))) {
+ // fail if it is not a replication request
+ failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
+ new DoNotRetryIOException(
+ region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state"));
+ quota.close();
+ continue;
+ }
// doNonAtomicRegionMutation manages the exception internally
if (context != null && closeCallBack == null) {
// An RpcCallBack that creates a list of scanners that needs to perform callBack
@@ -2692,7 +2747,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
quota.close();
ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
- if(regionLoadStats != null) {
+ if (regionLoadStats != null) {
regionStats.put(regionSpecifier, regionLoadStats);
}
}
@@ -2751,8 +2806,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param request the mutate request
*/
@Override
- public MutateResponse mutate(final RpcController rpcc,
- final MutateRequest request) throws ServiceException {
+ public MutateResponse mutate(final RpcController rpcc, final MutateRequest request)
+ throws ServiceException {
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data.
HBaseRpcController controller = (HBaseRpcController)rpcc;
@@ -2772,6 +2827,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
requestCount.increment();
rpcMutateRequestCount.increment();
region = getRegion(request.getRegion());
+ rejectIfInStandByState(region);
MutateResponse.Builder builder = MutateResponse.newBuilder();
MutationProto mutation = request.getMutation();
if (!region.getRegionInfo().isMetaRegion()) {
@@ -2941,6 +2997,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
"'hbase.client.scanner.timeout.period' configuration.");
}
}
+ rejectIfInStandByState(rsh.r);
RegionInfo hri = rsh.s.getRegionInfo();
// Yes, should be the same instance
if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
@@ -2967,6 +3024,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
throws IOException {
HRegion region = getRegion(request.getRegion());
+ rejectIfInStandByState(region);
ClientProtos.Scan protoScan = request.getScan();
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
Scan scan = ProtobufUtil.toScan(protoScan);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectRequestsFromClientStateChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectRequestsFromClientStateChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectRequestsFromClientStateChecker.java
new file mode 100644
index 0000000..8e68f0f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectRequestsFromClientStateChecker.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.function.BiPredicate;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Check whether we need to reject the request from client.
+ */
+@InterfaceAudience.Private
+public class RejectRequestsFromClientStateChecker
+ implements BiPredicate<SyncReplicationState, SyncReplicationState> {
+
+ private static final RejectRequestsFromClientStateChecker INST =
+ new RejectRequestsFromClientStateChecker();
+
+ @Override
+ public boolean test(SyncReplicationState state, SyncReplicationState newState) {
+ // reject requests from client if we are in standby state, or we are going to transit to standby
+ // state.
+ return state == SyncReplicationState.STANDBY || newState == SyncReplicationState.STANDBY;
+ }
+
+ public static RejectRequestsFromClientStateChecker get() {
+ return INST;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index fb4e0f9..eb09a3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -29,7 +28,6 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -41,9 +39,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
@@ -52,13 +47,18 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
/**
* <p>
@@ -82,10 +82,10 @@ public class ReplicationSink {
private final Configuration conf;
// Volatile because of note in here -- look for double-checked locking:
// http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
- private volatile Connection sharedHtableCon;
+ private volatile Connection sharedConn;
private final MetricsSink metrics;
private final AtomicLong totalReplicatedEdits = new AtomicLong();
- private final Object sharedHtableConLock = new Object();
+ private final Object sharedConnLock = new Object();
// Number of hfiles that we successfully replicated
private long hfilesReplicated = 0;
private SourceFSConfigurationProvider provider;
@@ -108,12 +108,12 @@ public class ReplicationSink {
conf.get("hbase.replication.source.fs.conf.provider",
DefaultSourceFSConfigurationProvider.class.getCanonicalName());
try {
- @SuppressWarnings("rawtypes")
- Class c = Class.forName(className);
- this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance();
+ Class<? extends SourceFSConfigurationProvider> c =
+ Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
+ this.provider = c.getDeclaredConstructor().newInstance();
} catch (Exception e) {
- throw new IllegalArgumentException("Configured source fs configuration provider class "
- + className + " throws error.", e);
+ throw new IllegalArgumentException(
+ "Configured source fs configuration provider class " + className + " throws error.", e);
}
}
@@ -221,6 +221,8 @@ public class ReplicationSink {
clusterIds.add(toUUID(clusterId));
}
mutation.setClusterIds(clusterIds);
+ mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
+ HConstants.EMPTY_BYTE_ARRAY);
addToHashMultiMap(rowMap, table, clusterIds, mutation);
}
if (CellUtil.isDelete(cell)) {
@@ -374,11 +376,11 @@ public class ReplicationSink {
*/
public void stopReplicationSinkServices() {
try {
- if (this.sharedHtableCon != null) {
- synchronized (sharedHtableConLock) {
- if (this.sharedHtableCon != null) {
- this.sharedHtableCon.close();
- this.sharedHtableCon = null;
+ if (this.sharedConn != null) {
+ synchronized (sharedConnLock) {
+ if (this.sharedConn != null) {
+ this.sharedConn.close();
+ this.sharedConn = null;
}
}
}
@@ -394,14 +396,12 @@ public class ReplicationSink {
* @param allRows list of actions
* @throws IOException
*/
- protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+ private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
if (allRows.isEmpty()) {
return;
}
- Table table = null;
- try {
- Connection connection = getConnection();
- table = connection.getTable(tableName);
+ Connection connection = getConnection();
+ try (Table table = connection.getTable(tableName)) {
for (List<Row> rows : allRows) {
table.batch(rows, null);
}
@@ -414,21 +414,18 @@ public class ReplicationSink {
throw rewde;
} catch (InterruptedException ix) {
throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
- } finally {
- if (table != null) {
- table.close();
- }
}
}
private Connection getConnection() throws IOException {
// See https://en.wikipedia.org/wiki/Double-checked_locking
- Connection connection = sharedHtableCon;
+ Connection connection = sharedConn;
if (connection == null) {
- synchronized (sharedHtableConLock) {
- connection = sharedHtableCon;
+ synchronized (sharedConnLock) {
+ connection = sharedConn;
if (connection == null) {
- connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
+ connection = ConnectionFactory.createConnection(conf);
+ sharedConn = connection;
}
}
}
@@ -441,9 +438,10 @@ public class ReplicationSink {
* of the last edit that was applied
*/
public String getStats() {
- return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
- "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
- ", total replicated edits: " + this.totalReplicatedEdits;
+ long total = this.totalReplicatedEdits.get();
+ return total == 0 ? ""
+ : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
+ ", total replicated edits: " + total;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
index 92f2c52..66fe3be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Optional;
+import java.util.function.BiPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
@@ -36,8 +37,11 @@ public interface SyncReplicationPeerInfoProvider {
Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
/**
- * Check whether the give region is contained in a sync replication peer which is in the given
- * state.
+ * Check whether the give region is contained in a sync replication peer which can pass the state
+ * checker.
+ * <p>
+ * Will call the checker with current sync replication state and new sync replication state.
*/
- boolean isInState(RegionInfo info, SyncReplicationState state);
+ boolean checkState(RegionInfo info,
+ BiPredicate<SyncReplicationState, SyncReplicationState> checker);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index 32159e6..973e049 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -18,8 +18,9 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Optional;
+import java.util.function.BiPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
@@ -44,11 +45,14 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
if (peerId == null) {
return Optional.empty();
}
- ReplicationPeer peer = replicationPeers.getPeer(peerId);
+ ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
if (peer == null) {
return Optional.empty();
}
- if (peer.getSyncReplicationState() == SyncReplicationState.ACTIVE) {
+ Pair<SyncReplicationState, SyncReplicationState> states =
+ peer.getSyncReplicationStateAndNewState();
+ if (states.getFirst() == SyncReplicationState.ACTIVE &&
+ states.getSecond() == SyncReplicationState.NONE) {
return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir()));
} else {
return Optional.empty();
@@ -56,16 +60,19 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
}
@Override
- public boolean isInState(RegionInfo info, SyncReplicationState state) {
+ public boolean checkState(RegionInfo info,
+ BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
String peerId = mapping.getPeerId(info);
if (peerId == null) {
return false;
}
- ReplicationPeer peer = replicationPeers.getPeer(peerId);
+ ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
if (peer == null) {
return false;
}
- return peer.getSyncReplicationState() == state;
+ Pair<SyncReplicationState, SyncReplicationState> states =
+ peer.getSyncReplicationStateAndNewState();
+ return checker.test(states.getFirst(), states.getSecond());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index e3de6b4..ac4b4cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -141,6 +141,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override
public WAL getWAL(RegionInfo region) throws IOException {
+ if (region == null) {
+ return provider.getWAL(region);
+ }
Optional<Pair<String, String>> peerIdAndRemoteWALDir =
peerInfoProvider.getPeerIdAndRemoteWALDir(region);
if (peerIdAndRemoteWALDir.isPresent()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 78355a1..2e43eb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -24,10 +24,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -162,7 +162,7 @@ public class WALFactory {
// end required early initialization
if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
- if (conf.getBoolean(HConstants.SYNC_REPLICATION_ENABLED, false)) {
+ if (conf.getBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false)) {
provider = new SyncReplicationWALProvider(provider);
}
provider.init(this, conf, null);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
new file mode 100644
index 0000000..acddc4a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Checks that a cluster whose sync replication peer has been moved to
+ * {@link SyncReplicationState#STANDBY} rejects read/write requests issued through the client
+ * API, while edits shipped to it by replication from the other cluster are still applied.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplication {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplication.class);
+
+  // Both mini clusters share this single ZooKeeper cluster, under different parent znodes.
+  private static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
+
+  private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+
+  private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("SyncRep");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final String PEER_ID = "1";
+
+  /**
+   * Applies the configuration shared by both mini clusters: attaches the common ZooKeeper
+   * cluster, enables sync replication and sets the replication related test settings.
+   * @param util the testing utility to configure
+   * @param zkParent the parent znode this cluster uses inside the shared ZooKeeper cluster
+   */
+  private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
+    util.setZkCluster(ZK_UTIL.getZkCluster());
+    Configuration conf = util.getConfiguration();
+    conf.setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
+    conf.setInt("replication.source.size.capacity", 102400);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    conf.setFloat("replication.source.ratio", 1.0f);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+  }
+
+  /**
+   * Starts ZooKeeper and the two mini clusters, creates the same globally replicated table on
+   * both of them, and registers each cluster as the other's replication peer for that table.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    ZK_UTIL.startMiniZKCluster();
+    initTestingUtility(UTIL1, "/cluster1");
+    initTestingUtility(UTIL2, "/cluster2");
+    UTIL1.startMiniCluster(3);
+    UTIL2.startMiniCluster(3);
+    TableDescriptor td =
+      TableDescriptorBuilder.newBuilder(TABLE_NAME).addColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
+    UTIL1.getAdmin().createTable(td);
+    UTIL2.getAdmin().createTable(td);
+    FileSystem fs1 = UTIL1.getTestFileSystem();
+    FileSystem fs2 = UTIL2.getTestFileSystem();
+    Path remoteWALDir1 =
+      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+        "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
+    Path remoteWALDir2 =
+      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+        "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
+    // Each peer definition points at the *other* cluster: its cluster key and its remote WAL dir.
+    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
+        .setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+        .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
+    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
+        .setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+        .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
+  }
+
+  /** Shuts down both mini clusters and then the shared ZooKeeper cluster. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL1.shutdownMiniCluster();
+    UTIL2.shutdownMiniCluster();
+    ZK_UTIL.shutdownMiniZKCluster();
+  }
+
+  /** A table operation that may throw {@link IOException}, so it can be passed as a lambda. */
+  @FunctionalInterface
+  private interface TableAction {
+
+    void call(Table table) throws IOException;
+  }
+
+  /**
+   * Asserts that the given action is rejected with an exception whose message mentions
+   * "STANDBY".
+   * @param table the table to run the action against
+   * @param action the client operation that is expected to be rejected
+   * @throws IOException if the action fails with an IOException other than the expected ones
+   * @throws AssertionError if the action completes without being rejected
+   */
+  private void assertDisallow(Table table, TableAction action) throws IOException {
+    try {
+      action.call(table);
+    } catch (DoNotRetryIOException | RetriesExhaustedException e) {
+      // expected
+      assertThat(e.getMessage(), containsString("STANDBY"));
+      return;
+    }
+    // Without this the check would pass silently when the request is wrongly accepted.
+    throw new AssertionError("Expected the action to be rejected but it succeeded");
+  }
+
+  /**
+   * Moves cluster 2 to STANDBY, verifies that every kind of client request against it is
+   * rejected, then writes to cluster 1 and waits until the data shows up in cluster 2.
+   */
+  @Test
+  public void testStandby() throws Exception {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
+      assertDisallow(table,
+        t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+      assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
+      assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
+      assertDisallow(table,
+        t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+      assertDisallow(table,
+        t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
+      assertDisallow(table,
+        t -> t
+          .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
+            new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
+      assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
+        .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
+    }
+    // But we should still allow replication writes
+    try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+    // The reject check is in RSRpcService so we can still read through HRegion
+    HRegion region = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    // Row 99 is the last row written above, so its presence is used as the catch-up signal.
+    UTIL2.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !region.get(new Get(Bytes.toBytes(99))).isEmpty();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication has not been catched up yet";
+      }
+    });
+    for (int i = 0; i < 100; i++) {
+      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c82bc58/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
index 986228c..488d9fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -24,10 +24,10 @@ import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.util.Optional;
+import java.util.function.BiPredicate;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -84,7 +85,8 @@ public class TestSyncReplicationWALProvider {
}
@Override
- public boolean isInState(RegionInfo info, SyncReplicationState state) {
+ public boolean checkState(RegionInfo info,
+ BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
 // TODO Implement SyncReplicationPeerInfoProvider.checkState
return false;
}
@@ -92,7 +94,7 @@ public class TestSyncReplicationWALProvider {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- UTIL.getConfiguration().setBoolean(HConstants.SYNC_REPLICATION_ENABLED, true);
+ UTIL.getConfiguration().setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
UTIL.startMiniDFSCluster(3);
FACTORY = new WALFactory(UTIL.getConfiguration(), "test");
((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider());