Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/08/10 02:22:29 UTC

[GitHub] [iotdb] CRZbulabula commented on a diff in pull request #6938: [IOTDB-4044] Remove a DataNode from the cluster, when this node stopped.

CRZbulabula commented on code in PR #6938:
URL: https://github.com/apache/iotdb/pull/6938#discussion_r941948145


##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java:
##########
@@ -135,29 +136,99 @@ public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
   }
 
   /**
-   * Send to DataNode, migrate region from originalDataNode to destDataNode
+   * Send to DataNode, add region peer
    *
    * @param originalDataNode old location data node
    * @param destDataNode dest data node
    * @param regionId region id
    * @return migrate status
    */
-  public TSStatus migrateRegion(
+  public TSStatus addRegionPeer(
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode,
       TConsensusGroupId regionId) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
-    if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+    Optional<TDataNodeLocation> newLeaderNode = findNewLeader(regionId, originalDataNode);

Review Comment:
   Why do we need findNewLeader here, since changeLeader has already been executed in an earlier step of RemoveDataNodeProcedure?
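
For context on this question: the lines this hunk removes chose the new leader as any replica of the region other than the node being removed, and the deleted TODO notes that a region with a single replica has no such node. The body of findNewLeader is not part of this diff, so the sketch below assumes it keeps that same rule. Node locations are modelled as plain strings, and LeaderSelectionSketch is a hypothetical name, not IoTDB code.

```java
import java.util.List;
import java.util.Optional;

class LeaderSelectionSketch {

  /**
   * Hypothetical stand-in for findNewLeader: returns a replica of the region other than
   * the DataNode being removed, mirroring the removed lines
   * regionReplicaNodes.stream().filter(e -> !e.equals(originalDataNode)).findAny().
   * findFirst() is used instead of findAny() only to make the result deterministic.
   */
  static Optional<String> findNewLeader(List<String> regionReplicaNodes, String originalDataNode) {
    return regionReplicaNodes.stream()
        .filter(node -> !node.equals(originalDataNode))
        .findFirst();
  }

  public static void main(String[] args) {
    // Three replicas: another node is available as a candidate.
    System.out.println(findNewLeader(List.of("dn1", "dn2", "dn3"), "dn1")); // Optional[dn2]
    // Single replica: no candidate, the case addRegionPeer reports as MIGRATE_REGION_ERROR.
    System.out.println(findNewLeader(List.of("dn1"), "dn1")); // Optional.empty
  }
}
```

Under that assumption the lookup returns some remaining replica, not necessarily the leader chosen by the earlier changeLeader step, which is worth keeping in mind when deciding whether the call is still needed.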



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java:
##########
@@ -135,29 +136,99 @@ public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
   }
 
   /**
-   * Send to DataNode, migrate region from originalDataNode to destDataNode
+   * Send to DataNode, add region peer
    *
    * @param originalDataNode old location data node
    * @param destDataNode dest data node
    * @param regionId region id
    * @return migrate status
    */
-  public TSStatus migrateRegion(
+  public TSStatus addRegionPeer(
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode,
       TConsensusGroupId regionId) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
-    if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+    Optional<TDataNodeLocation> newLeaderNode = findNewLeader(regionId, originalDataNode);
+    if (!newLeaderNode.isPresent()) {
+      LOGGER.warn(
+          "No other Node to change region leader, check by show regions, region: {}", regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("not find region replica nodes, region: " + regionId);
+      status.setMessage("No other Node to change region leader, check by show regions");
       return status;
     }
 
-    // TODO if region replica is 1, the new leader is null, it also need to migrate
-    Optional<TDataNodeLocation> newLeaderNode =
-        regionReplicaNodes.stream().filter(e -> !e.equals(originalDataNode)).findAny();
+    TMigrateRegionReq migrateRegionReq =
+        new TMigrateRegionReq(regionId, destDataNode, destDataNode);
+    migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
+
+    // send to new leader
+    status =
+        SyncDataNodeClientPool.getInstance()
+            .sendSyncRequestToDataNodeWithRetry(
+                newLeaderNode.get().getInternalEndPoint(),
+                migrateRegionReq,
+                DataNodeRequestType.ADD_REGION_PEER);
+    LOGGER.info(
+        "Send region {} migrate action to {}, wait it finished",
+        regionId,
+        destDataNode.getInternalEndPoint());
+    return status;
+  }
+
+  /**
+   * Send to DataNode, remove region
+   *
+   * @param originalDataNode old location data node
+   * @param destDataNode dest data node
+   * @param regionId region id
+   * @return migrate status
+   */
+  public TSStatus removeRegionPeer(
+      TDataNodeLocation originalDataNode,
+      TDataNodeLocation destDataNode,
+      TConsensusGroupId regionId) {
+    TSStatus status;
+    Optional<TDataNodeLocation> newLeaderNode = findNewLeader(regionId, originalDataNode);
+    if (!newLeaderNode.isPresent()) {
+      LOGGER.warn(
+          "No other Node to change region leader, check by show regions, region: {}", regionId);
+      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+      status.setMessage("No other Node to change region leader, check by show regions");
+      return status;
+    }
+
+    TMigrateRegionReq migrateRegionReq =
+        new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
+    migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
+
+    // send to new leader
+    status =
+        SyncDataNodeClientPool.getInstance()
+            .sendSyncRequestToDataNodeWithRetry(
+                newLeaderNode.get().getInternalEndPoint(),
+                migrateRegionReq,
+                DataNodeRequestType.REMOVE_REGION_PEER);
+    LOGGER.info(
+        "Send region {} remove peer to {}, wait it finished",
+        regionId,
+        originalDataNode.getInternalEndPoint());
+    return status;
+  }
+
+  /**
+   * Send to DataNode, remove region consensus group from originalDataNode node
+   *
+   * @param originalDataNode old location data node
+   * @param destDataNode dest data node
+   * @param regionId region id
+   * @return migrate status
+   */
+  public TSStatus removeRegionConsensusGroup(
+      TDataNodeLocation originalDataNode,
+      TDataNodeLocation destDataNode,
+      TConsensusGroupId regionId) {
+    TSStatus status;
+    Optional<TDataNodeLocation> newLeaderNode = findNewLeader(regionId, originalDataNode);

Review Comment:
   Why do we need findNewLeader here, since changeLeader has already been executed in an earlier step of RemoveDataNodeProcedure?



##########
server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java:
##########
@@ -474,4 +502,479 @@ private boolean isFailed(TSStatus status) {
       return !isSucceed(status);
     }
   }
+
+  private static class AddRegionPeerTask implements Runnable {
+    private static final Logger taskLogger = LoggerFactory.getLogger(RegionMigrateTask.class);
+
+    // migrate which region
+    private final TConsensusGroupId tRegionId;
+
+    // migrate to which node
+    private final TDataNodeLocation toNode;
+
+    public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation toNode) {
+      this.tRegionId = tRegionId;
+      this.toNode = toNode;
+    }
+
+    @Override
+    public void run() {
+      TSStatus runResult = addPeer();
+      if (isFailed(runResult)) {
+        reportFailed(toNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
+        return;
+      }
+
+      reportSucceed();
+    }
+
+    private void reportSucceed() {
+      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      status.setMessage("Region: " + tRegionId + " migrated succeed");
+      TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+      try {
+        reportRegionMigrateResultToConfigNode(req);
+      } catch (Throwable e) {
+        taskLogger.error(
+            "report region {} migrate successful result error, result:{}", tRegionId, req, e);
+      }
+    }
+
+    private void reportFailed(
+        TDataNodeLocation failedNode, TRegionMigrateFailedType failedType, TSStatus status) {
+      TRegionMigrateResultReportReq req = createFailedRequest(failedNode, failedType, status);
+      try {
+        reportRegionMigrateResultToConfigNode(req);
+      } catch (Throwable e) {
+        taskLogger.error(
+            "report region {} migrate failed result error, result:{}", tRegionId, req, e);
+      }
+    }
+
+    private TRegionMigrateResultReportReq createFailedRequest(
+        TDataNodeLocation failedNode, TRegionMigrateFailedType failedType, TSStatus status) {
+      Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
+      failedNodeAndReason.put(failedNode, failedType);
+      TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+      req.setFailedNodeAndReason(failedNodeAndReason);
+      return req;
+    }
+
+    private void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
+        throws TException {
+      TSStatus status;
+      try (ConfigNodeClient client = new ConfigNodeClient()) {
+        status = client.reportRegionMigrateResult(req);
+        taskLogger.info(
+            "report region {} migrate result {} to Config node succeed, result: {}",
+            tRegionId,
+            req,
+            status);
+      }
+    }
+
+    private TSStatus addPeer() {
+      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
+      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      ConsensusGenericResponse resp = null;
+      TEndPoint newPeerNode = getConsensusEndPoint(toNode, regionId);
+      taskLogger.info("start to add peer {} for region {}", newPeerNode, tRegionId);
+      boolean addPeerSucceed = true;
+      for (int i = 0; i < RETRY; i++) {
+        try {
+          if (!addPeerSucceed) {
+            Thread.sleep(SLEEP_MILLIS);
+          }
+          resp = addRegionPeer(regionId, new Peer(regionId, newPeerNode));
+          addPeerSucceed = true;
+        } catch (Throwable e) {
+          addPeerSucceed = false;
+          taskLogger.error(
+              "add new peer {} for region {} error, retry times: {}", newPeerNode, regionId, i, e);
+          status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+          status.setMessage(
+              "add peer "
+                  + newPeerNode
+                  + " for region: "
+                  + regionId
+                  + " error, exception: "
+                  + e.getMessage());
+        }
+        if (addPeerSucceed && resp != null && resp.isSuccess()) {
+          break;
+        }
+      }
+      if (!addPeerSucceed || resp == null || !resp.isSuccess()) {
+        taskLogger.error(
+            "add new peer {} for region {} failed, resp: {}", newPeerNode, regionId, resp);
+        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+        status.setMessage("add new peer " + newPeerNode + " for region " + regionId + "failed");
+        return status;
+      }
+
+      taskLogger.info("succeed to add peer {} for region {}", newPeerNode, regionId);
+      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      status.setMessage("add peer " + newPeerNode + " for region " + regionId + " succeed");
+      return status;
+    }
+
+    private ConsensusGenericResponse addRegionPeer(ConsensusGroupId regionId, Peer newPeer) {
+      ConsensusGenericResponse resp;
+      if (regionId instanceof DataRegionId) {
+        resp = DataRegionConsensusImpl.getInstance().addPeer(regionId, newPeer);
+      } else {
+        resp = SchemaRegionConsensusImpl.getInstance().addPeer(regionId, newPeer);
+      }
+      return resp;
+    }
+
+    private TEndPoint getConsensusEndPoint(
+        TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+      if (regionId instanceof DataRegionId) {
+        return nodeLocation.getDataRegionConsensusEndPoint();
+      }
+      return nodeLocation.getSchemaRegionConsensusEndPoint();
+    }
+
+    private boolean isSucceed(TSStatus status) {
+      return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    }
+
+    private boolean isFailed(TSStatus status) {
+      return !isSucceed(status);
+    }
+  }
+
+  private static class RemoveRegionPeerTask implements Runnable {
+    private static final Logger taskLogger = LoggerFactory.getLogger(RegionMigrateTask.class);
+
+    // migrate which region
+    private final TConsensusGroupId tRegionId;
+
+    // migrate from which node
+    private final TDataNodeLocation fromNode;
+
+    // transfer leader to which node
+    private final TDataNodeLocation newLeaderNode;
+
+    public RemoveRegionPeerTask(
+        TConsensusGroupId tRegionId, TDataNodeLocation fromNode, TDataNodeLocation newLeaderNode) {
+      this.tRegionId = tRegionId;
+      this.fromNode = fromNode;
+      this.newLeaderNode = newLeaderNode;
+    }
+
+    @Override
+    public void run() {
+      // changeLeader();

Review Comment:
   I assume we will never call changeLeader inside the RemoveRegionPeer process?
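
A side note on the addPeer method quoted in this hunk: it retries addRegionPeer up to RETRY times, but it sleeps SLEEP_MILLIS before an attempt only when the previous attempt threw; an attempt that returns an unsuccessful response is retried immediately. The values of RETRY and SLEEP_MILLIS are not shown in the hunk. The sketch below is a generic version of the same bounded-retry pattern, assuming a pause after every failed attempt is the intended behavior; retryWithBackoff is a hypothetical helper, not part of IoTDB.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

class RetrySketch {

  /**
   * Calls action up to maxAttempts times and returns the first result accepted by
   * isSuccess. Sleeps sleepMillis before every attempt after the first, whether the
   * previous attempt threw or returned an unsuccessful result. Returns null when all
   * attempts fail or the thread is interrupted, leaving the caller to build the error status.
   */
  static <T> T retryWithBackoff(
      Callable<T> action, Predicate<T> isSuccess, int maxAttempts, long sleepMillis) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (attempt > 0) {
        try {
          Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // keep the interrupt flag and stop retrying
          return null;
        }
      }
      try {
        T result = action.call();
        if (result != null && isSuccess.test(result)) {
          return result;
        }
      } catch (Exception e) {
        // The quoted addPeer logs the exception here; this sketch simply tries again.
      }
    }
    return null;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // First attempt throws, second returns an unsuccessful result, third succeeds.
    String result =
        retryWithBackoff(
            () -> {
              calls[0]++;
              if (calls[0] == 1) {
                throw new IllegalStateException("peer not reachable");
              }
              return calls[0] == 2 ? "FAILED" : "OK";
            },
            "OK"::equals,
            5,
            10L);
    System.out.println(result + " after " + calls[0] + " attempts"); // OK after 3 attempts
  }
}
```

Pausing after both kinds of failure keeps the retry rate bounded even when the consensus layer keeps answering with an unsuccessful response; whether that matches the intended behavior of addPeer is for the PR author to confirm.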



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java:
##########
@@ -135,29 +136,99 @@ public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
   }
 
   /**
-   * Send to DataNode, migrate region from originalDataNode to destDataNode
+   * Send to DataNode, add region peer
    *
    * @param originalDataNode old location data node
    * @param destDataNode dest data node
    * @param regionId region id
    * @return migrate status
    */
-  public TSStatus migrateRegion(
+  public TSStatus addRegionPeer(
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode,
       TConsensusGroupId regionId) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
-    if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+    Optional<TDataNodeLocation> newLeaderNode = findNewLeader(regionId, originalDataNode);
+    if (!newLeaderNode.isPresent()) {
+      LOGGER.warn(
+          "No other Node to change region leader, check by show regions, region: {}", regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("not find region replica nodes, region: " + regionId);
+      status.setMessage("No other Node to change region leader, check by show regions");
       return status;
     }
 
-    // TODO if region replica is 1, the new leader is null, it also need to migrate
-    Optional<TDataNodeLocation> newLeaderNode =
-        regionReplicaNodes.stream().filter(e -> !e.equals(originalDataNode)).findAny();
+    TMigrateRegionReq migrateRegionReq =
+        new TMigrateRegionReq(regionId, destDataNode, destDataNode);
+    migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
+
+    // send to new leader
+    status =
+        SyncDataNodeClientPool.getInstance()
+            .sendSyncRequestToDataNodeWithRetry(
+                newLeaderNode.get().getInternalEndPoint(),
+                migrateRegionReq,
+                DataNodeRequestType.ADD_REGION_PEER);
+    LOGGER.info(
+        "Send region {} migrate action to {}, wait it finished",
+        regionId,
+        destDataNode.getInternalEndPoint());
+    return status;
+  }
+
+  /**
+   * Send to DataNode, remove region
+   *
+   * @param originalDataNode old location data node
+   * @param destDataNode dest data node
+   * @param regionId region id
+   * @return migrate status
+   */
+  public TSStatus removeRegionPeer(
+      TDataNodeLocation originalDataNode,
+      TDataNodeLocation destDataNode,
+      TConsensusGroupId regionId) {
+    TSStatus status;
+    Optional<TDataNodeLocation> newLeaderNode = findNewLeader(regionId, originalDataNode);

Review Comment:
   Why do we need findNewLeader here, since changeLeader has already been executed in an earlier step of RemoveDataNodeProcedure?



##########
confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java:
##########
@@ -135,29 +136,99 @@ public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
   }
 
   /**
-   * Send to DataNode, migrate region from originalDataNode to destDataNode
+   * Send to DataNode, add region peer
    *
    * @param originalDataNode old location data node
    * @param destDataNode dest data node
    * @param regionId region id
    * @return migrate status
    */
-  public TSStatus migrateRegion(
+  public TSStatus addRegionPeer(
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode,
       TConsensusGroupId regionId) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
-    if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+    Optional<TDataNodeLocation> newLeaderNode = findNewLeader(regionId, originalDataNode);
+    if (!newLeaderNode.isPresent()) {
+      LOGGER.warn(
+          "No other Node to change region leader, check by show regions, region: {}", regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("not find region replica nodes, region: " + regionId);
+      status.setMessage("No other Node to change region leader, check by show regions");
       return status;
     }
 
-    // TODO if region replica is 1, the new leader is null, it also need to migrate
-    Optional<TDataNodeLocation> newLeaderNode =
-        regionReplicaNodes.stream().filter(e -> !e.equals(originalDataNode)).findAny();
+    TMigrateRegionReq migrateRegionReq =
+        new TMigrateRegionReq(regionId, destDataNode, destDataNode);

Review Comment:
   Why does the parameter _destDataNode_ appear twice here?
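
For reference, removeRegionPeer in the same file builds its request as new TMigrateRegionReq(regionId, originalDataNode, destDataNode), which suggests the two node arguments are the source and the target. Assuming that order (the Thrift definition of TMigrateRegionReq is not part of this diff), the sketch below uses a hypothetical MigrateRegionReq class to show that the call as written carries no information about originalDataNode.

```java
import java.util.Objects;

class MigrateRequestSketch {

  /** Hypothetical stand-in for TMigrateRegionReq, assuming (regionId, fromNode, toNode) order. */
  static final class MigrateRegionReq {
    final String regionId;
    final String fromNode;
    final String toNode;

    MigrateRegionReq(String regionId, String fromNode, String toNode) {
      this.regionId = Objects.requireNonNull(regionId);
      this.fromNode = Objects.requireNonNull(fromNode);
      this.toNode = Objects.requireNonNull(toNode);
    }

    /** True when the request names two different DataNodes as source and target. */
    boolean hasDistinctEndpoints() {
      return !fromNode.equals(toNode);
    }
  }

  public static void main(String[] args) {
    String originalDataNode = "dn1";
    String destDataNode = "dn4";
    // Shape of the addRegionPeer call under review: originalDataNode is not carried.
    MigrateRegionReq asWritten = new MigrateRegionReq("region-1", destDataNode, destDataNode);
    // Shape of the removeRegionPeer call: both nodes are carried.
    MigrateRegionReq withSource = new MigrateRegionReq("region-1", originalDataNode, destDataNode);
    System.out.println(asWritten.hasDistinctEndpoints()); // false
    System.out.println(withSource.hasDistinctEndpoints()); // true
  }
}
```

Since the AddRegionPeerTask constructor quoted earlier takes only the region id and toNode, the duplicated argument may be harmless for ADD_REGION_PEER; passing originalDataNode would still keep the request consistent with removeRegionPeer.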



-- 
This is an automated message from the Apache Git Service.