You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/31 07:02:18 UTC
[20/36] hbase git commit: HBASE-19957 General framework to transit
sync replication state
http://git-wip-us.apache.org/repos/asf/hbase/blob/31f8fe08/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
new file mode 100644
index 0000000..92f2c52
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Get the information for a sync replication peer.
+ * <p>
+ * All lookups are keyed by region: callers pass a {@link RegionInfo} and are told about the sync
+ * replication peer, if any, that the region belongs to.
+ */
+@InterfaceAudience.Private
+public interface SyncReplicationPeerInfoProvider {
+
+ /**
+ * Return the peer id and remote WAL directory if the region is synchronously replicated and the
+ * state is {@link SyncReplicationState#ACTIVE}.
+ * @param info the region to look up
+ * @return a pair of (peer id, remote WAL directory), or an empty {@link Optional} if the region
+ * is not synchronously replicated or the peer state is not
+ * {@link SyncReplicationState#ACTIVE}
+ */
+ Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
+
+ /**
+ * Check whether the given region is contained in a sync replication peer which is in the given
+ * state.
+ * @param info the region to look up
+ * @param state the sync replication state to compare the peer's state against
+ * @return {@code true} if the region is contained in a sync replication peer and that peer is in
+ * {@code state}, {@code false} otherwise
+ */
+ boolean isInState(RegionInfo info, SyncReplicationState state);
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/31f8fe08/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
new file mode 100644
index 0000000..32159e6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * {@link SyncReplicationPeerInfoProvider} that resolves a region to a peer id through a
+ * {@link SyncReplicationPeerMappingManager} and then reads the peer from {@link ReplicationPeers}.
+ */
+@InterfaceAudience.Private
+class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProvider {
+
+  private final ReplicationPeers replicationPeers;
+
+  private final SyncReplicationPeerMappingManager mapping;
+
+  SyncReplicationPeerInfoProviderImpl(ReplicationPeers replicationPeers,
+      SyncReplicationPeerMappingManager mapping) {
+    this.replicationPeers = replicationPeers;
+    this.mapping = mapping;
+  }
+
+  /**
+   * Returns the peer id paired with the peer's remote WAL directory. The result is empty when no
+   * peer id is mapped for the region, when the peer cannot be found, or when the peer is not in
+   * the {@link SyncReplicationState#ACTIVE} state.
+   */
+  @Override
+  public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
+    String id = mapping.getPeerId(info);
+    // Only consult the peer registry when the region actually maps to a peer id.
+    ReplicationPeer found = id == null ? null : replicationPeers.getPeer(id);
+    if (found == null || found.getSyncReplicationState() != SyncReplicationState.ACTIVE) {
+      return Optional.empty();
+    }
+    String remoteWALDir = found.getPeerConfig().getRemoteWALDir();
+    return Optional.of(Pair.newPair(id, remoteWALDir));
+  }
+
+  /**
+   * Returns {@code true} only when a peer id is mapped for the region, the peer can be found, and
+   * the peer's sync replication state equals {@code state}.
+   */
+  @Override
+  public boolean isInState(RegionInfo info, SyncReplicationState state) {
+    String id = mapping.getPeerId(info);
+    ReplicationPeer found = id != null ? replicationPeers.getPeer(id) : null;
+    return found != null && found.getSyncReplicationState() == state;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/31f8fe08/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
new file mode 100644
index 0000000..64216cb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used to map region to sync replication peer id. The mapping is kept per table: a region is
+ * resolved through the table it belongs to.
+ * <p>
+ * TODO: only the include-table option of a peer config is supported for now.
+ */
+@InterfaceAudience.Private
+class SyncReplicationPeerMappingManager {
+
+  private final ConcurrentMap<TableName, String> table2PeerId = new ConcurrentHashMap<>();
+
+  /**
+   * Maps every table in the peer config's table-CFs map to {@code peerId}, replacing any mapping
+   * a table already had.
+   */
+  void add(String peerId, ReplicationPeerConfig peerConfig) {
+    peerConfig.getTableCFsMap().keySet().forEach(tn -> table2PeerId.put(tn, peerId));
+  }
+
+  /**
+   * Removes the mapping of every table in the peer config's table-CFs map, but only where the
+   * table is still mapped to {@code peerId}.
+   */
+  void remove(String peerId, ReplicationPeerConfig peerConfig) {
+    // Conditional remove: a table that has since been re-mapped to a different peer by add()
+    // must keep that newer mapping instead of being dropped along with this peer.
+    peerConfig.getTableCFsMap().keySet().forEach(tn -> table2PeerId.remove(tn, peerId));
+  }
+
+  /**
+   * @return the peer id mapped to the table of the given region, or {@code null} if the table
+   *         has no mapping
+   */
+  String getPeerId(RegionInfo info) {
+    return table2PeerId.get(info.getTable());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/31f8fe08/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java
deleted file mode 100644
index b97bf7e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.util.Optional;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Get the peer id and remote root dir if the region is synchronously replicated.
- */
-@InterfaceAudience.Private
-public interface SyncReplicationPeerProvider {
-
- /**
- * Return the peer id and remote WAL directory if the region is synchronously replicated.
- */
- Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/31f8fe08/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index bccc842..e3de6b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
-import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider;
+import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
@@ -67,7 +67,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private final WALProvider provider;
- private final SyncReplicationPeerProvider peerProvider;
+ private SyncReplicationPeerInfoProvider peerInfoProvider;
private WALFactory factory;
@@ -85,9 +85,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private final KeyLocker<String> createLock = new KeyLocker<>();
- SyncReplicationWALProvider(WALProvider provider, SyncReplicationPeerProvider peerProvider) {
+ SyncReplicationWALProvider(WALProvider provider) {
this.provider = provider;
- this.peerProvider = peerProvider;
+ }
+
+ public void setPeerInfoProvider(SyncReplicationPeerInfoProvider peerInfoProvider) {
+ this.peerInfoProvider = peerInfoProvider;
}
@Override
@@ -99,7 +102,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
this.conf = conf;
this.factory = factory;
Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
- NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+ NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
channelClass = eventLoopGroupAndChannelClass.getSecond();
}
@@ -112,9 +115,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
Path remoteWALDirPath = new Path(remoteWALDir);
FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
- CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
- getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
- conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
+ CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
+ getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+ conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
}
private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
@@ -139,7 +142,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
@Override
public WAL getWAL(RegionInfo region) throws IOException {
Optional<Pair<String, String>> peerIdAndRemoteWALDir =
- peerProvider.getPeerIdAndRemoteWALDir(region);
+ peerInfoProvider.getPeerIdAndRemoteWALDir(region);
if (peerIdAndRemoteWALDir.isPresent()) {
Pair<String, String> pair = peerIdAndRemoteWALDir.get();
return getWAL(pair.getFirst(), pair.getSecond());
@@ -221,14 +224,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
@Override
- public void peerRemoved(String peerId) {
- safeClose(peerId2WAL.remove(peerId));
- }
-
- @Override
public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
- SyncReplicationState to) {
- assert to == SyncReplicationState.DOWNGRADE_ACTIVE;
- safeClose(peerId2WAL.remove(peerId));
+ SyncReplicationState to, int stage) {
+ // TODO: stage 0
+ if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE &&
+ stage == 1) {
+ safeClose(peerId2WAL.remove(peerId));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/31f8fe08/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 737ccfb..78355a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -24,10 +24,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -145,18 +145,6 @@ public class WALFactory {
}
/**
- * instantiate a provider from a config property. requires conf to have already been set (as well
- * as anything the provider might need to read).
- */
- private WALProvider getProvider(String key, String defaultValue, String providerId)
- throws IOException {
- WALProvider provider = createProvider(getProviderClass(key, defaultValue));
- provider.init(this, conf, providerId);
- provider.addWALActionsListener(new MetricsWAL());
- return provider;
- }
-
- /**
* @param conf must not be null, will keep a reference to read params in later reader/writer
* instances.
* @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
@@ -173,7 +161,13 @@ public class WALFactory {
this.factoryId = factoryId;
// end required early initialization
if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
- provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
+ WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
+ if (conf.getBoolean(HConstants.SYNC_REPLICATION_ENABLED, false)) {
+ provider = new SyncReplicationWALProvider(provider);
+ }
+ provider.init(this, conf, null);
+ provider.addWALActionsListener(new MetricsWAL());
+ this.provider = provider;
} else {
// special handling of existing configuration behavior.
LOG.warn("Running with WAL disabled.");
@@ -183,26 +177,6 @@ public class WALFactory {
}
/**
- * A temporary constructor for testing synchronous replication.
- * <p>
- * Remove it once we can integrate the synchronous replication logic in RS.
- */
- @VisibleForTesting
- WALFactory(Configuration conf, String factoryId, SyncReplicationPeerProvider peerProvider)
- throws IOException {
- timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
- /* TODO Both of these are probably specific to the fs wal provider */
- logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
- AbstractFSWALProvider.Reader.class);
- this.conf = conf;
- this.factoryId = factoryId;
- WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
- this.provider = new SyncReplicationWALProvider(provider, peerProvider);
- this.provider.init(this, conf, null);
- this.provider.addWALActionsListener(new MetricsWAL());
- }
-
- /**
* Shutdown all WALs and clean up any underlying storage.
* Use only when you will not need to replay and edits that have gone to any wals from this
* factory.
@@ -250,8 +224,9 @@ public class WALFactory {
if (provider != null) {
return provider;
}
- provider = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER,
- AbstractFSWALProvider.META_WAL_PROVIDER_ID);
+ provider = createProvider(getProviderClass(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER));
+ provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
+ provider.addWALActionsListener(new MetricsWAL());
if (metaProvider.compareAndSet(null, provider)) {
return provider;
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/31f8fe08/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index d462dbd..0ad476f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterR
import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -1008,7 +1009,7 @@ public class TestReplicationAdmin {
@Test
public void testTransitSyncReplicationPeerState() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
-
+ TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_ONE);
builder.setReplicateAllUserTables(false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/31f8fe08/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index b058da3..482f49a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -175,7 +175,10 @@ public abstract class TestReplicationSourceManager {
ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state",
- SyncReplicationState.toByteArray(SyncReplicationState.NONE));
+ ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
+ ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/new-sync-rep-state");
+ ZKUtil.setData(zkw, "/hbase/replication/peers/1/new-sync-rep-state",
+ ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
http://git-wip-us.apache.org/repos/asf/hbase/blob/31f8fe08/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
index f09e51e..986228c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -27,6 +27,7 @@ import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -35,6 +36,8 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -51,7 +54,7 @@ public class TestSyncReplicationWALProvider {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class);
+ HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -69,19 +72,30 @@ public class TestSyncReplicationWALProvider {
private static WALFactory FACTORY;
- private static Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
- if (info.getTable().equals(TABLE)) {
- return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
- } else {
- return Optional.empty();
+ public static final class InfoProvider implements SyncReplicationPeerInfoProvider {
+
+ @Override
+ public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
+ if (info.getTable().equals(TABLE)) {
+ return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public boolean isInState(RegionInfo info, SyncReplicationState state) {
+ // TODO Implement SyncReplicationPeerInfoProvider.isInState
+ return false;
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ UTIL.getConfiguration().setBoolean(HConstants.SYNC_REPLICATION_ENABLED, true);
UTIL.startMiniDFSCluster(3);
- FACTORY = new WALFactory(UTIL.getConfiguration(), "test",
- TestSyncReplicationWALProvider::getPeerIdAndRemoteWALDir);
+ FACTORY = new WALFactory(UTIL.getConfiguration(), "test");
+ ((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider());
UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
}
@@ -151,9 +165,9 @@ public class TestSyncReplicationWALProvider {
DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
assertEquals(2, FACTORY.getWALs().size());
testReadWrite(wal);
- SyncReplicationWALProvider walProvider =
- (SyncReplicationWALProvider) FACTORY.getWALProvider();
- walProvider.peerRemoved(PEER_ID);
+ SyncReplicationWALProvider walProvider = (SyncReplicationWALProvider) FACTORY.getWALProvider();
+ walProvider.peerSyncReplicationStateChange(PEER_ID, SyncReplicationState.ACTIVE,
+ SyncReplicationState.DOWNGRADE_ACTIVE, 1);
assertEquals(1, FACTORY.getWALs().size());
}
}