You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/07 06:47:05 UTC
[iotdb] branch master updated: [IOTDB-4857] Fix the problem when remove-datanode for ratis 1 replica (#7917)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1b1c57b6d8 [IOTDB-4857] Fix the problem when remove-datanode for ratis 1 replica (#7917)
1b1c57b6d8 is described below
commit 1b1c57b6d8160b2b3e44a6d55abd845e18e96dc8
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Nov 7 14:47:00 2022 +0800
[IOTDB-4857] Fix the problem when remove-datanode for ratis 1 replica (#7917)
---
.../confignode/persistence/node/NodeInfo.java | 5 +-
.../persistence/partition/PartitionInfo.java | 9 +--
.../partition/StorageGroupPartitionTable.java | 19 ++++--
.../procedure/env/DataNodeRemoveHandler.java | 74 ++++++++++------------
.../impl/node/RemoveDataNodeProcedure.java | 9 ++-
.../impl/statemachine/RegionMigrateProcedure.java | 9 +--
.../iotdb/db/service/RegionMigrateService.java | 31 ++++++---
7 files changed, 94 insertions(+), 62 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index d03fffa7f9..2526abbad6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -168,13 +168,14 @@ public class NodeInfo implements SnapshotProcessor {
"{}, There are {} data node in cluster before executed remove-datanode.sh",
REMOVE_DATANODE_PROCESS,
registeredDataNodes.size());
+
+ dataNodeInfoReadWriteLock.writeLock().lock();
try {
- dataNodeInfoReadWriteLock.writeLock().lock();
req.getDataNodeLocations()
.forEach(
removeDataNodes -> {
registeredDataNodes.remove(removeDataNodes.getDataNodeId());
- LOGGER.info("removed the datanode {} from cluster", removeDataNodes);
+ LOGGER.info("Removed the datanode {} from cluster", removeDataNodes);
});
} finally {
dataNodeInfoReadWriteLock.writeLock().unlock();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 812393b0a2..84115faae2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -470,7 +470,7 @@ public class PartitionInfo implements SnapshotProcessor {
}
/**
- * update a region location
+ * Update the location info of given regionId
*
* @param req UpdateRegionLocationReq
* @return TSStatus
@@ -480,9 +480,10 @@ public class PartitionInfo implements SnapshotProcessor {
TConsensusGroupId regionId = req.getRegionId();
TDataNodeLocation oldNode = req.getOldNode();
TDataNodeLocation newNode = req.getNewNode();
- storageGroupPartitionTables
- .values()
- .forEach(s -> s.updateRegionLocation(regionId, oldNode, newNode));
+ storageGroupPartitionTables.values().stream()
+ .filter(sgPartitionTable -> sgPartitionTable.containRegion(regionId))
+ .forEach(
+ sgPartitionTable -> sgPartitionTable.updateRegionLocation(regionId, oldNode, newNode));
return status;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 5ffd0881f5..b763fded02 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -432,11 +432,18 @@ public class StorageGroupPartitionTable {
private void addRegionNewLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
RegionGroup regionGroup = regionGroupMap.get(regionId);
if (regionGroup == null) {
- LOGGER.warn("not find Region Group for region {}", regionId);
+ LOGGER.warn(
+ "Cannot find RegionGroup for region {} when addRegionNewLocation in {}",
+ regionId,
+ storageGroupName);
return;
}
if (regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
- LOGGER.info("Node is already in region locations, node: {}, region: {}", node, regionId);
+ LOGGER.info(
+ "Node is already in region locations when addRegionNewLocation in {}, node: {}, region: {}",
+ storageGroupName,
+ node,
+ regionId);
return;
}
regionGroup.getReplicaSet().getDataNodeLocations().add(node);
@@ -445,12 +452,16 @@ public class StorageGroupPartitionTable {
private void removeRegionOldLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
RegionGroup regionGroup = regionGroupMap.get(regionId);
if (regionGroup == null) {
- LOGGER.warn("not find Region Group for region {}", regionId);
+ LOGGER.warn(
+ "Cannot find RegionGroup for region {} when removeRegionOldLocation in {}",
+ regionId,
+ storageGroupName);
return;
}
if (!regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
LOGGER.info(
- "Node is Not in region locations, no need to remove it, node: {}, region: {}",
+ "Node is not in region locations when removeRegionOldLocation in {}, no need to remove it, node: {}, region: {}",
+ storageGroupName,
node,
regionId);
return;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index b4d4394dba..2e29166188 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -138,11 +138,11 @@ public class DataNodeRemoveHandler {
*/
public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
TSStatus status;
- List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+ List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
if (regionReplicaNodes.isEmpty()) {
- LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+ LOGGER.warn("Cannot find region replica nodes, region: {}", regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("Not find region replica nodes, region: " + regionId);
+ status.setMessage("Cannot find region replica nodes, region: " + regionId);
return null;
}
@@ -167,10 +167,10 @@ public class DataNodeRemoveHandler {
*/
public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
TSStatus status;
- List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+ List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
if (regionReplicaNodes.isEmpty()) {
LOGGER.warn(
- "{}, Not find region replica nodes in createPeer, regionId: {}",
+ "{}, Cannot find region replica nodes in createPeer, regionId: {}",
REMOVE_DATANODE_PROCESS,
regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
@@ -249,11 +249,11 @@ public class DataNodeRemoveHandler {
maintainPeerReq,
DataNodeRequestType.ADD_REGION_PEER);
LOGGER.info(
- "{}, Send action addRegionPeer finished, regionId: {}, rpcDataNode: {}, destDataNode: {}",
+ "{}, Send action addRegionPeer finished, regionId: {}, rpcDataNode: {}, destDataNode: {}",
REMOVE_DATANODE_PROCESS,
regionId,
getIdWithRpcEndpoint(selectedDataNode.get()),
- destDataNode);
+ getIdWithRpcEndpoint(destDataNode));
return status;
}
@@ -354,41 +354,40 @@ public class DataNodeRemoveHandler {
TDataNodeLocation originalDataNode,
TDataNodeLocation destDataNode) {
LOGGER.info(
- "Start to update region {} location from {} to {} when it migrate succeed",
+ "Start to updateRegionLocationCache {} location from {} to {} when it migrate succeed",
regionId,
- originalDataNode.getInternalEndPoint().getIp(),
- destDataNode.getInternalEndPoint().getIp());
+ getIdWithRpcEndpoint(originalDataNode),
+ getIdWithRpcEndpoint(destDataNode));
UpdateRegionLocationPlan req =
new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
LOGGER.info(
- "Update region {} location finished, result:{}, old:{}, new:{}",
+ "UpdateRegionLocationCache finished, region:{}, result:{}, old:{}, new:{}",
regionId,
status,
- originalDataNode.getInternalEndPoint().getIp(),
- destDataNode.getInternalEndPoint().getIp());
+ getIdWithRpcEndpoint(originalDataNode),
+ getIdWithRpcEndpoint(destDataNode));
+
// Broadcast the latest RegionRouteMap when Region migration finished
configManager.getLoadManager().broadcastLatestRegionRouteMap();
}
/**
- * Find region replication Nodes
+ * Find all DataNodes which contain the given regionId
*
* @param regionId region id
- * @return data node location
+ * @return DataNode locations
*/
- public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
- // Through consensus?
- List<TRegionReplicaSet> regionReplicaSets =
+ public List<TDataNodeLocation> findRegionLocations(TConsensusGroupId regionId) {
+ Optional<TRegionReplicaSet> regionReplicaSet =
configManager.getPartitionManager().getAllReplicaSets().stream()
.filter(rg -> rg.regionId.equals(regionId))
- .collect(Collectors.toList());
- if (regionReplicaSets.isEmpty()) {
- LOGGER.warn("not find TRegionReplica for region: {}", regionId);
- return Collections.emptyList();
+ .findAny();
+ if (regionReplicaSet.isPresent()) {
+ return regionReplicaSet.get().getDataNodeLocations();
}
- return regionReplicaSets.get(0).getDataNodeLocations();
+ return Collections.emptyList();
}
private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
@@ -413,7 +412,7 @@ public class DataNodeRemoveHandler {
* @param dataNode old data node
*/
public void stopDataNode(TDataNodeLocation dataNode) {
- LOGGER.info("{}, Begin to stop Data Node {}", REMOVE_DATANODE_PROCESS, dataNode);
+ LOGGER.info("{}, Begin to stop DataNode {}", REMOVE_DATANODE_PROCESS, dataNode);
AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
TSStatus status =
SyncDataNodeClientPool.getInstance()
@@ -572,31 +571,28 @@ public class DataNodeRemoveHandler {
*/
private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
- List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
- if (regionReplicaNodes.isEmpty()) {
- LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+ List<TDataNodeLocation> regionLocations = findRegionLocations(regionId);
+ if (regionLocations.isEmpty()) {
+ LOGGER.warn("Cannot find DataNodes contain the given region: {}", regionId);
return Optional.empty();
}
+ // Prefer RUNNING DataNodes to execute the request
+ // If no RUNNING DataNode (other than the filtered one) holds the region, fall back to REMOVING DataNodes
List<TDataNodeLocation> aliveDataNodes =
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
- // filter the RUNNING datanode firstly
- // if all the datanodes are not in RUNNING status, choose the REMOVING datanode
- // because REMOVING datanode is also alive, it can execute rpc request
- if (aliveDataNodes.isEmpty()) {
- aliveDataNodes =
- configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
- .map(TDataNodeConfiguration::getLocation)
- .collect(Collectors.toList());
- }
+ aliveDataNodes.addAll(
+ configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .collect(Collectors.toList()));
// TODO return the node which has lowest load.
- for (TDataNodeLocation regionReplicaNode : regionReplicaNodes) {
- if (aliveDataNodes.contains(regionReplicaNode) && !regionReplicaNode.equals(filterLocation)) {
- return Optional.of(regionReplicaNode);
+ for (TDataNodeLocation aliveDataNode : aliveDataNodes) {
+ if (regionLocations.contains(aliveDataNode) && !aliveDataNode.equals(filterLocation)) {
+ return Optional.of(aliveDataNode);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 235a6a7376..6b7272c36f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -95,6 +95,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
setNextState(RemoveDataNodeState.STOP_DATA_NODE);
break;
case STOP_DATA_NODE:
+ // TODO if region migration fails, don't execute STOP_DATA_NODE
env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation);
env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation);
return Flow.NO_MORE_STATE;
@@ -195,7 +196,13 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
RegionMigrateProcedure regionMigrateProcedure =
new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
addChildProcedure(regionMigrateProcedure);
- LOG.info("Submit child procedure, {}", regionMigrateProcedure);
+ LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
+ } else {
+ LOG.error(
+ "{}, Cannot find target DataNode to remove the region: {}",
+ REMOVE_DATANODE_PROCESS,
+ regionId);
+ // TODO terminate all the uncompleted remove datanode process
}
});
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 5a33553988..27e0d120b0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -40,6 +40,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler.getIdWithRpcEndpoint;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
/** region migrate procedure */
@@ -142,10 +143,10 @@ public class RegionMigrateProcedure
setFailure(new ProcedureException("Region migrate failed at state: " + state));
} else {
LOG.error(
- "{}, Failed state is not support rollback, failed state {}, originalDataNode: {}",
+ "{}, Failed state [{}] is not support rollback, originalDataNode: {}",
REMOVE_DATANODE_PROCESS,
state,
- originalDataNode);
+ getIdWithRpcEndpoint(originalDataNode));
if (getCycles() > RETRY_THRESHOLD) {
setFailure(
new ProcedureException(
@@ -283,7 +284,7 @@ public class RegionMigrateProcedure
public void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq req) {
LOG.info(
- "{}, ConfigNode received DataNode reported region migrate result: {}",
+ "{}, ConfigNode received region migrate result reported by DataNode: {}",
REMOVE_DATANODE_PROCESS,
req);
@@ -293,7 +294,7 @@ public class RegionMigrateProcedure
// migrate failed
if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
LOG.info(
- "{}, Region migrate executed failed in DataNode, migrateStatus: {}",
+ "{}, Region migrate failed in DataNode, migrateStatus: {}",
REMOVE_DATANODE_PROCESS,
migrateStatus);
migrateSuccess = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index a1f73fdb14..651ed99ac3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -52,7 +52,9 @@ import java.util.Map;
public class RegionMigrateService implements IService {
private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
- private static final int RETRY = 5;
+ public static final String REMOVE_DATANODE_PROCESS = "[REMOVE_DATANODE_PROCESS]";
+
+ private static final int MAX_RETRY_NUM = 5;
private static final int SLEEP_MILLIS = 5000;
@@ -172,7 +174,7 @@ public class RegionMigrateService implements IService {
@Override
public void start() {
if (this.pool != null) {
- poolLogger.info("Data Node region migrate pool start");
+ poolLogger.info("DataNode region migrate pool start");
}
}
@@ -266,7 +268,7 @@ public class RegionMigrateService implements IService {
TEndPoint newPeerNode = getConsensusEndPoint(selectedDataNode, regionId);
taskLogger.info("Start to add peer {} for region {}", newPeerNode, tRegionId);
boolean addPeerSucceed = true;
- for (int i = 0; i < RETRY; i++) {
+ for (int i = 0; i < MAX_RETRY_NUM; i++) {
try {
if (!addPeerSucceed) {
Thread.sleep(SLEEP_MILLIS);
@@ -278,7 +280,12 @@ public class RegionMigrateService implements IService {
} catch (Throwable e) {
addPeerSucceed = false;
taskLogger.error(
- "Add new peer {} for region {} error, retry times: {}", newPeerNode, regionId, i, e);
+ "{}, Add new peer {} for region {} error, retry times: {}",
+ REMOVE_DATANODE_PROCESS,
+ newPeerNode,
+ regionId,
+ i,
+ e);
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(
String.format(
@@ -291,7 +298,11 @@ public class RegionMigrateService implements IService {
}
if (!addPeerSucceed || resp == null || !resp.isSuccess()) {
taskLogger.error(
- "Add new peer {} for region {} failed, resp: {}", newPeerNode, regionId, resp);
+ "{}, Add new peer {} for region {} failed, resp: {}",
+ REMOVE_DATANODE_PROCESS,
+ newPeerNode,
+ regionId,
+ resp);
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(
String.format(
@@ -300,7 +311,11 @@ public class RegionMigrateService implements IService {
return status;
}
- taskLogger.info("Succeed to add peer {} for region {}", newPeerNode, regionId);
+ taskLogger.info(
+ "{}, Succeed to add peer {} for region {}",
+ REMOVE_DATANODE_PROCESS,
+ newPeerNode,
+ regionId);
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage("add peer " + newPeerNode + " for region " + regionId + " succeed");
return status;
@@ -375,7 +390,7 @@ public class RegionMigrateService implements IService {
taskLogger.info("Start to remove peer {} for region {}", oldPeerNode, regionId);
ConsensusGenericResponse resp = null;
boolean removePeerSucceed = true;
- for (int i = 0; i < RETRY; i++) {
+ for (int i = 0; i < MAX_RETRY_NUM; i++) {
try {
if (!removePeerSucceed) {
Thread.sleep(SLEEP_MILLIS);
@@ -387,7 +402,7 @@ public class RegionMigrateService implements IService {
} catch (Throwable e) {
removePeerSucceed = false;
taskLogger.error(
- "remove peer {} for region {} error, retry times: {}", oldPeerNode, regionId, i, e);
+ "Remove peer {} for region {} error, retry times: {}", oldPeerNode, regionId, i, e);
status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
status.setMessage(
"remove peer: "