You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/13 08:13:31 UTC

[iotdb] 01/01: do not stop datanode when some regions fail to migrate

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 27a9fb4133ffbdf6fe51b403240dc56eaf6a5775
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Sun Nov 13 16:13:14 2022 +0800

    do not stop datanode when some regions fail to migrate
---
 .../iotdb/confignode/manager/ProcedureManager.java |   2 +-
 .../iotdb/confignode/manager/node/NodeManager.java |  11 +-
 .../procedure/env/DataNodeRemoveHandler.java       |  12 +--
 .../impl/node/RemoveDataNodeProcedure.java         | 120 ++++++++++++---------
 .../impl/statemachine/RegionMigrateProcedure.java  |   7 +-
 5 files changed, 89 insertions(+), 63 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 4deef73d26..79ecc64828 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -326,7 +326,7 @@ public class ProcedureManager {
         .forEach(
             tDataNodeLocation -> {
               this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation));
-              LOGGER.info("Submit to remove data node procedure, {}", tDataNodeLocation);
+              LOGGER.info("Submit RemoveDataNodeProcedure successfully, {}", tDataNodeLocation);
             });
     return true;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 3db90ba4fc..f71a481a25 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -279,14 +279,14 @@ public class NodeManager {
         dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
     if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.error(
-          "The remove DataNode request check failed.  req: {}, check result: {}",
+          "The remove DataNode request check failed. req: {}, check result: {}",
           removeDataNodePlan,
           preCheckStatus.getStatus());
       return preCheckStatus;
     }
 
+    // Do transfer of the DataNodes before remove
     DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
-    // do transfer of the DataNodes before remove
     if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
         != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       dataSet.setStatus(
@@ -294,7 +294,8 @@ public class NodeManager {
               .setMessage("Fail to do transfer of the DataNodes"));
       return dataSet;
     }
-    // if add request to queue, then return to client
+
+    // Add request to queue, then return to client
     boolean registerSucceed =
         configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
     TSStatus status;
@@ -307,7 +308,9 @@ public class NodeManager {
     }
     dataSet.setStatus(status);
 
-    LOGGER.info("NodeManager finished to remove DataNode {}", removeDataNodePlan);
+    LOGGER.info(
+        "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
+        removeDataNodePlan);
     return dataSet;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 6660c0c080..7ae18943e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -77,15 +77,15 @@ public class DataNodeRemoveHandler {
   /**
    * Get all consensus group id in this node
    *
-   * @param dataNodeLocation data node location
-   * @return group id list
+   * @param removedDataNode the DataNode to be removed
+   * @return group id list to be migrated
    */
-  public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLocation) {
+  public List<TConsensusGroupId> getMigratedDataNodeRegions(TDataNodeLocation removedDataNode) {
     return configManager.getPartitionManager().getAllReplicaSets().stream()
         .filter(
-            rg ->
-                rg.getDataNodeLocations().contains(dataNodeLocation)
-                    && rg.regionId.getType() != TConsensusGroupType.ConfigNodeRegion)
+            replicaSet ->
+                replicaSet.getDataNodeLocations().contains(removedDataNode)
+                    && replicaSet.regionId.getType() != TConsensusGroupType.ConfigNodeRegion)
         .map(TRegionReplicaSet::getRegionId)
         .collect(Collectors.toList());
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 3380aa3c18..fc432bb966 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.procedure.impl.node;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
@@ -37,6 +39,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
 
@@ -45,24 +48,26 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
   private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
   private static final int RETRY_THRESHOLD = 5;
 
-  private TDataNodeLocation disableDataNodeLocation;
+  private TDataNodeLocation removedDataNode;
 
-  private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
+  private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
 
   public RemoveDataNodeProcedure() {
     super();
   }
 
-  public RemoveDataNodeProcedure(TDataNodeLocation disableDataNodeLocation) {
+  public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
     super();
-    this.disableDataNodeLocation = disableDataNodeLocation;
+    this.removedDataNode = removedDataNode;
   }
 
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
-    if (disableDataNodeLocation == null) {
+    if (removedDataNode == null) {
       return Flow.NO_MORE_STATE;
     }
+
+    DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
     try {
       switch (state) {
         case REGION_REPLICA_CHECK:
@@ -70,24 +75,24 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
             setNextState(RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE);
           } else {
             LOG.error(
-                "{}, Can not remove DataNode {} because the number of DataNodes is less or equal than region replica number",
+                "{}, Can not remove DataNode {} "
+                    + "because the number of DataNodes is less or equal than region replica number",
                 REMOVE_DATANODE_PROCESS,
-                disableDataNodeLocation);
+                removedDataNode);
             return Flow.NO_MORE_STATE;
           }
         case REMOVE_DATA_NODE_PREPARE:
           // mark the datanode as removing status and broadcast region route map
-          env.markDataNodeAsRemovingAndBroadcast(disableDataNodeLocation);
-          execDataNodeRegionIds =
-              env.getDataNodeRemoveHandler().getDataNodeRegionIds(disableDataNodeLocation);
+          env.markDataNodeAsRemovingAndBroadcast(removedDataNode);
+          migratedDataNodeRegions = handler.getMigratedDataNodeRegions(removedDataNode);
           LOG.info(
               "{}, DataNode regions to be removed is {}",
               REMOVE_DATANODE_PROCESS,
-              execDataNodeRegionIds);
+              migratedDataNodeRegions);
           setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE);
           break;
         case BROADCAST_DISABLE_DATA_NODE:
