You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/06 15:56:15 UTC
[hbase] 07/10: HBASE-27218 Support rolling upgrading (#4808)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 6321c964eefba473087e9551ad93dcb80a4e247e
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sun Nov 6 16:57:11 2022 +0800
HBASE-27218 Support rolling upgrading (#4808)
Signed-off-by: Yu Li <li...@apache.org>
---
.../apache/hadoop/hbase/zookeeper/ZNodePaths.java | 8 +-
.../apache/hadoop/hbase/procedure2/Procedure.java | 15 +
.../protobuf/server/master/MasterProcedure.proto | 12 +
hbase-replication/pom.xml | 10 +
.../hbase/replication/ReplicationQueueStorage.java | 19 ++
.../replication/TableReplicationQueueStorage.java | 65 +++-
.../ZKReplicationQueueStorageForMigration.java | 351 +++++++++++++++++++++
.../replication/TestZKReplicationQueueStorage.java | 317 +++++++++++++++++++
hbase-server/pom.xml | 6 +
.../org/apache/hadoop/hbase/master/HMaster.java | 13 +
.../master/procedure/ServerCrashProcedure.java | 19 ++
.../replication/AbstractPeerNoLockProcedure.java | 5 +-
...rateReplicationQueueFromZkToTableProcedure.java | 244 ++++++++++++++
.../master/replication/ModifyPeerProcedure.java | 26 ++
.../master/replication/ReplicationPeerManager.java | 104 +++++-
.../TransitPeerSyncReplicationStateProcedure.java | 14 +
.../replication/TestMigrateReplicationQueue.java | 126 ++++++++
...rateReplicationQueueFromZkToTableProcedure.java | 226 +++++++++++++
...icationQueueFromZkToTableProcedureRecovery.java | 128 ++++++++
...tReplicationPeerManagerMigrateQueuesFromZk.java | 216 +++++++++++++
.../hbase/replication/TestReplicationBase.java | 2 +-
pom.xml | 7 +-
22 files changed, 1917 insertions(+), 16 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
index d19d2100466..3f66c7cdc0c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
@@ -220,7 +220,11 @@ public class ZNodePaths {
* @param suffix ending of znode name
* @return result of properly joining prefix with suffix
*/
- public static String joinZNode(String prefix, String suffix) {
- return prefix + ZNodePaths.ZNODE_PATH_SEPARATOR + suffix;
+ public static String joinZNode(String prefix, String... suffix) {
+ StringBuilder sb = new StringBuilder(prefix);
+ for (String s : suffix) {
+ sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(s);
+ }
+ return sb.toString();
}
}
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 34c74d92c16..43adba2bc21 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.metrics.Counter;
import org.apache.hadoop.hbase.metrics.Histogram;
@@ -33,6 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
@@ -1011,6 +1013,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
releaseLock(env);
}
+ protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
+ throws ProcedureSuspendedException {
+ if (jitter) {
+ // add a random jitter of up to 10% of the timeout, to avoid thundering-herd retries
+ double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
+ timeoutMillis += add;
+ }
+ setTimeout(timeoutMillis);
+ setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+ skipPersistence();
+ throw new ProcedureSuspendedException();
+ }
+
@Override
public int compareTo(final Procedure<TEnvironment> other) {
return Long.compare(getProcId(), other.getProcId());
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index 76a1d676487..b6f5d7e50bb 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -722,3 +722,15 @@ enum AssignReplicationQueuesState {
message AssignReplicationQueuesStateData {
required ServerName crashed_server = 1;
}
+
+enum MigrateReplicationQueueFromZkToTableState {
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
+ MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
+}
+
+message MigrateReplicationQueueFromZkToTableStateData {
+ repeated string disabled_peer_id = 1;
+}
diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
index dad93578609..d294cfdbe01 100644
--- a/hbase-replication/pom.xml
+++ b/hbase-replication/pom.xml
@@ -98,6 +98,16 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 6f6aee38cc8..1e36bbeb78f 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@@ -184,4 +185,22 @@ public interface ReplicationQueueStorage {
* @return Whether the replication queue table exists
*/
boolean hasData() throws ReplicationException;
+
+ // the below 3 methods are only used when migrating queue data from zookeeper to the table based storage
+ /**
+ * Update the replication queue data for the given region server.
+ */
+ void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
+ throws ReplicationException;
+
+ /**
+ * Update last pushed sequence id for the given regions and peers.
+ */
+ void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
+ throws ReplicationException;
+
+ /**
+ * Add the given hfile refs to the given peer.
+ */
+ void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
index 392a3692d66..f3870f4d09d 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
@@ -21,12 +21,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
@@ -74,12 +77,6 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
private final TableName tableName;
- @FunctionalInterface
- private interface TableCreator {
-
- void create() throws IOException;
- }
-
public TableReplicationQueueStorage(Connection conn, TableName tableName) {
this.conn = conn;
this.tableName = tableName;
@@ -541,4 +538,60 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
throw new ReplicationException("failed to get replication queue table", e);
}
}
+
+ @Override
+ public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
+ throws ReplicationException {
+ List<Put> puts = new ArrayList<>();
+ for (ReplicationQueueData data : datas) {
+ if (data.getOffsets().isEmpty()) {
+ continue;
+ }
+ Put put = new Put(Bytes.toBytes(data.getId().toString()));
+ data.getOffsets().forEach((walGroup, offset) -> {
+ put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
+ });
+ puts.add(put);
+ }
+ try (Table table = conn.getTable(tableName)) {
+ table.put(puts);
+ } catch (IOException e) {
+ throw new ReplicationException("failed to batch update queues", e);
+ }
+ }
+
+ @Override
+ public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
+ throws ReplicationException {
+ Map<String, Put> peerId2Put = new HashMap<>();
+ for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
+ peerId2Put
+ .computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
+ .addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
+ Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
+ }
+ try (Table table = conn.getTable(tableName)) {
+ table
+ .put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList()));
+ } catch (IOException e) {
+ throw new ReplicationException("failed to batch update last pushed sequence ids", e);
+ }
+ }
+
+ @Override
+ public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
+ throws ReplicationException {
+ if (hfileRefs.isEmpty()) {
+ return;
+ }
+ Put put = new Put(Bytes.toBytes(peerId));
+ for (String ref : hfileRefs) {
+ put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
+ }
+ try (Table table = conn.getTable(tableName)) {
+ table.put(put);
+ } catch (IOException e) {
+ throw new ReplicationException("failed to batch update hfile references", e);
+ }
+ }
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java
new file mode 100644
index 00000000000..22cc1314522
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+
+/**
+ * Just retain a small set of the methods for the old zookeeper based replication queue storage, for
+ * migrating.
+ */
+@InterfaceAudience.Private
+public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageBase {
+
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
+ "zookeeper.znode.replication.hfile.refs";
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
+
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
+ "zookeeper.znode.replication.regions";
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";
+
+ /**
+ * The name of the znode that contains all replication queues
+ */
+ private final String queuesZNode;
+
+ /**
+ * The name of the znode that contains queues of hfile references to be replicated
+ */
+ private final String hfileRefsZNode;
+
+ private final String regionsZNode;
+
+ public ZKReplicationQueueStorageForMigration(ZKWatcher zookeeper, Configuration conf) {
+ super(zookeeper, conf);
+ String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
+ String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+ ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
+ this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
+ this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
+ this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
+ .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
+ }
+
+ public interface MigrationIterator<T> {
+
+ T next() throws Exception;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static final MigrationIterator EMPTY_ITER = new MigrationIterator() {
+
+ @Override
+ public Object next() {
+ return null;
+ }
+ };
+
+ public static final class ZkReplicationQueueData {
+
+ private final ReplicationQueueId queueId;
+
+ private final Map<String, Long> walOffsets;
+
+ public ZkReplicationQueueData(ReplicationQueueId queueId, Map<String, Long> walOffsets) {
+ this.queueId = queueId;
+ this.walOffsets = walOffsets;
+ }
+
+ public ReplicationQueueId getQueueId() {
+ return queueId;
+ }
+
+ public Map<String, Long> getWalOffsets() {
+ return walOffsets;
+ }
+ }
+
+ private String getRsNode(ServerName serverName) {
+ return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
+ }
+
+ private String getQueueNode(ServerName serverName, String queueId) {
+ return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
+ }
+
+ private String getFileNode(String queueNode, String fileName) {
+ return ZNodePaths.joinZNode(queueNode, fileName);
+ }
+
+ private String getFileNode(ServerName serverName, String queueId, String fileName) {
+ return getFileNode(getQueueNode(serverName, queueId), fileName);
+ }
+
+ @SuppressWarnings("unchecked")
+ public MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> listAllQueues()
+ throws KeeperException {
+ List<String> replicators = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
+ if (replicators == null || replicators.isEmpty()) {
+ ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
+ return EMPTY_ITER;
+ }
+ Iterator<String> iter = replicators.iterator();
+ return new MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>>() {
+
+ private ServerName previousServerName;
+
+ @Override
+ public Pair<ServerName, List<ZkReplicationQueueData>> next() throws Exception {
+ if (previousServerName != null) {
+ ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName));
+ }
+ if (!iter.hasNext()) {
+ ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
+ return null;
+ }
+ String replicator = iter.next();
+ ServerName serverName = ServerName.parseServerName(replicator);
+ previousServerName = serverName;
+ List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
+ if (queueIdList == null || queueIdList.isEmpty()) {
+ return Pair.newPair(serverName, Collections.emptyList());
+ }
+ List<ZkReplicationQueueData> queueDataList = new ArrayList<>(queueIdList.size());
+ for (String queueIdStr : queueIdList) {
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueIdStr);
+ ReplicationQueueId queueId;
+ if (queueInfo.getDeadRegionServers().isEmpty()) {
+ queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId());
+ } else {
+ queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId(),
+ queueInfo.getDeadRegionServers().get(0));
+ }
+ List<String> wals =
+ ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueIdStr));
+ ZkReplicationQueueData queueData;
+ if (wals == null || wals.isEmpty()) {
+ queueData = new ZkReplicationQueueData(queueId, Collections.emptyMap());
+ } else {
+ Map<String, Long> walOffsets = new HashMap<>();
+ for (String wal : wals) {
+ byte[] data = ZKUtil.getData(zookeeper, getFileNode(serverName, queueIdStr, wal));
+ if (data == null || data.length == 0) {
+ walOffsets.put(wal, 0L);
+ } else {
+ walOffsets.put(wal, ZKUtil.parseWALPositionFrom(data));
+ }
+ }
+ queueData = new ZkReplicationQueueData(queueId, walOffsets);
+ }
+ queueDataList.add(queueData);
+ }
+ return Pair.newPair(serverName, queueDataList);
+ }
+ };
+ }
+
+ public static final class ZkLastPushedSeqId {
+
+ private final String encodedRegionName;
+
+ private final String peerId;
+
+ private final long lastPushedSeqId;
+
+ ZkLastPushedSeqId(String encodedRegionName, String peerId, long lastPushedSeqId) {
+ this.encodedRegionName = encodedRegionName;
+ this.peerId = peerId;
+ this.lastPushedSeqId = lastPushedSeqId;
+ }
+
+ public String getEncodedRegionName() {
+ return encodedRegionName;
+ }
+
+ public String getPeerId() {
+ return peerId;
+ }
+
+ public long getLastPushedSeqId() {
+ return lastPushedSeqId;
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public MigrationIterator<List<ZkLastPushedSeqId>> listAllLastPushedSeqIds()
+ throws KeeperException {
+ List<String> level1Prefixs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
+ if (level1Prefixs == null || level1Prefixs.isEmpty()) {
+ ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
+ return EMPTY_ITER;
+ }
+ Iterator<String> level1Iter = level1Prefixs.iterator();
+ return new MigrationIterator<List<ZkLastPushedSeqId>>() {
+
+ private String level1Prefix;
+
+ private Iterator<String> level2Iter;
+
+ private String level2Prefix;
+
+ @Override
+ public List<ZkLastPushedSeqId> next() throws Exception {
+ for (;;) {
+ if (level2Iter == null || !level2Iter.hasNext()) {
+ if (!level1Iter.hasNext()) {
+ ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
+ return null;
+ }
+ if (level1Prefix != null) {
+ // this will also delete the previous level2Prefix which is under this level1Prefix
+ ZKUtil.deleteNodeRecursively(zookeeper,
+ ZNodePaths.joinZNode(regionsZNode, level1Prefix));
+ }
+ level1Prefix = level1Iter.next();
+ List<String> level2Prefixes = ZKUtil.listChildrenNoWatch(zookeeper,
+ ZNodePaths.joinZNode(regionsZNode, level1Prefix));
+ if (level2Prefixes != null) {
+ level2Iter = level2Prefixes.iterator();
+ // reset level2Prefix as we have switched level1Prefix, otherwise the below delete
+ // level2Prefix section will delete the znode with this level2Prefix under the new
+ // level1Prefix
+ level2Prefix = null;
+ }
+ } else {
+ if (level2Prefix != null) {
+ ZKUtil.deleteNodeRecursively(zookeeper,
+ ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
+ }
+ level2Prefix = level2Iter.next();
+ List<String> encodedRegionNameAndPeerIds = ZKUtil.listChildrenNoWatch(zookeeper,
+ ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
+ if (encodedRegionNameAndPeerIds == null || encodedRegionNameAndPeerIds.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<ZkLastPushedSeqId> lastPushedSeqIds = new ArrayList<>();
+ for (String encodedRegionNameAndPeerId : encodedRegionNameAndPeerIds) {
+ byte[] data = ZKUtil.getData(zookeeper, ZNodePaths.joinZNode(regionsZNode,
+ level1Prefix, level2Prefix, encodedRegionNameAndPeerId));
+ long lastPushedSeqId = ZKUtil.parseWALPositionFrom(data);
+ Iterator<String> iter = Splitter.on('-').split(encodedRegionNameAndPeerId).iterator();
+ String encodedRegionName = level1Prefix + level2Prefix + iter.next();
+ String peerId = iter.next();
+ lastPushedSeqIds
+ .add(new ZkLastPushedSeqId(encodedRegionName, peerId, lastPushedSeqId));
+ }
+ return Collections.unmodifiableList(lastPushedSeqIds);
+ }
+ }
+ }
+ };
+ }
+
+ private String getHFileRefsPeerNode(String peerId) {
+ return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
+ }
+
+ /**
+ * Iterate over all hfile references, as pairs of (peer id, list of hfile reference names).
+ */
+ @SuppressWarnings("unchecked")
+ public MigrationIterator<Pair<String, List<String>>> listAllHFileRefs() throws KeeperException {
+ List<String> peerIds = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
+ if (peerIds == null || peerIds.isEmpty()) {
+ ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
+ return EMPTY_ITER;
+ }
+ Iterator<String> iter = peerIds.iterator();
+ return new MigrationIterator<Pair<String, List<String>>>() {
+
+ private String previousPeerId;
+
+ @Override
+ public Pair<String, List<String>> next() throws KeeperException {
+ if (previousPeerId != null) {
+ ZKUtil.deleteNodeRecursively(zookeeper, getHFileRefsPeerNode(previousPeerId));
+ }
+ if (!iter.hasNext()) {
+ ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
+ return null;
+ }
+ String peerId = iter.next();
+ List<String> refs = ZKUtil.listChildrenNoWatch(zookeeper, getHFileRefsPeerNode(peerId));
+ previousPeerId = peerId;
+ return Pair.newPair(peerId, refs != null ? refs : Collections.emptyList());
+ }
+ };
+ }
+
+ public boolean hasData() throws KeeperException {
+ return ZKUtil.checkExists(zookeeper, queuesZNode) != -1
+ || ZKUtil.checkExists(zookeeper, regionsZNode) != -1
+ || ZKUtil.checkExists(zookeeper, hfileRefsZNode) != -1;
+ }
+
+ public void deleteAllData() throws KeeperException {
+ ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
+ ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
+ ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ String getQueuesZNode() {
+ return queuesZNode;
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ String getHfileRefsZNode() {
+ return hfileRefsZNode;
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ String getRegionsZNode() {
+ return regionsZNode;
+ }
+}
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
new file mode 100644
index 00000000000..e38b7b134e9
--- /dev/null
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MD5Hash;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestZKReplicationQueueStorage {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
+
+ private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
+
+ private ZKWatcher zk;
+
+ private ZKReplicationQueueStorageForMigration storage;
+
+ @Rule
+ public final TestName name = new TestName();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws IOException {
+ UTIL.shutdownMiniZKCluster();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName());
+ zk = new ZKWatcher(conf, name.getMethodName(), null);
+ storage = new ZKReplicationQueueStorageForMigration(zk, conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode);
+ Closeables.close(zk, true);
+ }
+
+ public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers,
+ String peerId, ServerName deadServer) throws KeeperException {
+ ZKWatcher zk = storage.zookeeper;
+ for (int i = 0; i < nServers; i++) {
+ ServerName sn =
+ ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
+ String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
+ String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
+ ZKUtil.createWithParents(zk, peerZNode);
+ for (int j = 0; j < i; j++) {
+ String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j);
+ ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
+ }
+ String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
+ ZKUtil.createWithParents(zk, deadServerPeerZNode);
+ for (int j = 0; j < i; j++) {
+ String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j);
+ if (j > 0) {
+ ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
+ } else {
+ ZKUtil.createWithParents(zk, wal);
+ }
+ }
+ }
+ ZKUtil.createWithParents(zk,
+ ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
+ }
+
+ private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName,
+ String peerId) {
+ return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2),
+ encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
+ }
+
+ public static Map<String, Set<String>> mockLastPushedSeqIds(
+ ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions,
+ int emptyLevel1Count, int emptyLevel2Count) throws KeeperException {
+ ZKWatcher zk = storage.zookeeper;
+ Map<String, Set<String>> name2PeerIds = new HashMap<>();
+ byte[] bytes = new byte[32];
+ for (int i = 0; i < nRegions; i++) {
+ ThreadLocalRandom.current().nextBytes(bytes);
+ String encodeName = MD5Hash.getMD5AsHex(bytes);
+ String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1);
+ ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1));
+ String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2);
+ ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2));
+ name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
+ }
+ int addedEmptyZNodes = 0;
+ for (int i = 0; i < 256; i++) {
+ String level1ZNode =
+ ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
+ if (ZKUtil.checkExists(zk, level1ZNode) == -1) {
+ ZKUtil.createWithParents(zk, level1ZNode);
+ addedEmptyZNodes++;
+ if (addedEmptyZNodes <= emptyLevel2Count) {
+ ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab"));
+ }
+ if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) {
+ break;
+ }
+ }
+ }
+ return name2PeerIds;
+ }
+
+ public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers)
+ throws KeeperException {
+ ZKWatcher zk = storage.zookeeper;
+ for (int i = 0; i < nPeers; i++) {
+ String peerId = "peer_" + i;
+ ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId));
+ for (int j = 0; j < i; j++) {
+ ZKUtil.createWithParents(zk,
+ ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j));
+ }
+ }
+ }
+
+ @Test
+ public void testDeleteAllData() throws Exception {
+ assertFalse(storage.hasData()); // nothing mocked yet
+ ZKUtil.createWithParents(zk, storage.getQueuesZNode());
+ assertTrue(storage.hasData()); // the queues znode alone counts as migratable data
+ storage.deleteAllData();
+ assertFalse(storage.hasData());
+ }
+
+ @Test
+ public void testEmptyIter() throws Exception {
+ ZKUtil.createWithParents(zk, storage.getQueuesZNode());
+ ZKUtil.createWithParents(zk, storage.getRegionsZNode());
+ ZKUtil.createWithParents(zk, storage.getHfileRefsZNode());
+ assertNull(storage.listAllQueues().next()); // empty iterator ends immediately...
+ assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode())); // ...and the exhausted root znode is removed
+ assertNull(storage.listAllLastPushedSeqIds().next());
+ assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
+ assertNull(storage.listAllHFileRefs().next());
+ assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
+ }
+
+ @Test
+ public void testListAllQueues() throws Exception {
+ String peerId = "1";
+ ServerName deadServer =
+ ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
+ int nServers = 10;
+ mockQueuesData(storage, nServers, peerId, deadServer);
+ MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
+ storage.listAllQueues();
+ ServerName previousServerName = null;
+ for (int i = 0; i < nServers + 1; i++) { // nServers live servers plus one entry for the dead server
+ Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
+ assertNotNull(pair);
+ if (previousServerName != null) { // the iterator deletes a server znode once it has been consumed
+ assertEquals(-1, ZKUtil.checkExists(zk,
+ ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
+ }
+ ServerName sn = pair.getFirst();
+ previousServerName = sn;
+ if (sn.equals(deadServer)) {
+ assertThat(pair.getSecond(), empty()); // presumably the dead server carries no queue data — see mockQueuesData
+ } else {
+ assertEquals(2, pair.getSecond().size()); // each live server has two queues (one recovered, one not — asserted below)
+ int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname()))); // hostname suffix encodes expected wal count
+ ZkReplicationQueueData data0 = pair.getSecond().get(0);
+ assertEquals(peerId, data0.getQueueId().getPeerId());
+ assertEquals(sn, data0.getQueueId().getServerName());
+ assertEquals(n, data0.getWalOffsets().size());
+ for (int j = 0; j < n; j++) {
+ assertEquals(j,
+ data0.getWalOffsets().get(
+ (data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
+ .intValue()); // wal name prefix depends on whether the queue was recovered from the dead server
+ }
+ ZkReplicationQueueData data1 = pair.getSecond().get(1);
+ assertEquals(peerId, data1.getQueueId().getPeerId());
+ assertEquals(sn, data1.getQueueId().getServerName());
+ assertEquals(n, data1.getWalOffsets().size());
+ for (int j = 0; j < n; j++) {
+ assertEquals(j,
+ data1.getWalOffsets().get(
+ (data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
+ .intValue());
+ }
+ // the order of the returned result is undetermined
+ if (data0.getQueueId().getSourceServerName().isPresent()) { // exactly one of the two queues is the recovered one
+ assertEquals(deadServer, data0.getQueueId().getSourceServerName().get());
+ assertFalse(data1.getQueueId().getSourceServerName().isPresent());
+ } else {
+ assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
+ }
+ }
+ }
+ assertNull(iter.next()); // exhausted
+ assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode())); // whole queues znode is removed after the scan
+ }
+
+ @Test
+ public void testListAllLastPushedSeqIds() throws Exception {
+ String peerId1 = "1";
+ String peerId2 = "2";
+ Map<String, Set<String>> name2PeerIds =
+ mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10);
+ MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
+ int emptyListCount = 0;
+ for (;;) {
+ List<ZkLastPushedSeqId> list = iter.next();
+ if (list == null) {
+ break; // iterator exhausted
+ }
+ if (list.isEmpty()) { // presumably one empty batch per empty level-1 znode — see mockLastPushedSeqIds
+ emptyListCount++;
+ continue;
+ }
+ for (ZkLastPushedSeqId seqId : list) {
+ name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId()); // mark this (region, peer) pair as migrated
+ if (seqId.getPeerId().equals(peerId1)) {
+ assertEquals(1, seqId.getLastPushedSeqId()); // value mocked for peerId1
+ } else {
+ assertEquals(2, seqId.getLastPushedSeqId()); // value mocked for peerId2
+ }
+ }
+ }
+ assertEquals(10, emptyListCount); // matches the empty znode counts passed to the mock
+ name2PeerIds.forEach((encodedRegionName, peerIds) -> {
+ assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
+ });
+ assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode())); // regions znode removed after the full scan
+ }
+
+ @Test
+ public void testListAllHFileRefs() throws Exception {
+ int nPeers = 10;
+ mockHFileRefs(storage, nPeers);
+ MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs();
+ String previousPeerId = null;
+ for (int i = 0; i < nPeers; i++) {
+ Pair<String, List<String>> pair = iter.next();
+ if (previousPeerId != null) { // consumed peer znodes are deleted as the iterator advances
+ assertEquals(-1, ZKUtil.checkExists(zk,
+ ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId)));
+ }
+ String peerId = pair.getFirst();
+ previousPeerId = peerId;
+ int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId))); // "peer_i" -> i
+ assertEquals(index, pair.getSecond().size()); // mockHFileRefs gave peer_i exactly i refs
+ }
+ assertNull(iter.next());
+ assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
+ }
+}
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 0dba4aa9833..b61b0252a05 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -102,6 +102,12 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-replication</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-replication</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-balancer</artifactId>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 118457648de..67d0f889d64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
+import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkToTableProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
@@ -221,6 +222,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
@@ -1050,6 +1052,17 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
this.balancer.initialize();
this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
+ // try migrate replication data
+ ZKReplicationQueueStorageForMigration oldReplicationQueueStorage =
+ new ZKReplicationQueueStorageForMigration(zooKeeper, conf);
+ // check whether there are something to migrate and we haven't scheduled a migration procedure
+ // yet
+ if (
+ oldReplicationQueueStorage.hasData() && procedureExecutor.getProcedures().stream()
+ .allMatch(p -> !(p instanceof MigrateReplicationQueueFromZkToTableProcedure))
+ ) {
+ procedureExecutor.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure());
+ }
// start up all service threads.
startupTaskGroup.addTask("Initializing master service threads");
startServiceThreads();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 487c45e5c5c..97976756d82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure;
+import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkToTableProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called
@@ -266,6 +268,16 @@ public class ServerCrashProcedure extends
}
break;
case SERVER_CRASH_CLAIM_REPLICATION_QUEUES:
+ if (
+ env.getMasterServices().getProcedures().stream()
+ .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
+ .anyMatch(p -> !p.isFinished())
+ ) {
+ LOG.info("There is a pending {}, will retry claim replication queue later",
+ MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName());
+ suspend(10_000, true);
+ return Flow.NO_MORE_STATE;
+ }
addChildProcedure(new AssignReplicationQueuesProcedure(serverName));
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break;
@@ -431,6 +443,13 @@ public class ServerCrashProcedure extends
env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName());
}
+ @Override
+ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+ setState(ProcedureProtos.ProcedureState.RUNNABLE); // the timeout here is a retry backoff, not a failure
+ env.getProcedureScheduler().addFront(this); // re-queue this procedure for execution
+ return false; // false: do not abort the procedure on timeout
+ }
+
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getProcName());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java
index 660f9968573..1f0a89f2076 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java
@@ -98,10 +98,7 @@ public abstract class AbstractPeerNoLockProcedure<TState>
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
backoffConsumer.accept(backoff);
- setTimeout(Math.toIntExact(backoff));
- setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
- skipPersistence();
- throw new ProcedureSuspendedException();
+ throw suspend(Math.toIntExact(backoff), false);
}
protected final void resetRetry() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
new file mode 100644
index 00000000000..536f232338e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * A procedure for migrating replication queue data from zookeeper to hbase:replication table.
+ */
+@InterfaceAudience.Private
+public class MigrateReplicationQueueFromZkToTableProcedure
+ extends StateMachineProcedure<MasterProcedureEnv, MigrateReplicationQueueFromZkToTableState>
+ implements GlobalProcedureInterface {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MigrateReplicationQueueFromZkToTableProcedure.class);
+
+ private static final int MIN_MAJOR_VERSION = 3; // all online RS must reach this major version before peers are re-enabled
+
+ private List<String> disabledPeerIds; // peers we disabled and must re-enable at the end; persisted in state data
+
+ private List<Future<?>> futures; // in-flight migration tasks; transient, rebuilt after a master restart
+
+ private ExecutorService executor; // lazily created pool for the migration tasks; transient
+
+ @Override
+ public String getGlobalId() {
+ return getClass().getSimpleName(); // only one instance of this procedure may run at a time
+ }
+
+ private ExecutorService getExecutorService() { // lazily create a 3-thread daemon pool, one thread per migration task
+ if (executor == null) {
+ executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+ .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
+ }
+ return executor;
+ }
+
+ private void shutdownExecutorService() {
+ if (executor != null) {
+ executor.shutdown();
+ executor = null;
+ }
+ }
+
+ private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException { // suspend-and-retry until no peer procedure is pending
+ long peerProcCount;
+ try {
+ peerProcCount = env.getMasterServices().getProcedures().stream()
+ .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
+ } catch (IOException e) {
+ LOG.warn("failed to check peer procedure status", e);
+ throw suspend(5000, true);
+ }
+ if (peerProcCount > 0) {
+ LOG.info("There are still {} pending peer procedures, will sleep and check later",
+ peerProcCount);
+ throw suspend(10_000, true);
+ }
+ LOG.info("No pending peer procedures found, continue...");
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env,
+ MigrateReplicationQueueFromZkToTableState state)
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ switch (state) {
+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
+ waitUntilNoPeerProcedure(env);
+ List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
+ if (peers.isEmpty()) {
+ LOG.info("No active replication peer found, delete old replication queue data and quit");
+ ZKReplicationQueueStorageForMigration oldStorage =
+ new ZKReplicationQueueStorageForMigration(env.getMasterServices().getZooKeeper(),
+ env.getMasterConfiguration());
+ try {
+ oldStorage.deleteAllData();
+ } catch (KeeperException e) {
+ LOG.warn("failed to delete old replication queue data, sleep and retry later", e);
+ suspend(10_000, true); // NOTE(review): sibling branches use 'throw suspend(...)'; the returned exception is dropped here and we fall through to NO_MORE_STATE — confirm the executor still retries the delete after the timeout
+ }
+ return Flow.NO_MORE_STATE;
+ }
+ // here we do not care the peers which have already been disabled, as later we do not need
+ // to enable them
+ disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
+ .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
+ setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
+ return Flow.HAS_MORE_STATE;
+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
+ for (String peerId : disabledPeerIds) { // stop replication while the queue data is being moved
+ addChildProcedure(new DisablePeerProcedure(peerId));
+ }
+ setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
+ return Flow.HAS_MORE_STATE;
+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
+ if (futures != null) {
+ // wait until all futures done
+ long notDone = futures.stream().filter(f -> !f.isDone()).count();
+ if (notDone == 0) {
+ boolean succ = true;
+ for (Future<?> future : futures) {
+ try {
+ future.get(); // surfaces any exception thrown by the migration task
+ } catch (Exception e) {
+ succ = false;
+ LOG.warn("Failed to migrate", e);
+ }
+ }
+ if (succ) {
+ shutdownExecutorService();
+ setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
+ return Flow.HAS_MORE_STATE;
+ }
+ // reschedule to retry migration again
+ futures = null;
+ } else {
+ LOG.info("There are still {} pending migration tasks, will sleep and check later", notDone);
+ throw suspend(10_000, true);
+ }
+ }
+ try {
+ futures = env.getReplicationPeerManager()
+ .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
+ } catch (IOException e) {
+ LOG.warn("failed to submit migration tasks", e);
+ throw suspend(10_000, true);
+ }
+ throw suspend(10_000, true); // tasks submitted; come back later to check them
+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
+ long rsWithLowerVersion =
+ env.getMasterServices().getServerManager().getOnlineServers().values().stream()
+ .filter(sm -> VersionInfo.getMajorVersion(sm.getVersion()) < MIN_MAJOR_VERSION).count();
+ if (rsWithLowerVersion == 0) {
+ setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
+ return Flow.HAS_MORE_STATE;
+ } else {
+ LOG.info("There are still {} region servers which have a major version less than {}, "
+ + "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION);
+ throw suspend(10_000, true);
+ }
+ case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
+ for (String peerId : disabledPeerIds) { // restore only the peers we disabled ourselves
+ addChildProcedure(new EnablePeerProcedure(peerId));
+ }
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ }
+
+ @Override
+ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+ setState(ProcedureProtos.ProcedureState.RUNNABLE); // the timeout here is a retry backoff, not a failure
+ env.getProcedureScheduler().addFront(this); // re-queue for execution
+ return false; // false: do not abort the procedure on timeout
+ }
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env,
+ MigrateReplicationQueueFromZkToTableState state) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException(); // the migration is not rollbackable
+ }
+
+ @Override
+ protected MigrateReplicationQueueFromZkToTableState getState(int stateId) {
+ return MigrateReplicationQueueFromZkToTableState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected MigrateReplicationQueueFromZkToTableState getInitialState() {
+ return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.serializeStateData(serializer);
+ MigrateReplicationQueueFromZkToTableStateData.Builder builder =
+ MigrateReplicationQueueFromZkToTableStateData.newBuilder();
+ if (disabledPeerIds != null) { // persist so ENABLE_PEER still works after a master restart
+ builder.addAllDisabledPeerId(disabledPeerIds);
+ }
+ serializer.serialize(builder.build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.deserializeStateData(serializer);
+ MigrateReplicationQueueFromZkToTableStateData data =
+ serializer.deserialize(MigrateReplicationQueueFromZkToTableStateData.class);
+ disabledPeerIds = data.getDisabledPeerIdList().stream().collect(Collectors.toList());
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 78b97620c01..c358ec164e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.io.InterruptedIOException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -152,12 +154,36 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
}
+ private boolean shouldFailForMigrating(MasterProcedureEnv env) throws IOException { // whether a pending queue migration must abort this peer modification
+ long parentProcId = getParentProcId();
+ if (
+ parentProcId != Procedure.NO_PROC_ID && env.getMasterServices().getMasterProcedureExecutor()
+ .getProcedure(parentProcId) instanceof MigrateReplicationQueueFromZkToTableProcedure
+ ) {
+ // this is scheduled by MigrateReplicationQueueFromZkToTableProcedure, should not fail it
+ return false;
+ }
+ return env.getMasterServices().getProcedures().stream()
+ .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
+ .anyMatch(p -> !p.isFinished()); // any unfinished migration procedure blocks peer modification
+ }
+
@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, InterruptedException {
switch (state) {
case PRE_PEER_MODIFICATION:
try {
+ if (shouldFailForMigrating(env)) {
+ LOG.info("There is a pending {}, give up execution of {}",
+ MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
+ getClass().getName());
+ setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer",
+ new DoNotRetryIOException("There is a pending "
+ + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
+ releaseLatch(env);
+ return Flow.NO_MORE_STATE;
+ }
prePeerModification(env);
} catch (IOException e) {
LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, "
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 0a1dbf848bd..81f569c3f9e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -21,14 +21,18 @@ import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -39,6 +43,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterServices;
@@ -49,17 +54,24 @@ import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -106,7 +118,7 @@ public class ReplicationPeerManager {
private final Configuration conf;
@FunctionalInterface
- private interface ReplicationQueueStorageInitializer {
+ interface ReplicationQueueStorageInitializer {
void initialize() throws IOException;
}
@@ -138,6 +150,10 @@ public class ReplicationPeerManager {
}
}
+ private void initializeQueueStorage() throws IOException {
+ queueStorageInitializer.initialize();
+ }
+
void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException {
if (peerId.contains("-")) {
@@ -152,7 +168,7 @@ public class ReplicationPeerManager {
}
// lazy create table
- queueStorageInitializer.initialize();
+ initializeQueueStorage();
// make sure that there is no queues with the same peer id. This may happen when we create a
// peer with the same id with a old deleted peer. If the replication queues for the old peer
// have not been cleaned up yet then we should not create the new peer, otherwise the old wal
@@ -699,4 +715,88 @@ public class ReplicationPeerManager {
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return replicationLogCleanerBarrier;
}
+
+ private ReplicationQueueData convert(ZkReplicationQueueData zkData) { // folds old per-wal offsets into one offset per wal group
+ Map<String, ReplicationGroupOffset> groupOffsets = new HashMap<>();
+ zkData.getWalOffsets().forEach((wal, offset) -> {
+ String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+ groupOffsets.compute(walGroup, (k, oldOffset) -> {
+ if (oldOffset == null) {
+ return new ReplicationGroupOffset(wal, offset);
+ }
+ // we should record the first wal's offset
+ long oldWalTs = AbstractFSWALProvider.getTimestamp(oldOffset.getWal());
+ long walTs = AbstractFSWALProvider.getTimestamp(wal);
+ if (walTs < oldWalTs) { // keep the offset of the oldest wal in the group
+ return new ReplicationGroupOffset(wal, offset);
+ }
+ return oldOffset;
+ });
+ });
+ return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(groupOffsets));
+ }
+
+ private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage)
+ throws Exception { // drains the old zk queue data into the hbase:replication table
+ MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
+ oldQueueStorage.listAllQueues();
+ for (;;) {
+ Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
+ if (pair == null) {
+ return; // all servers consumed
+ }
+ queueStorage.batchUpdateQueues(pair.getFirst(),
+ pair.getSecond().stream().filter(data -> peers.containsKey(data.getQueueId().getPeerId())) // skip queues of removed peers
+ .map(this::convert).collect(Collectors.toList()));
+ }
+ }
+
+ private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage)
+ throws Exception { // copies last pushed sequence ids from zk into the hbase:replication table
+ MigrationIterator<List<ZkLastPushedSeqId>> iter = oldQueueStorage.listAllLastPushedSeqIds();
+ for (;;) {
+ List<ZkLastPushedSeqId> list = iter.next();
+ if (list == null) {
+ return; // iterator exhausted
+ }
+ queueStorage.batchUpdateLastSequenceIds(list.stream()
+ .filter(data -> peers.containsKey(data.getPeerId())).collect(Collectors.toList())); // skip entries of removed peers
+ }
+ }
+
+ private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage)
+ throws Exception { // copies hfile references from zk into the hbase:replication table
+ MigrationIterator<Pair<String, List<String>>> iter = oldQueueStorage.listAllHFileRefs();
+ for (;;) {
+ Pair<String, List<String>> pair = iter.next();
+ if (pair == null) {
+ return; // iterator exhausted
+ }
+ if (peers.containsKey(pair.getFirst())) { // skip refs belonging to removed peers
+ queueStorage.batchUpdateHFileRefs(pair.getFirst(), pair.getSecond());
+ }
+ }
+ }
+
+ /**
+ * Submit the migration tasks to the given {@code executor} and return the futures.
+ */
+ List<Future<?>> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor)
+ throws IOException {
+ // the replication queue table creation is asynchronous and will be triggered by addPeer, so
+ // here we need to manually initialize it since we will not call addPeer.
+ initializeQueueStorage();
+ ZKReplicationQueueStorageForMigration oldStorage =
+ new ZKReplicationQueueStorageForMigration(zookeeper, conf);
+ return Arrays.asList(executor.submit(() -> { // task 1: replication queues
+ migrateQueues(oldStorage);
+ return null;
+ }), executor.submit(() -> { // task 2: last pushed sequence ids
+ migrateLastPushedSeqIds(oldStorage);
+ return null;
+ }), executor.submit(() -> { // task 3: hfile refs
+ migrateHFileRefs(oldStorage);
+ return null;
+ }));
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 2de10cb2778..89658903538 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure.Flow;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -236,6 +237,19 @@ public class TransitPeerSyncReplicationStateProcedure
switch (state) {
case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
try {
+ if (
+ env.getMasterServices().getProcedures().stream()
+ .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
+ .anyMatch(p -> !p.isFinished())
+ ) {
+ LOG.info("There is a pending {}, give up execution of {}",
+ MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
+ getClass().getSimpleName());
+ setFailure("master-transit-peer-sync-replication-state",
+ new DoNotRetryIOException("There is a pending "
+ + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
+ return Flow.NO_MORE_STATE;
+ }
preTransit(env);
} catch (IOException e) {
LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} "
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java
new file mode 100644
index 00000000000..1b0f727a072
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+@Category({ MasterTests.class, LargeTests.class })
+public class TestMigrateReplicationQueue extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMigrateReplicationQueue.class);
+
+ private int disableAndInsert() throws Exception {
+ UTIL1.getAdmin().disableReplicationPeer(PEER_ID2);
+ return UTIL1.loadTable(htable1, famName);
+ }
+
+ private String getQueuesZNode() throws IOException {
+ Configuration conf = UTIL1.getConfiguration();
+ ZKWatcher zk = UTIL1.getZooKeeperWatcher();
+ String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
+ conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE,
+ ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT));
+ return ZNodePaths.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
+ }
+
+ private void mockData() throws Exception {
+ // delete the replication queue table to simulate upgrading from an older version of hbase
+ TableName replicationQueueTableName = TableName
+ .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
+ ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
+ List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
+ .getReplicationPeerManager().getQueueStorage().listAllQueues();
+ assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size(), queueDatas.size());
+ UTIL1.getAdmin().disableTable(replicationQueueTableName);
+ UTIL1.getAdmin().deleteTable(replicationQueueTableName);
+ // shutdown the hbase cluster
+ UTIL1.shutdownMiniHBaseCluster();
+ ZKWatcher zk = UTIL1.getZooKeeperWatcher();
+ String queuesZNode = getQueuesZNode();
+ for (ReplicationQueueData queueData : queueDatas) {
+ String replicatorZNode =
+ ZNodePaths.joinZNode(queuesZNode, queueData.getId().getServerName().toString());
+ String queueZNode = ZNodePaths.joinZNode(replicatorZNode, queueData.getId().getPeerId());
+ assertEquals(1, queueData.getOffsets().size());
+ ReplicationGroupOffset offset = Iterables.getOnlyElement(queueData.getOffsets().values());
+ String walZNode = ZNodePaths.joinZNode(queueZNode, offset.getWal());
+ ZKUtil.createSetData(zk, walZNode, ZKUtil.positionToByteArray(offset.getOffset()));
+ }
+ }
+
+ @Test
+ public void testMigrate() throws Exception {
+ int count = disableAndInsert();
+ mockData();
+ restartSourceCluster(1);
+ UTIL1.waitFor(60000,
+ () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
+ .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure).findAny()
+ .map(Procedure::isSuccess).orElse(false));
+ TableName replicationQueueTableName = TableName
+ .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
+ ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
+ assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName));
+ ZKWatcher zk = UTIL1.getZooKeeperWatcher();
+ assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode()));
+ // wait until SCP finishes, which means we can finish the claim queue operation
+ UTIL1.waitFor(60000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
+ .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
+ List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
+ .getReplicationPeerManager().getQueueStorage().listAllQueues();
+ assertEquals(1, queueDatas.size());
+ // should have 1 recovered queue, as we haven't replicated anything out so there is no queue
+ // data for the new alive region server
+ assertTrue(queueDatas.get(0).getId().isRecovered());
+ assertEquals(1, queueDatas.get(0).getOffsets().size());
+ // the peer is still disabled, so no data has been replicated
+ assertFalse(UTIL1.getAdmin().isReplicationPeerEnabled(PEER_ID2));
+ assertEquals(0, HBaseTestingUtil.countRows(htable2));
+ // enable peer, and make sure the replication can continue correctly
+ UTIL1.getAdmin().enableReplicationPeer(PEER_ID2);
+ waitForReplication(count, 100);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
new file mode 100644
index 00000000000..752abc380b8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionServerList;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestMigrateReplicationQueueFromZkToTableProcedure {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedure.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ public static final class HMasterForTest extends HMaster {
+
+ public HMasterForTest(Configuration conf) throws IOException {
+ super(conf);
+ }
+
+ @Override
+ protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
+ throws IOException {
+ setupClusterConnection();
+ return new ServerManagerForTest(master, storage);
+ }
+ }
+
+ private static final ConcurrentMap<ServerName, ServerMetrics> EXTRA_REGION_SERVERS =
+ new ConcurrentHashMap<>();
+
+ public static final class ServerManagerForTest extends ServerManager {
+
+ public ServerManagerForTest(MasterServices master, RegionServerList storage) {
+ super(master, storage);
+ }
+
+ @Override
+ public Map<ServerName, ServerMetrics> getOnlineServers() {
+ Map<ServerName, ServerMetrics> map = new HashMap<>(super.getOnlineServers());
+ map.putAll(EXTRA_REGION_SERVERS);
+ return map;
+ }
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ UTIL.startMiniCluster(
+ StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+ return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Admin admin = UTIL.getAdmin();
+ for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
+ admin.removeReplicationPeer(pd.getPeerId());
+ }
+ }
+
+ private static CountDownLatch PEER_PROC_ARRIVE;
+
+ private static CountDownLatch PEER_PROC_RESUME;
+
+ public static final class FakePeerProcedure extends Procedure<MasterProcedureEnv>
+ implements PeerProcedureInterface {
+
+ private String peerId;
+
+ public FakePeerProcedure() {
+ }
+
+ public FakePeerProcedure(String peerId) {
+ this.peerId = peerId;
+ }
+
+ @Override
+ public String getPeerId() {
+ return peerId;
+ }
+
+ @Override
+ public PeerOperationType getPeerOperationType() {
+ return PeerOperationType.UPDATE_CONFIG;
+ }
+
+ @Override
+ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ PEER_PROC_ARRIVE.countDown();
+ PEER_PROC_RESUME.await();
+ return null;
+ }
+
+ @Override
+ protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean abort(MasterProcedureEnv env) {
+ return false;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ }
+ }
+
+ @Test
+ public void testWaitUntilNoPeerProcedure() throws Exception {
+ PEER_PROC_ARRIVE = new CountDownLatch(1);
+ PEER_PROC_RESUME = new CountDownLatch(1);
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ procExec.submitProcedure(new FakePeerProcedure("1"));
+ PEER_PROC_ARRIVE.await();
+ MigrateReplicationQueueFromZkToTableProcedure proc =
+ new MigrateReplicationQueueFromZkToTableProcedure();
+ procExec.submitProcedure(proc);
+ // make sure we will wait until there is no peer related procedures before proceeding
+ UTIL.waitFor(30000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+ // continue and make sure we can finish successfully
+ PEER_PROC_RESUME.countDown();
+ UTIL.waitFor(30000, () -> proc.isSuccess());
+ }
+
+ @Test
+ public void testDisablePeerAndWaitUpgrading() throws Exception {
+ String peerId = "2";
+ ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
+ .setReplicateAllUserTables(true).build();
+ UTIL.getAdmin().addReplicationPeer(peerId, rpc);
+ // put a fake region server to simulate that there are still region servers with an older version
+ ServerMetrics metrics = mock(ServerMetrics.class);
+ when(metrics.getVersion()).thenReturn("2.5.0");
+ EXTRA_REGION_SERVERS
+ .put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);
+
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+
+ MigrateReplicationQueueFromZkToTableProcedure proc =
+ new MigrateReplicationQueueFromZkToTableProcedure();
+ procExec.submitProcedure(proc);
+ // wait until we reach the wait upgrading state
+ UTIL.waitFor(30000,
+ () -> proc.getCurrentStateId()
+ == MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber()
+ && proc.getState() == ProcedureState.WAITING_TIMEOUT);
+ // make sure the peer is disabled for migrating
+ assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
+
+ // the procedure should finish successfully
+ EXTRA_REGION_SERVERS.clear();
+ UTIL.waitFor(30000, () -> proc.isSuccess());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedureRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedureRecovery.java
new file mode 100644
index 00000000000..8d1a975400f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedureRecovery.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestMigrateReplicationQueueFromZkToTableProcedureRecovery {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedureRecovery.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+ UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+ return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ }
+
+ @Before
+ public void setup() throws Exception {
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
+ }
+
+ private String getHFileRefsZNode() throws IOException {
+ Configuration conf = UTIL.getConfiguration();
+ ZKWatcher zk = UTIL.getZooKeeperWatcher();
+ String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
+ conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE,
+ ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT));
+ return ZNodePaths.joinZNode(replicationZNode,
+ conf.get(ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+ ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT));
+ }
+
+ @Test
+ public void testRecoveryAndDoubleExecution() throws Exception {
+ String peerId = "2";
+ ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
+ .setReplicateAllUserTables(true).build();
+ UTIL.getAdmin().addReplicationPeer(peerId, rpc);
+
+ // here we only test a simple migration, more complicated migration will be tested in other UTs,
+ // such as TestMigrateReplicationQueue and TestReplicationPeerManagerMigrateQueuesFromZk
+ String hfileRefsZNode = getHFileRefsZNode();
+ String hfile = "hfile";
+ String hfileZNode = ZNodePaths.joinZNode(hfileRefsZNode, peerId, hfile);
+ ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), hfileZNode);
+
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+
+ ProcedureTestingUtility.waitNoProcedureRunning(procExec);
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+ // Start the migration procedure && kill the executor
+ long procId = procExec.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure());
+ // Restart the executor and execute the step twice
+ MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
+ // Validate the migration result
+ ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
+ ReplicationQueueStorage queueStorage =
+ UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+ List<String> hfiles = queueStorage.getReplicableHFiles(peerId);
+ assertThat(hfiles, Matchers.<List<String>> both(hasItem(hfile)).and(hasSize(1)));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
new file mode 100644
index 00000000000..73915e856ea
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager.ReplicationQueueStorageInitializer;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.TestZKReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReplicationPeerManagerMigrateQueuesFromZk {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationPeerManagerMigrateQueuesFromZk.class);
+
+ private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ private static ExecutorService EXECUTOR;
+
+ ConcurrentMap<String, ReplicationPeerDescription> peers;
+
+ private ReplicationPeerStorage peerStorage;
+
+ private ReplicationQueueStorage queueStorage;
+
+ private ReplicationQueueStorageInitializer queueStorageInitializer;
+
+ private ReplicationPeerManager manager;
+
+ private int nServers = 10;
+
+ private int nPeers = 10;
+
+ private int nRegions = 100;
+
+ private ServerName deadServerName;
+
+ @Rule
+ public final TableNameTestRule tableNameRule = new TableNameTestRule();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.startMiniCluster(1);
+ EXECUTOR = Executors.newFixedThreadPool(3,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat(TestReplicationPeerManagerMigrateQueuesFromZk.class.getSimpleName() + "-%d")
+ .build());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ EXECUTOR.shutdownNow();
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ Configuration conf = UTIL.getConfiguration();
+ peerStorage = mock(ReplicationPeerStorage.class);
+ TableName tableName = tableNameRule.getTableName();
+ UTIL.getAdmin()
+ .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
+ queueStorage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName);
+ queueStorageInitializer = mock(ReplicationQueueStorageInitializer.class);
+ peers = new ConcurrentHashMap<>();
+ deadServerName =
+ ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
+ manager = new ReplicationPeerManager(peerStorage, queueStorage, peers, conf, "cluster",
+ queueStorageInitializer);
+ }
+
+ private Map<String, Set<String>> prepareData() throws Exception {
+ ZKReplicationQueueStorageForMigration storage = new ZKReplicationQueueStorageForMigration(
+ UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
+ TestZKReplicationQueueStorage.mockQueuesData(storage, 10, "peer_0", deadServerName);
+ Map<String, Set<String>> encodedName2PeerIds = TestZKReplicationQueueStorage
+ .mockLastPushedSeqIds(storage, "peer_1", "peer_2", nRegions, 10, 10);
+ TestZKReplicationQueueStorage.mockHFileRefs(storage, 10);
+ return encodedName2PeerIds;
+ }
+
+ @Test
+ public void testNoPeers() throws Exception {
+ prepareData();
+ for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
+ future.get(1, TimeUnit.MINUTES);
+ }
+ // should have called initializer
+ verify(queueStorageInitializer).initialize();
+ // should not have migrated any data since there is no peer
+ try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName())) {
+ assertEquals(0, HBaseTestingUtil.countRows(table));
+ }
+ }
+
+ @Test
+ public void testMigrate() throws Exception {
+ Map<String, Set<String>> encodedName2PeerIds = prepareData();
+ // add all peers so we will migrate them all
+ for (int i = 0; i < nPeers; i++) {
+ // value is not used in this test, so just add a mock
+ peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
+ }
+ for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
+ future.get(1, TimeUnit.MINUTES);
+ }
+ // should have called initializer
+ verify(queueStorageInitializer).initialize();
+ List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();
+ // there should be two empty queues so minus 2
+ assertEquals(2 * nServers - 2, queueDatas.size());
+ for (ReplicationQueueData queueData : queueDatas) {
+ assertEquals("peer_0", queueData.getId().getPeerId());
+ assertEquals(1, queueData.getOffsets().size());
+ String walGroup = queueData.getId().getServerWALsBelongTo().toString();
+ ReplicationGroupOffset offset = queueData.getOffsets().get(walGroup);
+ assertEquals(0, offset.getOffset());
+ assertEquals(queueData.getId().getServerWALsBelongTo().toString() + ".0", offset.getWal());
+ }
+ // there is no method in ReplicationQueueStorage that can list all the last pushed sequence ids
+ try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName());
+ ResultScanner scanner =
+ table.getScanner(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY)) {
+ for (int i = 0; i < 2; i++) {
+ Result result = scanner.next();
+ String peerId = Bytes.toString(result.getRow());
+ assertEquals(nRegions, result.size());
+ for (Cell cell : result.rawCells()) {
+ String encodedRegionName = Bytes.toString(cell.getQualifierArray(),
+ cell.getQualifierOffset(), cell.getQualifierLength());
+ encodedName2PeerIds.get(encodedRegionName).remove(peerId);
+ long seqId =
+ Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ assertEquals(i + 1, seqId);
+ }
+ }
+ encodedName2PeerIds.forEach((encodedRegionName, peerIds) -> {
+ assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
+ });
+ assertNull(scanner.next());
+ }
+ for (int i = 0; i < nPeers; i++) {
+ List<String> refs = queueStorage.getReplicableHFiles("peer_" + i);
+ assertEquals(i, refs.size());
+ Set<String> refsSet = new HashSet<>(refs);
+ for (int j = 0; j < i; j++) {
+ assertTrue(refsSet.remove("hfile-" + j));
+ }
+ assertThat(refsSet, empty());
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index b6157ac0f18..27477527277 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -216,7 +216,7 @@ public class TestReplicationBase {
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
}
- static void restartSourceCluster(int numSlaves) throws Exception {
+ protected static void restartSourceCluster(int numSlaves) throws Exception {
Closeables.close(hbaseAdmin, true);
Closeables.close(htable1, true);
UTIL1.shutdownMiniHBaseCluster();
diff --git a/pom.xml b/pom.xml
index fd8312e3f28..2c3043ea436 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1028,13 +1028,18 @@
<artifactId>hbase-hadoop-compat</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-replication</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-replication</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-balancer</artifactId>