You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/10 09:12:41 UTC

[GitHub] [iotdb] qiaojialin commented on a diff in pull request #6938: [IOTDB-4044] Remove a DataNode from the cluster, when this node stopped.

qiaojialin commented on code in PR #6938:
URL: https://github.com/apache/iotdb/pull/6938#discussion_r942085870


##########
thrift/src/main/thrift/datanode.thrift:
##########
@@ -279,10 +279,22 @@ service IDataNodeRPCService {
   common.TSStatus addToRegionConsensusGroup(TAddConsensusGroup req);
 
   /**
-   * Config node will migrate a region from this node to newNode
-   * @param migrate which region from one node to other node
+   * Config node will add a region peer from this node to newNode
+   * @param add region req which region from one node to other node
    */
-  common.TSStatus migrateRegion(TMigrateRegionReq req);
+  common.TSStatus addRegionPeer(TMigrateRegionReq req);
+
+  /**
+   * Config node will remove a region peer from this node to newNode
+   * @param remove region peer region from one node to other node
+   */
+  common.TSStatus removeRegionPeer(TMigrateRegionReq req);
+
+  /**
+   * Config node will remove a region consensus group from this node to newNode

Review Comment:
   Removing a region group "from this node to newNode" reads strangely. A region group usually has multiple replicas and therefore relates to multiple nodes.



##########
thrift/src/main/thrift/datanode.thrift:
##########
@@ -279,10 +279,22 @@ service IDataNodeRPCService {
   common.TSStatus addToRegionConsensusGroup(TAddConsensusGroup req);
 
   /**
-   * Config node will migrate a region from this node to newNode
-   * @param migrate which region from one node to other node
+   * Config node will add a region peer from this node to newNode

Review Comment:
   add a peer on one node to a region group?



##########
thrift/src/main/thrift/datanode.thrift:
##########
@@ -279,10 +279,22 @@ service IDataNodeRPCService {
   common.TSStatus addToRegionConsensusGroup(TAddConsensusGroup req);
 
   /**
-   * Config node will migrate a region from this node to newNode
-   * @param migrate which region from one node to other node
+   * Config node will add a region peer from this node to newNode
+   * @param add region req which region from one node to other node
    */
-  common.TSStatus migrateRegion(TMigrateRegionReq req);
+  common.TSStatus addRegionPeer(TMigrateRegionReq req);
+
+  /**
+   * Config node will remove a region peer from this node to newNode
+   * @param remove region peer region from one node to other node

Review Comment:
   This comment needs to be updated.



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java:
##########
@@ -632,6 +632,42 @@ public TSStatus addToRegionConsensusGroup(TAddConsensusGroup req) throws TExcept
     return addConsensusGroup(regionId, peers);
   }
 
+  @Override
+  public TSStatus removeRegionPeer(TMigrateRegionReq req) throws TException {
+    TConsensusGroupId regionId = req.getRegionId();
+    String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
+    boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
+    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    if (submitSucceed) {
+      LOGGER.info(
+          "succeed to submit a remove region peer task. region: {}, from {}", regionId, fromNodeIp);
+      return status;
+    }
+    status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+    status.setMessage("submit region remove region peer task failed, region: " + regionId);

Review Comment:
   ```suggestion
       status.setMessage("submit region remove region peer task failed, region: " + regionId + " from:" + req.getFromNode().getInternalEndPoint());
   ```



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java:
##########
@@ -77,8 +77,8 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState
           setNextState(RemoveDataNodeState.STOP_DATA_NODE);
           break;
         case STOP_DATA_NODE:
-          env.getDataNodeRemoveHandler().stopDataNode(tDataNodeLocation);
           env.getDataNodeRemoveHandler().removeDataNodePersistence(tDataNodeLocation);
+          env.getDataNodeRemoveHandler().stopDataNode(tDataNodeLocation);

Review Comment:
   In line 68,
   DataNode region ids are:



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java:
##########
@@ -82,13 +83,46 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat
         case ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP:
           env.getDataNodeRemoveHandler()
               .addNewNodeToRegionConsensusGroup(consensusGroupId, destDataNode);
-          setNextState(RegionTransitionState.MIGRATE_REGION);
+          setNextState(RegionTransitionState.ADD_REGION_PEER);
           break;
-        case MIGRATE_REGION:
-          env.getDataNodeRemoveHandler()
-              .migrateRegion(originalDataNode, destDataNode, consensusGroupId);
-          waitForTheRegionMigrateFinished(consensusGroupId);
-          LOG.info("Wait for region {}  migrate finished", consensusGroupId);
+        case ADD_REGION_PEER:
+          tsStatus =
+              env.getDataNodeRemoveHandler()
+                  .addRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            waitForTheRegionMigrateFinished(consensusGroupId);
+            LOG.info("Wait for region {}  add peer finished", consensusGroupId);
+          } else {
+            throw new ProcedureException("Failed to add region peer");
+          }
+          setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
+          break;
+        case CHANGE_REGION_LEADER:
+          env.getDataNodeRemoveHandler().changeRegionLeader(consensusGroupId, originalDataNode);
+          setNextState(RegionTransitionState.REMOVE_REGION_PEER);
+          break;
+        case REMOVE_REGION_PEER:
+          tsStatus =
+              env.getDataNodeRemoveHandler()
+                  .removeRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            waitForTheRegionMigrateFinished(consensusGroupId);
+            LOG.info("Wait for region {} remove peer finished", consensusGroupId);
+          } else {
+            throw new ProcedureException("Failed to remove region peer");
+          }
+          setNextState(RegionTransitionState.REMOVE_REGION_CONSENSUS_GROUP);
+          break;
+        case REMOVE_REGION_CONSENSUS_GROUP:
+          tsStatus =
+              env.getDataNodeRemoveHandler()
+                  .removeRegionConsensusGroup(originalDataNode, destDataNode, consensusGroupId);
+          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            waitForTheRegionMigrateFinished(consensusGroupId);

Review Comment:
   ```suggestion
               waitForOneMigrationStepFinished(consensusGroupId);
   ```



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java:
##########
@@ -383,4 +450,31 @@ public void removeDataNodePersistence(TDataNodeLocation tDataNodeLocation) {
     removeDataNodes.add(tDataNodeLocation);
     configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
   }
+
+  public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
+    Optional<TDataNodeLocation> newLeaderNode = findOtherNode(regionId, tDataNodeLocation);
+    if (newLeaderNode.isPresent()) {
+      SyncDataNodeClientPool.getInstance()
+          .changeRegionLeader(
+              regionId, tDataNodeLocation.getInternalEndPoint(), newLeaderNode.get());
+      LOGGER.info(
+          "Change region leader finished, region is {}, newLeaderNode is {}",
+          regionId,
+          newLeaderNode);
+    }
+  }
+
+  private Optional<TDataNodeLocation> findOtherNode(

Review Comment:
   findNodeOfAnotherReplica



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java:
##########
@@ -135,30 +136,60 @@ public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
   }
 
   /**
-   * Send to DataNode, migrate region from originalDataNode to destDataNode

Review Comment:
   In line 130,
   this will migrate the region to the new node, which does not contain a replica of this region.



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java:
##########
@@ -632,6 +632,42 @@ public TSStatus addToRegionConsensusGroup(TAddConsensusGroup req) throws TExcept
     return addConsensusGroup(regionId, peers);
   }
 
+  @Override
+  public TSStatus removeRegionPeer(TMigrateRegionReq req) throws TException {

Review Comment:
   I found that removeRegionPeer and the other operations all report success to the ConfigNode with only a regionId. How does the ConfigNode distinguish between the different steps?



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java:
##########
@@ -82,13 +83,46 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat
         case ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP:
           env.getDataNodeRemoveHandler()
               .addNewNodeToRegionConsensusGroup(consensusGroupId, destDataNode);
-          setNextState(RegionTransitionState.MIGRATE_REGION);
+          setNextState(RegionTransitionState.ADD_REGION_PEER);
           break;
-        case MIGRATE_REGION:
-          env.getDataNodeRemoveHandler()
-              .migrateRegion(originalDataNode, destDataNode, consensusGroupId);
-          waitForTheRegionMigrateFinished(consensusGroupId);
-          LOG.info("Wait for region {}  migrate finished", consensusGroupId);
+        case ADD_REGION_PEER:
+          tsStatus =
+              env.getDataNodeRemoveHandler()
+                  .addRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            waitForTheRegionMigrateFinished(consensusGroupId);

Review Comment:
   ```suggestion
               waitForOneMigrationStepFinished(consensusGroupId);
   ```



##########
server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java:
##########
@@ -632,6 +632,42 @@ public TSStatus addToRegionConsensusGroup(TAddConsensusGroup req) throws TExcept
     return addConsensusGroup(regionId, peers);
   }
 
+  @Override
+  public TSStatus removeRegionPeer(TMigrateRegionReq req) throws TException {
+    TConsensusGroupId regionId = req.getRegionId();
+    String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
+    boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
+    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    if (submitSucceed) {
+      LOGGER.info(
+          "succeed to submit a remove region peer task. region: {}, from {}", regionId, fromNodeIp);

Review Comment:
   ```suggestion
             "succeed to submit a remove region peer task. region: {}, from {}", regionId, req.getFromNode().getInternalEndPoint());
   ```



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java:
##########
@@ -82,13 +83,46 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionStat
         case ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP:
           env.getDataNodeRemoveHandler()
               .addNewNodeToRegionConsensusGroup(consensusGroupId, destDataNode);
-          setNextState(RegionTransitionState.MIGRATE_REGION);
+          setNextState(RegionTransitionState.ADD_REGION_PEER);
           break;
-        case MIGRATE_REGION:
-          env.getDataNodeRemoveHandler()
-              .migrateRegion(originalDataNode, destDataNode, consensusGroupId);
-          waitForTheRegionMigrateFinished(consensusGroupId);
-          LOG.info("Wait for region {}  migrate finished", consensusGroupId);
+        case ADD_REGION_PEER:
+          tsStatus =
+              env.getDataNodeRemoveHandler()
+                  .addRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            waitForTheRegionMigrateFinished(consensusGroupId);
+            LOG.info("Wait for region {}  add peer finished", consensusGroupId);
+          } else {
+            throw new ProcedureException("Failed to add region peer");
+          }
+          setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
+          break;
+        case CHANGE_REGION_LEADER:
+          env.getDataNodeRemoveHandler().changeRegionLeader(consensusGroupId, originalDataNode);
+          setNextState(RegionTransitionState.REMOVE_REGION_PEER);
+          break;
+        case REMOVE_REGION_PEER:
+          tsStatus =
+              env.getDataNodeRemoveHandler()
+                  .removeRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+          if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            waitForTheRegionMigrateFinished(consensusGroupId);

Review Comment:
   ```suggestion
               waitForOneMigrationStepFinished(consensusGroupId);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org