-          env.getDataNodeRemoveHandler().broadcastDisableDataNode(disableDataNodeLocation);
+          handler.broadcastDisableDataNode(removedDataNode);
           setNextState(RemoveDataNodeState.SUBMIT_REGION_MIGRATE);
           break;
         case SUBMIT_REGION_MIGRATE:
@@ -95,11 +100,11 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
           setNextState(RemoveDataNodeState.STOP_DATA_NODE);
           break;
         case STOP_DATA_NODE:
-          // TODO if region migrate is failed, don't execute STOP_DATA_NODE
-          LOG.info(
-              "{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, disableDataNodeLocation);
-          env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation);
-          env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation);
+          if (isAllRegionMigratedSuccessfully(env)) {
+            LOG.info("{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, removedDataNode);
+            handler.removeDataNodePersistence(removedDataNode);
+            handler.stopDataNode(removedDataNode);
+          }
           return Flow.NO_MORE_STATE;
       }
     } catch (Exception e) {
@@ -107,10 +112,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
         setFailure(new ProcedureException("Remove Data Node failed " + state));
       } else {
         LOG.error(
-            "Retrievable error trying to remove data node {}, state {}",
-            disableDataNodeLocation,
-            state,
-            e);
+            "Retrievable error trying to remove data node {}, state {}", removedDataNode, state, e);
         if (getCycles() > RETRY_THRESHOLD) {
           setFailure(new ProcedureException("State stuck at " + state));
         }
@@ -119,6 +121,46 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
     return Flow.HAS_MORE_STATE;
   }
 
