You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/13 08:13:30 UTC
[iotdb] branch beyyes/master created (now 27a9fb4133)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a change to branch beyyes/master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
at 27a9fb4133 donnot stop datanode when some regions migrated failed
This branch includes the following new commits:
new 27a9fb4133 donnot stop datanode when some regions migrated failed
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[iotdb] 01/01: donnot stop datanode when some regions migrated failed
Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 27a9fb4133ffbdf6fe51b403240dc56eaf6a5775
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Sun Nov 13 16:13:14 2022 +0800
donnot stop datanode when some regions migrated failed
---
.../iotdb/confignode/manager/ProcedureManager.java | 2 +-
.../iotdb/confignode/manager/node/NodeManager.java | 11 +-
.../procedure/env/DataNodeRemoveHandler.java | 12 +--
.../impl/node/RemoveDataNodeProcedure.java | 120 ++++++++++++---------
.../impl/statemachine/RegionMigrateProcedure.java | 7 +-
5 files changed, 89 insertions(+), 63 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 4deef73d26..79ecc64828 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -326,7 +326,7 @@ public class ProcedureManager {
.forEach(
tDataNodeLocation -> {
this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation));
- LOGGER.info("Submit to remove data node procedure, {}", tDataNodeLocation);
+ LOGGER.info("Submit RemoveDataNodeProcedure successfully, {}", tDataNodeLocation);
});
return true;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 3db90ba4fc..f71a481a25 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -279,14 +279,14 @@ public class NodeManager {
dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(
- "The remove DataNode request check failed. req: {}, check result: {}",
+ "The remove DataNode request check failed. req: {}, check result: {}",
removeDataNodePlan,
preCheckStatus.getStatus());
return preCheckStatus;
}
+ // Do transfer of the DataNodes before remove
DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
- // do transfer of the DataNodes before remove
if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataSet.setStatus(
@@ -294,7 +294,8 @@ public class NodeManager {
.setMessage("Fail to do transfer of the DataNodes"));
return dataSet;
}
- // if add request to queue, then return to client
+
+ // Add request to queue, then return to client
boolean registerSucceed =
configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
TSStatus status;
@@ -307,7 +308,9 @@ public class NodeManager {
}
dataSet.setStatus(status);
- LOGGER.info("NodeManager finished to remove DataNode {}", removeDataNodePlan);
+ LOGGER.info(
+ "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
+ removeDataNodePlan);
return dataSet;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 6660c0c080..7ae18943e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -77,15 +77,15 @@ public class DataNodeRemoveHandler {
/**
* Get all consensus group id in this node
*
- * @param dataNodeLocation data node location
- * @return group id list
+ * @param removedDataNode the DataNode to be removed
+ * @return group id list to be migrated
*/
- public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLocation) {
+ public List<TConsensusGroupId> getMigratedDataNodeRegions(TDataNodeLocation removedDataNode) {
return configManager.getPartitionManager().getAllReplicaSets().stream()
.filter(
- rg ->
- rg.getDataNodeLocations().contains(dataNodeLocation)
- && rg.regionId.getType() != TConsensusGroupType.ConfigNodeRegion)
+ replicaSet ->
+ replicaSet.getDataNodeLocations().contains(removedDataNode)
+ && replicaSet.regionId.getType() != TConsensusGroupType.ConfigNodeRegion)
.map(TRegionReplicaSet::getRegionId)
.collect(Collectors.toList());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 3380aa3c18..fc432bb966 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.procedure.impl.node;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
@@ -37,6 +39,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
@@ -45,24 +48,26 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
private static final int RETRY_THRESHOLD = 5;
- private TDataNodeLocation disableDataNodeLocation;
+ private TDataNodeLocation removedDataNode;
- private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
+ private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
public RemoveDataNodeProcedure() {
super();
}
- public RemoveDataNodeProcedure(TDataNodeLocation disableDataNodeLocation) {
+ public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
super();
- this.disableDataNodeLocation = disableDataNodeLocation;
+ this.removedDataNode = removedDataNode;
}
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
- if (disableDataNodeLocation == null) {
+ if (removedDataNode == null) {
return Flow.NO_MORE_STATE;
}
+
+ DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
try {
switch (state) {
case REGION_REPLICA_CHECK:
@@ -70,24 +75,24 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
setNextState(RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE);
} else {
LOG.error(
- "{}, Can not remove DataNode {} because the number of DataNodes is less or equal than region replica number",
+ "{}, Can not remove DataNode {} "
+ + "because the number of DataNodes is less or equal than region replica number",
REMOVE_DATANODE_PROCESS,
- disableDataNodeLocation);
+ removedDataNode);
return Flow.NO_MORE_STATE;
}
case REMOVE_DATA_NODE_PREPARE:
// mark the datanode as removing status and broadcast region route map
- env.markDataNodeAsRemovingAndBroadcast(disableDataNodeLocation);
- execDataNodeRegionIds =
- env.getDataNodeRemoveHandler().getDataNodeRegionIds(disableDataNodeLocation);
+ env.markDataNodeAsRemovingAndBroadcast(removedDataNode);
+ migratedDataNodeRegions = handler.getMigratedDataNodeRegions(removedDataNode);
LOG.info(
"{}, DataNode regions to be removed is {}",
REMOVE_DATANODE_PROCESS,
- execDataNodeRegionIds);
+ migratedDataNodeRegions);
setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE);
break;
case BROADCAST_DISABLE_DATA_NODE:
- env.getDataNodeRemoveHandler().broadcastDisableDataNode(disableDataNodeLocation);
+ handler.broadcastDisableDataNode(removedDataNode);
setNextState(RemoveDataNodeState.SUBMIT_REGION_MIGRATE);
break;
case SUBMIT_REGION_MIGRATE:
@@ -95,11 +100,11 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
setNextState(RemoveDataNodeState.STOP_DATA_NODE);
break;
case STOP_DATA_NODE:
- // TODO if region migrate is failed, don't execute STOP_DATA_NODE
- LOG.info(
- "{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, disableDataNodeLocation);
- env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation);
- env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation);
+ if (isAllRegionMigratedSuccessfully(env)) {
+ LOG.info("{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, removedDataNode);
+ handler.removeDataNodePersistence(removedDataNode);
+ handler.stopDataNode(removedDataNode);
+ }
return Flow.NO_MORE_STATE;
}
} catch (Exception e) {
@@ -107,10 +112,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
setFailure(new ProcedureException("Remove Data Node failed " + state));
} else {
LOG.error(
- "Retrievable error trying to remove data node {}, state {}",
- disableDataNodeLocation,
- state,
- e);
+ "Retrievable error trying to remove data node {}, state {}", removedDataNode, state, e);
if (getCycles() > RETRY_THRESHOLD) {
setFailure(new ProcedureException("State stuck at " + state));
}
@@ -119,6 +121,46 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
return Flow.HAS_MORE_STATE;
}
+ private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
+ migratedDataNodeRegions.forEach(
+ regionId -> {
+ TDataNodeLocation destDataNode =
+ env.getDataNodeRemoveHandler().findDestDataNode(regionId);
+ if (destDataNode != null) {
+ RegionMigrateProcedure regionMigrateProcedure =
+ new RegionMigrateProcedure(regionId, removedDataNode, destDataNode);
+ addChildProcedure(regionMigrateProcedure);
+ LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
+ } else {
+ LOG.error(
+ "{}, Cannot find target DataNode to migrate the region: {}",
+ REMOVE_DATANODE_PROCESS,
+ regionId);
+ // TODO terminate all the uncompleted remove datanode process
+ }
+ });
+ }
+
+ private boolean isAllRegionMigratedSuccessfully(ConfigNodeProcedureEnv env) {
+ List<TRegionReplicaSet> replicaSets =
+ env.getConfigManager().getPartitionManager().getAllReplicaSets();
+
+ List<TConsensusGroupId> migratedFailedRegions =
+ replicaSets.stream()
+ .filter(replica -> replica.getDataNodeLocations().contains(removedDataNode))
+ .map(TRegionReplicaSet::getRegionId)
+ .collect(Collectors.toList());
+ if (migratedFailedRegions.size() > 0) {
+ LOG.warn(
+ "{}, Some regions are migrated failed, the StopDataNode process should not be executed, migratedFailedRegions: {}",
+ REMOVE_DATANODE_PROCESS,
+ migratedFailedRegions);
+ return false;
+ }
+
+ return true;
+ }
+
@Override
protected void rollbackState(ConfigNodeProcedureEnv env, RemoveDataNodeState state)
throws IOException, InterruptedException, ProcedureException {}
@@ -157,9 +199,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
public void serialize(DataOutputStream stream) throws IOException {
stream.writeShort(ProcedureType.REMOVE_DATA_NODE_PROCEDURE.getTypeCode());
super.serialize(stream);
- ThriftCommonsSerDeUtils.serializeTDataNodeLocation(disableDataNodeLocation, stream);
- stream.writeInt(execDataNodeRegionIds.size());
- execDataNodeRegionIds.forEach(
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(removedDataNode, stream);
+ stream.writeInt(migratedDataNodeRegions.size());
+ migratedDataNodeRegions.forEach(
tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
}
@@ -167,11 +209,12 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
- disableDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+ removedDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
int regionSize = byteBuffer.getInt();
- execDataNodeRegionIds = new ArrayList<>(regionSize);
+ migratedDataNodeRegions = new ArrayList<>(regionSize);
for (int i = 0; i < regionSize; i++) {
- execDataNodeRegionIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
+ migratedDataNodeRegions.add(
+ ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
}
} catch (ThriftSerDeException e) {
LOG.error("Error in deserialize RemoveConfigNodeProcedure", e);
@@ -184,28 +227,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
RemoveDataNodeProcedure thatProc = (RemoveDataNodeProcedure) that;
return thatProc.getProcId() == this.getProcId()
&& thatProc.getState() == this.getState()
- && thatProc.disableDataNodeLocation.equals(this.disableDataNodeLocation);
+ && thatProc.removedDataNode.equals(this.removedDataNode)
+ && thatProc.migratedDataNodeRegions.equals(this.migratedDataNodeRegions);
}
return false;
}
-
- private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
- execDataNodeRegionIds.forEach(
- regionId -> {
- TDataNodeLocation destDataNode =
- env.getDataNodeRemoveHandler().findDestDataNode(regionId);
- if (destDataNode != null) {
- RegionMigrateProcedure regionMigrateProcedure =
- new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
- addChildProcedure(regionMigrateProcedure);
- LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
- } else {
- LOG.error(
- "{}, Cannot find target DataNode to remove the region: {}",
- REMOVE_DATANODE_PROCESS,
- regionId);
- // TODO terminate all the uncompleted remove datanode process
- }
- });
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index f15ed67489..07a498c7a1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -98,7 +98,7 @@ public class RegionMigrateProcedure
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
- throw new ProcedureException("Failed to add region peer");
+ throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode");
}
setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
break;
@@ -111,7 +111,7 @@ public class RegionMigrateProcedure
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
- throw new ProcedureException("Failed to remove region peer");
+ throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode");
}
setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER);
break;
@@ -150,8 +150,7 @@ public class RegionMigrateProcedure
"Procedure retried failed exceed 5 times, state stuck at " + state));
}
- // meets exception in region migrate process
- // terminate the process
+ // meets exception in region migrate process terminate the process
return Flow.NO_MORE_STATE;
}
}