You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/05 09:09:16 UTC

[iotdb] branch beyyes/master1 created (now 399a961b8d)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/master1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 399a961b8d perfect remove process

This branch includes the following new commits:

     new 399a961b8d perfect remove process

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: perfect remove process

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/master1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 399a961b8da27175ac1f4cd024522a3a1ee9f042
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Sat Nov 5 17:08:13 2022 +0800

    perfect remove process
---
 .../persistence/partition/PartitionInfo.java       |  2 +-
 .../partition/StorageGroupPartitionTable.java      |  6 +-
 .../procedure/env/DataNodeRemoveHandler.java       | 67 +++++++++++-----------
 .../impl/node/RemoveDataNodeProcedure.java         |  6 ++
 4 files changed, 42 insertions(+), 39 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 812393b0a2..82a461375a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -470,7 +470,7 @@ public class PartitionInfo implements SnapshotProcessor {
   }
 
   /**
-   * update a region location
+   * Update the location info of given regionId
    *
    * @param req UpdateRegionLocationReq
    * @return TSStatus
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 5ffd0881f5..5e98a80c3d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -432,7 +432,7 @@ public class StorageGroupPartitionTable {
   private void addRegionNewLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
     RegionGroup regionGroup = regionGroupMap.get(regionId);
     if (regionGroup == null) {
-      LOGGER.warn("not find Region Group for region {}", regionId);
+      LOGGER.warn("Cannot find RegionGroup in addRegionNewLocation for region {}", regionId);
       return;
     }
     if (regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
@@ -445,12 +445,12 @@ public class StorageGroupPartitionTable {
   private void removeRegionOldLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
     RegionGroup regionGroup = regionGroupMap.get(regionId);
     if (regionGroup == null) {
-      LOGGER.warn("not find Region Group for region {}", regionId);
+      LOGGER.warn("Cannot find RegionGroup in removeRegionOldLocation for region {}", regionId);
       return;
     }
     if (!regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
       LOGGER.info(
-          "Node is Not in region locations, no need to remove it, node: {}, region: {}",
+          "Node is not in region locations, no need to remove it, node: {}, region: {}",
           node,
           regionId);
       return;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index a5f1cc5b2e..8a48624046 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -138,11 +138,11 @@ public class DataNodeRemoveHandler {
    */
   public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+    List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
     if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+      LOGGER.warn("Cannot find region replica nodes, region: {}", regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("not find region replica nodes, region: " + regionId);
+      status.setMessage("Cannot find region replica nodes, region: " + regionId);
       return null;
     }
 
@@ -167,10 +167,10 @@ public class DataNodeRemoveHandler {
    */
   public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+    List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
     if (regionReplicaNodes.isEmpty()) {
       LOGGER.warn(
-          "{}, Not find region replica nodes in createPeer, regionId: {}",
+          "{}, Cannot find region replica nodes in createPeer, regionId: {}",
           REMOVE_DATANODE_PROCESS,
           regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
@@ -354,40 +354,40 @@ public class DataNodeRemoveHandler {
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode) {
     LOGGER.info(
-        "Start to update region {} location from {} to {} when it migrate succeed",
+        "Start to updateRegionLocationCache {} location from {} to {} when it migrate succeed",
         regionId,
-        originalDataNode.getInternalEndPoint().getIp(),
-        destDataNode.getInternalEndPoint().getIp());
+        getIdWithRpcEndpoint(originalDataNode),
+        getIdWithRpcEndpoint(destDataNode));
     UpdateRegionLocationPlan req =
         new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
     TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
     LOGGER.info(
-        "Update region {} location finished, result:{}, old:{}, new:{}",
+        "UpdateRegionLocationCache finished, region:{}, result:{}, old:{}, new:{}",
         regionId,
         status,
-        originalDataNode.getInternalEndPoint().getIp(),
-        destDataNode.getInternalEndPoint().getIp());
+            getIdWithRpcEndpoint(originalDataNode),
+            getIdWithRpcEndpoint(destDataNode));
+
     // Broadcast the latest RegionRouteMap when Region migration finished
     configManager.getLoadManager().broadcastLatestRegionRouteMap();
   }
 
   /**
-   * Find region replication Nodes
+   * Find the locations of all DataNodes that hold a replica of the given region
    *
    * @param regionId region id
-   * @return data node location
+   * @return DataNode locations
    */
-  public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
-    List<TRegionReplicaSet> regionReplicaSets =
+  public List<TDataNodeLocation> findRegionLocations(TConsensusGroupId regionId) {
+    Optional<TRegionReplicaSet> regionReplicaSet =
         configManager.getPartitionManager().getAllReplicaSets().stream()
             .filter(rg -> rg.regionId.equals(regionId))
-            .collect(Collectors.toList());
-    if (regionReplicaSets.isEmpty()) {
-      LOGGER.warn("not find TRegionReplica for region: {}", regionId);
-      return Collections.emptyList();
+            .findAny();
+    if (regionReplicaSet.isPresent()) {
+      return regionReplicaSet.get().getDataNodeLocations();
     }
 
-    return regionReplicaSets.get(0).getDataNodeLocations();
+    return Collections.emptyList();
   }
 
   private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
@@ -571,31 +571,28 @@ public class DataNodeRemoveHandler {
    */
   private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
       TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
-    if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+    List<TDataNodeLocation> regionLocations = findRegionLocations(regionId);
+    if (regionLocations.isEmpty()) {
+      LOGGER.warn("Cannot find DataNodes contain the given region: {}", regionId);
       return Optional.empty();
     }
 
+    // Prefer RUNNING DataNodes: they are placed first in the candidate list.
+    // REMOVING DataNodes are appended after them, so they are only chosen as a fallback.
     List<TDataNodeLocation> aliveDataNodes =
         configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
             .map(TDataNodeConfiguration::getLocation)
             .collect(Collectors.toList());
 
-    // filter the RUNNING datanode firstly
-    // if all the datanodes are not in RUNNING status, choose the REMOVING datanode
-    // because REMOVING datanode is also alive, it can execute rpc request
-    if (aliveDataNodes.isEmpty()) {
-      aliveDataNodes =
-          configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
-              .map(TDataNodeConfiguration::getLocation)
-              .collect(Collectors.toList());
-    }
+    aliveDataNodes.addAll(
+        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toList()));
 
     // TODO return the node which has lowest load.
-    for (TDataNodeLocation regionReplicaNode : regionReplicaNodes) {
-      if (aliveDataNodes.contains(regionReplicaNode) && !regionReplicaNode.equals(filterLocation)) {
-        return Optional.of(regionReplicaNode);
+    for (TDataNodeLocation aliveDataNode : aliveDataNodes) {
+      if (regionLocations.contains(aliveDataNode) && !aliveDataNode.equals(filterLocation)) {
+        return Optional.of(aliveDataNode);
       }
     }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index f8c6f7b83c..fe59a56d97 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -196,6 +196,12 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
                 new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
             addChildProcedure(regionMigrateProcedure);
             LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
+          } else {
+            LOG.error(
+                "{}, Cannot find target DataNode to remove the region: {}",
+                REMOVE_DATANODE_PROCESS,
+                regionId);
+            // TODO terminate all the uncompleted remove datanode process
           }
         });
   }