+  private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
+    migratedDataNodeRegions.forEach(
+        regionId -> {
+          TDataNodeLocation destDataNode =
+              env.getDataNodeRemoveHandler().findDestDataNode(regionId);
+          if (destDataNode != null) {
+            RegionMigrateProcedure regionMigrateProcedure =
+                new RegionMigrateProcedure(regionId, removedDataNode, destDataNode);
+            addChildProcedure(regionMigrateProcedure);
+            LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
+          } else {
+            LOG.error(
+                "{}, Cannot find target DataNode to migrate the region: {}",
+                REMOVE_DATANODE_PROCESS,
+                regionId);
+            // TODO terminate all the uncompleted remove datanode process
+          }
+        });
+  }
+
+  private boolean isAllRegionMigratedSuccessfully(ConfigNodeProcedureEnv env) {
+    List<TRegionReplicaSet> replicaSets =
+        env.getConfigManager().getPartitionManager().getAllReplicaSets();
+
+    List<TConsensusGroupId> migratedFailedRegions =
+        replicaSets.stream()
+            .filter(replica -> replica.getDataNodeLocations().contains(removedDataNode))
+            .map(TRegionReplicaSet::getRegionId)
+            .collect(Collectors.toList());
+    if (migratedFailedRegions.size() > 0) {
+      LOG.warn(
+          "{}, Some regions are migrated failed, the StopDataNode process should not be executed, migratedFailedRegions: {}",
+          REMOVE_DATANODE_PROCESS,
+          migratedFailedRegions);
+      return false;
+    }
+
+    return true;
+  }
+
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, RemoveDataNodeState state)
       throws IOException, InterruptedException, ProcedureException {}
@@ -157,9 +199,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeShort(ProcedureType.REMOVE_DATA_NODE_PROCEDURE.getTypeCode());
     super.serialize(stream);
-    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(disableDataNodeLocation, stream);
-    stream.writeInt(execDataNodeRegionIds.size());
-    execDataNodeRegionIds.forEach(
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(removedDataNode, stream);
+    stream.writeInt(migratedDataNodeRegions.size());
+    migratedDataNodeRegions.forEach(
         tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
   }
 
@@ -167,11 +209,12 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     try {
-      disableDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+      removedDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
       int regionSize = byteBuffer.getInt();
-      execDataNodeRegionIds = new ArrayList<>(regionSize);
+      migratedDataNodeRegions = new ArrayList<>(regionSize);
       for (int i = 0; i < regionSize; i++) {
-        execDataNodeRegionIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
+        migratedDataNodeRegions.add(
+            ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
       }
     } catch (ThriftSerDeException e) {
       LOG.error("Error in deserialize RemoveConfigNodeProcedure", e);
@@ -184,28 +227,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
       RemoveDataNodeProcedure thatProc = (RemoveDataNodeProcedure) that;
       return thatProc.getProcId() == this.getProcId()
           && thatProc.getState() == this.getState()
-          && thatProc.disableDataNodeLocation.equals(this.disableDataNodeLocation);
+          && thatProc.removedDataNode.equals(this.removedDataNode)
+          && thatProc.migratedDataNodeRegions.equals(this.migratedDataNodeRegions);
     }
     return false;
   }
-
-  private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
-    execDataNodeRegionIds.forEach(
-        regionId -> {
-          TDataNodeLocation destDataNode =
-              env.getDataNodeRemoveHandler().findDestDataNode(regionId);
-          if (destDataNode != null) {
-            RegionMigrateProcedure regionMigrateProcedure =
-                new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
-            addChildProcedure(regionMigrateProcedure);
-            LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
-          } else {
-            LOG.error(
-                "{}, Cannot find target DataNode to remove the region: {}",
-                REMOVE_DATANODE_PROCESS,
-                regionId);
-            // TODO terminate all the uncompleted remove datanode process
-          }
-        });
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index f15ed67489..07a498c7a1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -98,7 +98,7 @@ public class RegionMigrateProcedure
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
             waitForOneMigrationStepFinished(consensusGroupId, state);
           } else {
-            throw new ProcedureException("Failed to add region peer");
+            throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode");
           }
           setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
           break;
@@ -111,7 +111,7 @@ public class RegionMigrateProcedure
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
             waitForOneMigrationStepFinished(consensusGroupId, state);
           } else {
-            throw new ProcedureException("Failed to remove region peer");
+            throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode");
           }
           setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER);
           break;
@@ -150,8 +150,7 @@ public class RegionMigrateProcedure
                   "Procedure retried failed exceed 5 times, state stuck at " + state));
         }
 
-        // meets exception in region migrate process
-        // terminate the process
+        // meets exception in region migrate process terminate the process
         return Flow.NO_MORE_STATE;
       }
     }