You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/13 08:13:30 UTC

[iotdb] branch beyyes/master created (now 27a9fb4133)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 27a9fb4133 donnot stop datanode when some regions migrated failed

This branch includes the following new commits:

     new 27a9fb4133 donnot stop datanode when some regions migrated failed

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: donnot stop datanode when some regions migrated failed

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 27a9fb4133ffbdf6fe51b403240dc56eaf6a5775
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Sun Nov 13 16:13:14 2022 +0800

    Do not stop the DataNode when some regions fail to migrate
---
 .../iotdb/confignode/manager/ProcedureManager.java |   2 +-
 .../iotdb/confignode/manager/node/NodeManager.java |  11 +-
 .../procedure/env/DataNodeRemoveHandler.java       |  12 +--
 .../impl/node/RemoveDataNodeProcedure.java         | 120 ++++++++++++---------
 .../impl/statemachine/RegionMigrateProcedure.java  |   7 +-
 5 files changed, 89 insertions(+), 63 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 4deef73d26..79ecc64828 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -326,7 +326,7 @@ public class ProcedureManager {
         .forEach(
             tDataNodeLocation -> {
               this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation));
-              LOGGER.info("Submit to remove data node procedure, {}", tDataNodeLocation);
+              LOGGER.info("Submit RemoveDataNodeProcedure successfully, {}", tDataNodeLocation);
             });
     return true;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 3db90ba4fc..f71a481a25 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -279,14 +279,14 @@ public class NodeManager {
         dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
     if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.error(
-          "The remove DataNode request check failed.  req: {}, check result: {}",
+          "The remove DataNode request check failed. req: {}, check result: {}",
           removeDataNodePlan,
           preCheckStatus.getStatus());
       return preCheckStatus;
     }
 
+    // Do transfer of the DataNodes before remove
     DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
-    // do transfer of the DataNodes before remove
     if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
         != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       dataSet.setStatus(
@@ -294,7 +294,8 @@ public class NodeManager {
               .setMessage("Fail to do transfer of the DataNodes"));
       return dataSet;
     }
-    // if add request to queue, then return to client
+
+    // Add request to queue, then return to client
     boolean registerSucceed =
         configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
     TSStatus status;
@@ -307,7 +308,9 @@ public class NodeManager {
     }
     dataSet.setStatus(status);
 
-    LOGGER.info("NodeManager finished to remove DataNode {}", removeDataNodePlan);
+    LOGGER.info(
+        "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
+        removeDataNodePlan);
     return dataSet;
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 6660c0c080..7ae18943e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -77,15 +77,15 @@ public class DataNodeRemoveHandler {
   /**
    * Get all consensus group id in this node
    *
-   * @param dataNodeLocation data node location
-   * @return group id list
+   * @param removedDataNode the DataNode to be removed
+   * @return ids of the non-ConfigNode regions that have a replica on the removed DataNode
    */
-  public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLocation) {
+  public List<TConsensusGroupId> getMigratedDataNodeRegions(TDataNodeLocation removedDataNode) {
     return configManager.getPartitionManager().getAllReplicaSets().stream()
         .filter(
-            rg ->
-                rg.getDataNodeLocations().contains(dataNodeLocation)
-                    && rg.regionId.getType() != TConsensusGroupType.ConfigNodeRegion)
+            replicaSet ->
+                replicaSet.getDataNodeLocations().contains(removedDataNode)
+                    && replicaSet.regionId.getType() != TConsensusGroupType.ConfigNodeRegion)
         .map(TRegionReplicaSet::getRegionId)
         .collect(Collectors.toList());
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 3380aa3c18..fc432bb966 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.procedure.impl.node;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
@@ -37,6 +39,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
 
@@ -45,24 +48,26 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
   private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
   private static final int RETRY_THRESHOLD = 5;
 
-  private TDataNodeLocation disableDataNodeLocation;
+  private TDataNodeLocation removedDataNode;
 
-  private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
+  private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>();
 
   public RemoveDataNodeProcedure() {
     super();
   }
 
-  public RemoveDataNodeProcedure(TDataNodeLocation disableDataNodeLocation) {
+  public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) {
     super();
-    this.disableDataNodeLocation = disableDataNodeLocation;
+    this.removedDataNode = removedDataNode;
   }
 
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
-    if (disableDataNodeLocation == null) {
+    if (removedDataNode == null) {
       return Flow.NO_MORE_STATE;
     }
+
+    DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
     try {
       switch (state) {
         case REGION_REPLICA_CHECK:
@@ -70,24 +75,24 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
             setNextState(RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE);
           } else {
             LOG.error(
-                "{}, Can not remove DataNode {} because the number of DataNodes is less or equal than region replica number",
+                "{}, Can not remove DataNode {} "
+                    + "because the number of DataNodes is less or equal than region replica number",
                 REMOVE_DATANODE_PROCESS,
-                disableDataNodeLocation);
+                removedDataNode);
             return Flow.NO_MORE_STATE;
           }
         case REMOVE_DATA_NODE_PREPARE:
           // mark the datanode as removing status and broadcast region route map
-          env.markDataNodeAsRemovingAndBroadcast(disableDataNodeLocation);
-          execDataNodeRegionIds =
-              env.getDataNodeRemoveHandler().getDataNodeRegionIds(disableDataNodeLocation);
+          env.markDataNodeAsRemovingAndBroadcast(removedDataNode);
+          migratedDataNodeRegions = handler.getMigratedDataNodeRegions(removedDataNode);
           LOG.info(
               "{}, DataNode regions to be removed is {}",
               REMOVE_DATANODE_PROCESS,
-              execDataNodeRegionIds);
+              migratedDataNodeRegions);
           setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE);
           break;
         case BROADCAST_DISABLE_DATA_NODE:
