You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/13 08:13:31 UTC
[iotdb] 01/01: do not stop the DataNode when some regions fail to migrate
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 27a9fb4133ffbdf6fe51b403240dc56eaf6a5775
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Sun Nov 13 16:13:14 2022 +0800
do not stop the DataNode when some regions fail to migrate
---
.../iotdb/confignode/manager/ProcedureManager.java | 2 +-
.../iotdb/confignode/manager/node/NodeManager.java | 11 +-
.../procedure/env/DataNodeRemoveHandler.java | 12 +--
.../impl/node/RemoveDataNodeProcedure.java | 120 ++++++++++++---------
.../impl/statemachine/RegionMigrateProcedure.java | 7 +-
5 files changed, 89 insertions(+), 63 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 4deef73d26..79ecc64828 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -326,7 +326,7 @@ public class ProcedureManager {
.forEach(
tDataNodeLocation -> {
this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation));
- LOGGER.info("Submit to remove data node procedure, {}", tDataNodeLocation);
+ LOGGER.info("Submit RemoveDataNodeProcedure successfully, {}", tDataNodeLocation);
});
return true;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 3db90ba4fc..f71a481a25 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -279,14 +279,14 @@ public class NodeManager {
dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(
- "The remove DataNode request check failed. req: {}, check result: {}",
+ "The remove DataNode request check failed. req: {}, check result: {}",
removeDataNodePlan,
preCheckStatus.getStatus());
return preCheckStatus;
}
+ // Do transfer of the DataNodes before remove
DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
- // do transfer of the DataNodes before remove
if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataSet.setStatus(
@@ -294,7 +294,8 @@ public class NodeManager {
.setMessage("Fail to do transfer of the DataNodes"));
return dataSet;
}
- // if add request to queue, then return to client
+
+ // Add request to queue, then return to client
boolean registerSucceed =
configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
TSStatus status;
@@ -307,7 +308,9 @@ public class NodeManager {
}
dataSet.setStatus(status);
- LOGGER.info("NodeManager finished to remove DataNode {}", removeDataNodePlan);
+ LOGGER.info(
+ "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
+ removeDataNodePlan);
return dataSet;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 6660c0c080..7ae18943e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -77,15 +77,15 @@ public class DataNodeRemoveHandler {
/**
* Get all consensus group id in this node
*
- * @param dataNodeLocation data node location
- * @return group id list
+ * @param removedDataNode the DataNode to be removed
+ * @return group id list to be migrated
*/
- public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLocation) {
+ public List<TConsensusGroupId> getMigratedDataNodeRegions(TDataNodeLocation removedDataNode) {
return configManager.getPartitionManager().getAllReplicaSets().stream()
.filter(
- rg ->
- rg.getDataNodeLocations().contains(dataNodeLocation)
- && rg.regionId.getType() != TConsensusGroupType.ConfigNodeRegion)
+ replicaSet ->
+ replicaSet.getDataNodeLocations().contains(removedDataNode)
+ && replicaSet.regionId.getType() != TConsensusGroupType.ConfigNodeRegion)
.map(TRegionReplicaSet::getRegionId)
.collect(Collectors.toList());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 3380aa3c18..fc432bb966 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.procedure.impl.node;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
@@ -37,6 +39,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
@@ -45,24 +48,26 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
private static final int RETRY_THRESHOLD = 5;
- private TDataNodeLocation disableDataNodeLocation;
+ private TDataNodeLocation removedDataNode;
- private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
+ private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
public RemoveDataNodeProcedure() {
super();
}
- public RemoveDataNodeProcedure(TDataNodeLocation disableDataNodeLocation) {
+ public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
super();
- this.disableDataNodeLocation = disableDataNodeLocation;
+ this.removedDataNode = removedDataNode;
}
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
- if (disableDataNodeLocation == null) {
+ if (removedDataNode == null) {
return Flow.NO_MORE_STATE;
}
+
+ DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
try {
switch (state) {
case REGION_REPLICA_CHECK:
@@ -70,24 +75,24 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
setNextState(RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE);
} else {
LOG.error(
- "{}, Can not remove DataNode {} because the number of DataNodes is less or equal than region replica number",
+ "{}, Can not remove DataNode {} "
+ + "because the number of DataNodes is less or equal than region replica number",
REMOVE_DATANODE_PROCESS,
- disableDataNodeLocation);
+ removedDataNode);
return Flow.NO_MORE_STATE;
}
case REMOVE_DATA_NODE_PREPARE:
// mark the datanode as removing status and broadcast region route map
- env.markDataNodeAsRemovingAndBroadcast(disableDataNodeLocation);
- execDataNodeRegionIds =
- env.getDataNodeRemoveHandler().getDataNodeRegionIds(disableDataNodeLocation);
+ env.markDataNodeAsRemovingAndBroadcast(removedDataNode);
+ migratedDataNodeRegions = handler.getMigratedDataNodeRegions(removedDataNode);
LOG.info(
"{}, DataNode regions to be removed is {}",
REMOVE_DATANODE_PROCESS,
- execDataNodeRegionIds);
+ migratedDataNodeRegions);
setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE);
break;
case BROADCAST_DISABLE_DATA_NODE:
- env.getDataNodeRemoveHandler().broadcastDisableDataNode(disableDataNodeLocation);
+ handler.broadcastDisableDataNode(removedDataNode);
setNextState(RemoveDataNodeState.SUBMIT_REGION_MIGRATE);
break;
case SUBMIT_REGION_MIGRATE:
@@ -95,11 +100,11 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
setNextState(RemoveDataNodeState.STOP_DATA_NODE);
break;
case STOP_DATA_NODE:
- // TODO if region migrate is failed, don't execute STOP_DATA_NODE
- LOG.info(
- "{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, disableDataNodeLocation);
- env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation);
- env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation);
+ if (isAllRegionMigratedSuccessfully(env)) {
+ LOG.info("{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, removedDataNode);
+ handler.removeDataNodePersistence(removedDataNode);
+ handler.stopDataNode(removedDataNode);
+ }
return Flow.NO_MORE_STATE;
}
} catch (Exception e) {
@@ -107,10 +112,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
setFailure(new ProcedureException("Remove Data Node failed " + state));
} else {
LOG.error(
- "Retrievable error trying to remove data node {}, state {}",
- disableDataNodeLocation,
- state,
- e);
+ "Retrievable error trying to remove data node {}, state {}", removedDataNode, state, e);
if (getCycles() > RETRY_THRESHOLD) {
setFailure(new ProcedureException("State stuck at " + state));
}
@@ -119,6 +121,46 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
return Flow.HAS_MORE_STATE;
}
+ private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
+ migratedDataNodeRegions.forEach(
+ regionId -> {
+ TDataNodeLocation destDataNode =
+ env.getDataNodeRemoveHandler().findDestDataNode(regionId);
+ if (destDataNode != null) {
+ RegionMigrateProcedure regionMigrateProcedure =
+ new RegionMigrateProcedure(regionId, removedDataNode, destDataNode);
+ addChildProcedure(regionMigrateProcedure);
+ LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
+ } else {
+ LOG.error(
+ "{}, Cannot find target DataNode to migrate the region: {}",
+ REMOVE_DATANODE_PROCESS,
+ regionId);
+ // TODO terminate all the uncompleted remove datanode process
+ }
+ });
+ }
+
+ private boolean isAllRegionMigratedSuccessfully(ConfigNodeProcedureEnv env) {
+ List<TRegionReplicaSet> replicaSets =
+ env.getConfigManager().getPartitionManager().getAllReplicaSets();
+
+ List<TConsensusGroupId> migratedFailedRegions =
+ replicaSets.stream()
+ .filter(replica -> replica.getDataNodeLocations().contains(removedDataNode))
+ .map(TRegionReplicaSet::getRegionId)
+ .collect(Collectors.toList());
+ if (migratedFailedRegions.size() > 0) {
+ LOG.warn(
+ "{}, Some regions are migrated failed, the StopDataNode process should not be executed, migratedFailedRegions: {}",
+ REMOVE_DATANODE_PROCESS,
+ migratedFailedRegions);
+ return false;
+ }
+
+ return true;
+ }
+
@Override
protected void rollbackState(ConfigNodeProcedureEnv env, RemoveDataNodeState state)
throws IOException, InterruptedException, ProcedureException {}
@@ -157,9 +199,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
public void serialize(DataOutputStream stream) throws IOException {
stream.writeShort(ProcedureType.REMOVE_DATA_NODE_PROCEDURE.getTypeCode());
super.serialize(stream);
- ThriftCommonsSerDeUtils.serializeTDataNodeLocation(disableDataNodeLocation, stream);
- stream.writeInt(execDataNodeRegionIds.size());
- execDataNodeRegionIds.forEach(
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(removedDataNode, stream);
+ stream.writeInt(migratedDataNodeRegions.size());
+ migratedDataNodeRegions.forEach(
tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
}
@@ -167,11 +209,12 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
- disableDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+ removedDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
int regionSize = byteBuffer.getInt();
- execDataNodeRegionIds = new ArrayList<>(regionSize);
+ migratedDataNodeRegions = new ArrayList<>(regionSize);
for (int i = 0; i < regionSize; i++) {
- execDataNodeRegionIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
+ migratedDataNodeRegions.add(
+ ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
}
} catch (ThriftSerDeException e) {
LOG.error("Error in deserialize RemoveConfigNodeProcedure", e);
@@ -184,28 +227,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
RemoveDataNodeProcedure thatProc = (RemoveDataNodeProcedure) that;
return thatProc.getProcId() == this.getProcId()
&& thatProc.getState() == this.getState()
- && thatProc.disableDataNodeLocation.equals(this.disableDataNodeLocation);
+ && thatProc.removedDataNode.equals(this.removedDataNode)
+ && thatProc.migratedDataNodeRegions.equals(this.migratedDataNodeRegions);
}
return false;
}
-
- private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
- execDataNodeRegionIds.forEach(
- regionId -> {
- TDataNodeLocation destDataNode =
- env.getDataNodeRemoveHandler().findDestDataNode(regionId);
- if (destDataNode != null) {
- RegionMigrateProcedure regionMigrateProcedure =
- new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
- addChildProcedure(regionMigrateProcedure);
- LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
- } else {
- LOG.error(
- "{}, Cannot find target DataNode to remove the region: {}",
- REMOVE_DATANODE_PROCESS,
- regionId);
- // TODO terminate all the uncompleted remove datanode process
- }
- });
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index f15ed67489..07a498c7a1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -98,7 +98,7 @@ public class RegionMigrateProcedure
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
- throw new ProcedureException("Failed to add region peer");
+ throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode");
}
setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
break;
@@ -111,7 +111,7 @@ public class RegionMigrateProcedure
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
- throw new ProcedureException("Failed to remove region peer");
+ throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode");
}
setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER);
break;
@@ -150,8 +150,7 @@ public class RegionMigrateProcedure
"Procedure retried failed exceed 5 times, state stuck at " + state));
}
- // meets exception in region migrate process
- // terminate the process
+ // An exception occurred in the region migrate process; terminate the process
return Flow.NO_MORE_STATE;
}
}