You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/07 06:47:05 UTC

[iotdb] branch master updated: [IOTDB-4857] Fix the problem when remove-datanode for ratis 1 replica (#7917)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b1c57b6d8 [IOTDB-4857] Fix the problem when remove-datanode for ratis 1 replica (#7917)
1b1c57b6d8 is described below

commit 1b1c57b6d8160b2b3e44a6d55abd845e18e96dc8
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Nov 7 14:47:00 2022 +0800

    [IOTDB-4857] Fix the problem when remove-datanode for ratis 1 replica (#7917)
---
 .../confignode/persistence/node/NodeInfo.java      |  5 +-
 .../persistence/partition/PartitionInfo.java       |  9 +--
 .../partition/StorageGroupPartitionTable.java      | 19 ++++--
 .../procedure/env/DataNodeRemoveHandler.java       | 74 ++++++++++------------
 .../impl/node/RemoveDataNodeProcedure.java         |  9 ++-
 .../impl/statemachine/RegionMigrateProcedure.java  |  9 +--
 .../iotdb/db/service/RegionMigrateService.java     | 31 ++++++---
 7 files changed, 94 insertions(+), 62 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index d03fffa7f9..2526abbad6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -168,13 +168,14 @@ public class NodeInfo implements SnapshotProcessor {
         "{}, There are {} data node in cluster before executed remove-datanode.sh",
         REMOVE_DATANODE_PROCESS,
         registeredDataNodes.size());
+
+    dataNodeInfoReadWriteLock.writeLock().lock();
     try {
-      dataNodeInfoReadWriteLock.writeLock().lock();
       req.getDataNodeLocations()
           .forEach(
               removeDataNodes -> {
                 registeredDataNodes.remove(removeDataNodes.getDataNodeId());
-                LOGGER.info("removed the datanode {} from cluster", removeDataNodes);
+                LOGGER.info("Removed the datanode {} from cluster", removeDataNodes);
               });
     } finally {
       dataNodeInfoReadWriteLock.writeLock().unlock();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 812393b0a2..84115faae2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -470,7 +470,7 @@ public class PartitionInfo implements SnapshotProcessor {
   }
 
   /**
-   * update a region location
+   * Update the location info of given regionId
    *
    * @param req UpdateRegionLocationReq
    * @return TSStatus
@@ -480,9 +480,10 @@ public class PartitionInfo implements SnapshotProcessor {
     TConsensusGroupId regionId = req.getRegionId();
     TDataNodeLocation oldNode = req.getOldNode();
     TDataNodeLocation newNode = req.getNewNode();
-    storageGroupPartitionTables
-        .values()
-        .forEach(s -> s.updateRegionLocation(regionId, oldNode, newNode));
+    storageGroupPartitionTables.values().stream()
+        .filter(sgPartitionTable -> sgPartitionTable.containRegion(regionId))
+        .forEach(
+            sgPartitionTable -> sgPartitionTable.updateRegionLocation(regionId, oldNode, newNode));
 
     return status;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 5ffd0881f5..b763fded02 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -432,11 +432,18 @@ public class StorageGroupPartitionTable {
   private void addRegionNewLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
     RegionGroup regionGroup = regionGroupMap.get(regionId);
     if (regionGroup == null) {
-      LOGGER.warn("not find Region Group for region {}", regionId);
+      LOGGER.warn(
+          "Cannot find RegionGroup for region {} when addRegionNewLocation in {}",
+          regionId,
+          storageGroupName);
       return;
     }
     if (regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
-      LOGGER.info("Node is already in region locations, node: {}, region: {}", node, regionId);
+      LOGGER.info(
+          "Node is already in region locations when addRegionNewLocation in {}, node: {}, region: {}",
+          storageGroupName,
+          node,
+          regionId);
       return;
     }
     regionGroup.getReplicaSet().getDataNodeLocations().add(node);
@@ -445,12 +452,16 @@ public class StorageGroupPartitionTable {
   private void removeRegionOldLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
     RegionGroup regionGroup = regionGroupMap.get(regionId);
     if (regionGroup == null) {
-      LOGGER.warn("not find Region Group for region {}", regionId);
+      LOGGER.warn(
+          "Cannot find RegionGroup for region {} when removeRegionOldLocation in {}",
+          regionId,
+          storageGroupName);
       return;
     }
     if (!regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
       LOGGER.info(
-          "Node is Not in region locations, no need to remove it, node: {}, region: {}",
+          "Node is not in region locations when removeRegionOldLocation in {}, no need to remove it, node: {}, region: {}",
+          storageGroupName,
           node,
           regionId);
       return;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index b4d4394dba..2e29166188 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -138,11 +138,11 @@ public class DataNodeRemoveHandler {
    */
   public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+    List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
     if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+      LOGGER.warn("Cannot find region replica nodes, region: {}", regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("Not find region replica nodes, region: " + regionId);
+      status.setMessage("Cannot find region replica nodes, region: " + regionId);
       return null;
     }
 
@@ -167,10 +167,10 @@ public class DataNodeRemoveHandler {
    */
   public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+    List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
     if (regionReplicaNodes.isEmpty()) {
       LOGGER.warn(
-          "{}, Not find region replica nodes in createPeer, regionId: {}",
+          "{}, Cannot find region replica nodes in createPeer, regionId: {}",
           REMOVE_DATANODE_PROCESS,
           regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
@@ -249,11 +249,11 @@ public class DataNodeRemoveHandler {
                 maintainPeerReq,
                 DataNodeRequestType.ADD_REGION_PEER);
     LOGGER.info(
-        "{}, Send action addRegionPeer finished, regionId: {}, rpcDataNode: {}, destDataNode: {}",
+        "{}, Send action addRegionPeer finished, regionId: {}, rpcDataNode: {},  destDataNode: {}",
         REMOVE_DATANODE_PROCESS,
         regionId,
         getIdWithRpcEndpoint(selectedDataNode.get()),
-        destDataNode);
+        getIdWithRpcEndpoint(destDataNode));
     return status;
   }
 
@@ -354,41 +354,40 @@ public class DataNodeRemoveHandler {
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode) {
     LOGGER.info(
-        "Start to update region {} location from {} to {} when it migrate succeed",
+        "Start to updateRegionLocationCache {} location from {} to {} when it migrate succeed",
         regionId,
-        originalDataNode.getInternalEndPoint().getIp(),
-        destDataNode.getInternalEndPoint().getIp());
+        getIdWithRpcEndpoint(originalDataNode),
+        getIdWithRpcEndpoint(destDataNode));
     UpdateRegionLocationPlan req =
         new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
     TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
     LOGGER.info(
-        "Update region {} location finished, result:{}, old:{}, new:{}",
+        "UpdateRegionLocationCache finished, region:{}, result:{}, old:{}, new:{}",
         regionId,
         status,
-        originalDataNode.getInternalEndPoint().getIp(),
-        destDataNode.getInternalEndPoint().getIp());
+        getIdWithRpcEndpoint(originalDataNode),
+        getIdWithRpcEndpoint(destDataNode));
+
     // Broadcast the latest RegionRouteMap when Region migration finished
     configManager.getLoadManager().broadcastLatestRegionRouteMap();
   }
 
   /**
-   * Find region replication Nodes
+   * Find all DataNodes which contains the given regionId
    *
    * @param regionId region id
-   * @return data node location
+   * @return DataNode locations
    */
-  public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
-    // Through consensus?
-    List<TRegionReplicaSet> regionReplicaSets =
+  public List<TDataNodeLocation> findRegionLocations(TConsensusGroupId regionId) {
+    Optional<TRegionReplicaSet> regionReplicaSet =
         configManager.getPartitionManager().getAllReplicaSets().stream()
             .filter(rg -> rg.regionId.equals(regionId))
-            .collect(Collectors.toList());
-    if (regionReplicaSets.isEmpty()) {
-      LOGGER.warn("not find TRegionReplica for region: {}", regionId);
-      return Collections.emptyList();
+            .findAny();
+    if (regionReplicaSet.isPresent()) {
+      return regionReplicaSet.get().getDataNodeLocations();
     }
 
-    return regionReplicaSets.get(0).getDataNodeLocations();
+    return Collections.emptyList();
   }
 
   private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
@@ -413,7 +412,7 @@ public class DataNodeRemoveHandler {
    * @param dataNode old data node
    */
   public void stopDataNode(TDataNodeLocation dataNode) {
-    LOGGER.info("{}, Begin to stop Data Node {}", REMOVE_DATANODE_PROCESS, dataNode);
+    LOGGER.info("{}, Begin to stop DataNode {}", REMOVE_DATANODE_PROCESS, dataNode);
     AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
     TSStatus status =
         SyncDataNodeClientPool.getInstance()
@@ -572,31 +571,28 @@ public class DataNodeRemoveHandler {
    */
   private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
       TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
-    if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+    List<TDataNodeLocation> regionLocations = findRegionLocations(regionId);
+    if (regionLocations.isEmpty()) {
+      LOGGER.warn("Cannot find DataNodes contain the given region: {}", regionId);
       return Optional.empty();
     }
 
+    // Choosing the RUNNING DataNodes to execute firstly
+    // If all DataNodes are not RUNNING, then choose the REMOVING DataNodes secondly
     List<TDataNodeLocation> aliveDataNodes =
         configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
             .map(TDataNodeConfiguration::getLocation)
             .collect(Collectors.toList());
 
-    // filter the RUNNING datanode firstly
-    // if all the datanodes are not in RUNNING status, choose the REMOVING datanode
-    // because REMOVING datanode is also alive, it can execute rpc request
-    if (aliveDataNodes.isEmpty()) {
-      aliveDataNodes =
-          configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
-              .map(TDataNodeConfiguration::getLocation)
-              .collect(Collectors.toList());
-    }
+    aliveDataNodes.addAll(
+        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toList()));
 
     // TODO return the node which has lowest load.
-    for (TDataNodeLocation regionReplicaNode : regionReplicaNodes) {
-      if (aliveDataNodes.contains(regionReplicaNode) && !regionReplicaNode.equals(filterLocation)) {
-        return Optional.of(regionReplicaNode);
+    for (TDataNodeLocation aliveDataNode : aliveDataNodes) {
+      if (regionLocations.contains(aliveDataNode) && !aliveDataNode.equals(filterLocation)) {
+        return Optional.of(aliveDataNode);
       }
     }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 235a6a7376..6b7272c36f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -95,6 +95,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
           setNextState(RemoveDataNodeState.STOP_DATA_NODE);
           break;
         case STOP_DATA_NODE:
+          // TODO if region migrate is failed, don't execute STOP_DATA_NODE
           env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation);
           env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation);
           return Flow.NO_MORE_STATE;
@@ -195,7 +196,13 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
             RegionMigrateProcedure regionMigrateProcedure =
                 new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
             addChildProcedure(regionMigrateProcedure);
-            LOG.info("Submit child procedure, {}", regionMigrateProcedure);
+            LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
+          } else {
+            LOG.error(
+                "{}, Cannot find target DataNode to remove the region: {}",
+                REMOVE_DATANODE_PROCESS,
+                regionId);
+            // TODO terminate all the uncompleted remove datanode process
           }
         });
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 5a33553988..27e0d120b0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -40,6 +40,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler.getIdWithRpcEndpoint;
 import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
 
 /** region migrate procedure */
@@ -142,10 +143,10 @@ public class RegionMigrateProcedure
         setFailure(new ProcedureException("Region migrate failed at state: " + state));
       } else {
         LOG.error(
-            "{}, Failed state is not support rollback, failed state {}, originalDataNode: {}",
+            "{}, Failed state [{}] is not support rollback, originalDataNode: {}",
             REMOVE_DATANODE_PROCESS,
             state,
-            originalDataNode);
+            getIdWithRpcEndpoint(originalDataNode));
         if (getCycles() > RETRY_THRESHOLD) {
           setFailure(
               new ProcedureException(
@@ -283,7 +284,7 @@ public class RegionMigrateProcedure
   public void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq req) {
 
     LOG.info(
-        "{}, ConfigNode received DataNode reported region migrate result: {}",
+        "{}, ConfigNode received region migrate result reported by DataNode: {}",
         REMOVE_DATANODE_PROCESS,
         req);
 
@@ -293,7 +294,7 @@ public class RegionMigrateProcedure
       // migrate failed
       if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
         LOG.info(
-            "{}, Region migrate executed failed in DataNode, migrateStatus: {}",
+            "{}, Region migrate failed in DataNode, migrateStatus: {}",
             REMOVE_DATANODE_PROCESS,
             migrateStatus);
         migrateSuccess = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index a1f73fdb14..651ed99ac3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -52,7 +52,9 @@ import java.util.Map;
 public class RegionMigrateService implements IService {
   private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
 
-  private static final int RETRY = 5;
+  public static final String REMOVE_DATANODE_PROCESS = "[REMOVE_DATANODE_PROCESS]";
+
+  private static final int MAX_RETRY_NUM = 5;
 
   private static final int SLEEP_MILLIS = 5000;
 
@@ -172,7 +174,7 @@ public class RegionMigrateService implements IService {
     @Override
     public void start() {
       if (this.pool != null) {
-        poolLogger.info("Data Node region migrate pool start");
+        poolLogger.info("DataNode region migrate pool start");
       }
     }
 
@@ -266,7 +268,7 @@ public class RegionMigrateService implements IService {
       TEndPoint newPeerNode = getConsensusEndPoint(selectedDataNode, regionId);
       taskLogger.info("Start to add peer {} for region {}", newPeerNode, tRegionId);
       boolean addPeerSucceed = true;
-      for (int i = 0; i < RETRY; i++) {
+      for (int i = 0; i < MAX_RETRY_NUM; i++) {
         try {
           if (!addPeerSucceed) {
             Thread.sleep(SLEEP_MILLIS);
@@ -278,7 +280,12 @@ public class RegionMigrateService implements IService {
         } catch (Throwable e) {
           addPeerSucceed = false;
           taskLogger.error(
-              "Add new peer {} for region {} error, retry times: {}", newPeerNode, regionId, i, e);
+              "{}, Add new peer {} for region {} error, retry times: {}",
+              REMOVE_DATANODE_PROCESS,
+              newPeerNode,
+              regionId,
+              i,
+              e);
           status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
           status.setMessage(
               String.format(
@@ -291,7 +298,11 @@ public class RegionMigrateService implements IService {
       }
       if (!addPeerSucceed || resp == null || !resp.isSuccess()) {
         taskLogger.error(
-            "Add new peer {} for region {} failed, resp: {}", newPeerNode, regionId, resp);
+            "{}, Add new peer {} for region {} failed, resp: {}",
+            REMOVE_DATANODE_PROCESS,
+            newPeerNode,
+            regionId,
+            resp);
         status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
         status.setMessage(
             String.format(
@@ -300,7 +311,11 @@ public class RegionMigrateService implements IService {
         return status;
       }
 
-      taskLogger.info("Succeed to add peer {} for region {}", newPeerNode, regionId);
+      taskLogger.info(
+          "{}, Succeed to add peer {} for region {}",
+          REMOVE_DATANODE_PROCESS,
+          newPeerNode,
+          regionId);
       status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       status.setMessage("add peer " + newPeerNode + " for region " + regionId + " succeed");
       return status;
@@ -375,7 +390,7 @@ public class RegionMigrateService implements IService {
       taskLogger.info("Start to remove peer {} for region {}", oldPeerNode, regionId);
       ConsensusGenericResponse resp = null;
       boolean removePeerSucceed = true;
-      for (int i = 0; i < RETRY; i++) {
+      for (int i = 0; i < MAX_RETRY_NUM; i++) {
         try {
           if (!removePeerSucceed) {
             Thread.sleep(SLEEP_MILLIS);
@@ -387,7 +402,7 @@ public class RegionMigrateService implements IService {
         } catch (Throwable e) {
           removePeerSucceed = false;
           taskLogger.error(
-              "remove peer {} for region {} error, retry times: {}", oldPeerNode, regionId, i, e);
+              "Remove peer {} for region {} error, retry times: {}", oldPeerNode, regionId, i, e);
           status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
           status.setMessage(
               "remove peer: "