-          env.getDataNodeRemoveHandler().broadcastDisableDataNode(disableDataNodeLocation);
+          handler.broadcastDisableDataNode(removedDataNode);
           setNextState(RemoveDataNodeState.SUBMIT_REGION_MIGRATE);
           break;
         case SUBMIT_REGION_MIGRATE:
@@ -95,11 +100,11 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
           setNextState(RemoveDataNodeState.STOP_DATA_NODE);
           break;
         case STOP_DATA_NODE:
-          // TODO if region migrate is failed, don't execute STOP_DATA_NODE
-          LOG.info(
-              "{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, disableDataNodeLocation);
-          env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation);
-          env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation);
+          if (isAllRegionMigratedSuccessfully(env)) {
+            LOG.info("{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, removedDataNode);
+            handler.removeDataNodePersistence(removedDataNode);
+            handler.stopDataNode(removedDataNode);
+          }
           return Flow.NO_MORE_STATE;
       }
     } catch (Exception e) {
@@ -107,10 +112,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
         setFailure(new ProcedureException("Remove Data Node failed " + state));
       } else {
         LOG.error(
-            "Retrievable error trying to remove data node {}, state {}",
-            disableDataNodeLocation,
-            state,
-            e);
+            "Retrievable error trying to remove data node {}, state {}", removedDataNode, state, e);
         if (getCycles() > RETRY_THRESHOLD) {
           setFailure(new ProcedureException("State stuck at " + state));
         }
@@ -119,6 +121,46 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
     return Flow.HAS_MORE_STATE;
   }
 
+  private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
+    // One RegionMigrateProcedure per region; a region with no destination is only logged
+    for (TConsensusGroupId regionId : migratedDataNodeRegions) {
+      TDataNodeLocation destDataNode = env.getDataNodeRemoveHandler().findDestDataNode(regionId);
+      if (destDataNode == null) {
+        LOG.error(
+            "{}, Cannot find target DataNode to migrate the region: {}",
+            REMOVE_DATANODE_PROCESS,
+            regionId);
+        // TODO terminate all the uncompleted remove datanode process
+        continue;
+      }
+
+      RegionMigrateProcedure regionMigrateProcedure =
+          new RegionMigrateProcedure(regionId, removedDataNode, destDataNode);
+      addChildProcedure(regionMigrateProcedure);
+      LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
+    }
+  }
+
+  /** Returns true only if no region replica set still contains the removed DataNode. */
+  private boolean isAllRegionMigratedSuccessfully(ConfigNodeProcedureEnv env) {
+    // A region whose replica set still lists the removed DataNode was not migrated away from it
+    List<TConsensusGroupId> migratedFailedRegions =
+        env.getConfigManager().getPartitionManager().getAllReplicaSets().stream()
+            .filter(replicaSet -> replicaSet.getDataNodeLocations().contains(removedDataNode))
+            .map(TRegionReplicaSet::getRegionId)
+            .collect(Collectors.toList());
+    if (migratedFailedRegions.isEmpty()) {
+      return true;
+    }
+
+    LOG.warn(
+        "{}, Some regions are migrated failed, the StopDataNode process should not be executed, "
+            + "migratedFailedRegions: {}",
+        REMOVE_DATANODE_PROCESS,
+        migratedFailedRegions);
+    return false;
+  }
+
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, RemoveDataNodeState state)
       throws IOException, InterruptedException, ProcedureException {}
@@ -157,9 +199,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeShort(ProcedureType.REMOVE_DATA_NODE_PROCEDURE.getTypeCode());
     super.serialize(stream);
-    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(disableDataNodeLocation, stream);
-    stream.writeInt(execDataNodeRegionIds.size());
-    execDataNodeRegionIds.forEach(
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(removedDataNode, stream);
+    stream.writeInt(migratedDataNodeRegions.size());
+    migratedDataNodeRegions.forEach(
         tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
   }
 
@@ -167,11 +209,12 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     try {
-      disableDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+      removedDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
       int regionSize = byteBuffer.getInt();
-      execDataNodeRegionIds = new ArrayList<>(regionSize);
+      migratedDataNodeRegions = new ArrayList<>(regionSize);
       for (int i = 0; i < regionSize; i++) {
-        execDataNodeRegionIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
+        migratedDataNodeRegions.add(
+            ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
       }
     } catch (ThriftSerDeException e) {
       LOG.error("Error in deserialize RemoveConfigNodeProcedure", e);
@@ -184,28 +227,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
       RemoveDataNodeProcedure thatProc = (RemoveDataNodeProcedure) that;
       return thatProc.getProcId() == this.getProcId()
           && thatProc.getState() == this.getState()
-          && thatProc.disableDataNodeLocation.equals(this.disableDataNodeLocation);
+          && thatProc.removedDataNode.equals(this.removedDataNode)
+          && thatProc.migratedDataNodeRegions.equals(this.migratedDataNodeRegions);
     }
     return false;
   }
-
-  private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
-    execDataNodeRegionIds.forEach(
-        regionId -> {
-          TDataNodeLocation destDataNode =
-              env.getDataNodeRemoveHandler().findDestDataNode(regionId);
-          if (destDataNode != null) {
-            RegionMigrateProcedure regionMigrateProcedure =
-                new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
-            addChildProcedure(regionMigrateProcedure);
-            LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
-          } else {
-            LOG.error(
-                "{}, Cannot find target DataNode to remove the region: {}",
-                REMOVE_DATANODE_PROCESS,
-                regionId);
-            // TODO terminate all the uncompleted remove datanode process
-          }
-        });
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index f15ed67489..07a498c7a1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -98,7 +98,7 @@ public class RegionMigrateProcedure
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
             waitForOneMigrationStepFinished(consensusGroupId, state);
           } else {
-            throw new ProcedureException("Failed to add region peer");
+            throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode");
           }
           setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
           break;
@@ -111,7 +111,7 @@ public class RegionMigrateProcedure
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
             waitForOneMigrationStepFinished(consensusGroupId, state);
           } else {
-            throw new ProcedureException("Failed to remove region peer");
+            throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode");
           }
           setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER);
           break;
@@ -150,8 +150,7 @@ public class RegionMigrateProcedure
                   "Procedure retried failed exceed 5 times, state stuck at " + state));
         }
 
-        // meets exception in region migrate process
-        // terminate the process
+        // Meets an exception in the region migrate process, so terminate the process
         return Flow.NO_MORE_STATE;
       }
     }