You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/08/24 05:56:35 UTC

[iotdb] 01/01: rename addConsensusGroup to createPeer, removeConsensusGroup to deletePeer in RegionMigrateProcedure

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/rename_RemoveDataNodeProcedure_state
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e0ca2b05ee78d0026aa41531182cfba922645560
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Aug 24 13:55:56 2022 +0800

    rename addConsensusGroup to createPeer, removeConsensusGroup to deletePeer in RegionMigrateProcedure
---
 .../confignode/client/DataNodeRequestType.java     |  4 +-
 .../sync/datanode/SyncDataNodeClientPool.java      | 10 ++---
 .../procedure/env/DataNodeRemoveHandler.java       | 45 ++++++++++++----------
 .../procedure/impl/RegionMigrateProcedure.java     | 13 +++----
 .../procedure/state/RegionTransitionState.java     |  4 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  6 +--
 thrift/src/main/thrift/datanode.thrift             | 14 +++----
 7 files changed, 49 insertions(+), 47 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 04c8347395..bc3b083fae 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -24,10 +24,10 @@ public enum DataNodeRequestType {
   INVALIDATE_PARTITION_CACHE,
   INVALIDATE_PERMISSION_CACHE,
   INVALIDATE_SCHEMA_CACHE,
-  ADD_REGION_CONSENSUS_GROUP,
+  CREATE_PEER,
   ADD_REGION_PEER,
   REMOVE_REGION_PEER,
-  REMOVE_REGION_CONSENSUS_GROUP,
+  DELETE_PEER,
   DISABLE_DATA_NODE,
   STOP_DATA_NODE,
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
index 85c21f56e3..690acb8e90 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.mpp.rpc.thrift.TAddConsensusGroup;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
@@ -85,14 +85,14 @@ public class SyncDataNodeClientPool {
             return client.stopDataNode();
           case UPDATE_TEMPLATE:
             return client.updateTemplate((TUpdateTemplateReq) req);
-          case ADD_REGION_CONSENSUS_GROUP:
-            return client.addToRegionConsensusGroup((TAddConsensusGroup) req);
+          case CREATE_PEER:
+            return client.createPeerToConsensusGroup((TCreatePeerReq) req);
           case ADD_REGION_PEER:
             return client.addRegionPeer((TMigrateRegionReq) req);
           case REMOVE_REGION_PEER:
             return client.removeRegionPeer((TMigrateRegionReq) req);
-          case REMOVE_REGION_CONSENSUS_GROUP:
-            return client.removeToRegionConsensusGroup((TMigrateRegionReq) req);
+          case DELETE_PEER:
+            return client.deletePeerToConsensusGroup((TMigrateRegionReq) req);
           default:
             return RpcUtils.getStatus(
                 TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 5227268877..1f3e53a730 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
-import org.apache.iotdb.mpp.rpc.thrift.TAddConsensusGroup;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -216,14 +216,17 @@ public class DataNodeRemoveHandler {
   }
 
   /**
-   * Send to DataNode, remove region consensus group from originalDataNode node
+   * Send to DataNode, delete peer from originalDataNode node.
    *
-   * @param originalDataNode old location data node
-   * @param destDataNode dest data node
+   * <p>If the originalDataNode is down, we should delete local data and do other cleanup works
+   * manually.
+   *
+   * @param originalDataNode data node where the peer to be deleted locates
+   * @param destDataNode dest data node to be migrated
    * @param regionId region id
    * @return migrate status
    */
-  public TSStatus removeRegionConsensusGroup(
+  public TSStatus deletePeer(
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode,
       TConsensusGroupId regionId) {
@@ -236,9 +239,9 @@ public class DataNodeRemoveHandler {
             .sendSyncRequestToDataNodeWithRetry(
                 originalDataNode.getInternalEndPoint(),
                 migrateRegionReq,
-                DataNodeRequestType.REMOVE_REGION_CONSENSUS_GROUP);
+                DataNodeRequestType.DELETE_PEER);
     LOGGER.info(
-        "Send region {} remove consensus group action to {}, wait it finished",
+        "Send region {} delete peer action to {}, wait it finished",
         regionId,
         originalDataNode.getInternalEndPoint());
     return status;
@@ -301,43 +304,43 @@ public class DataNodeRemoveHandler {
   }
 
   /**
-   * add region Consensus group in new node
+   * Create a Peer and become a member of the given consensus group.
    *
-   * @param regionId region id
-   * @param destDataNode dest data node
+   * <p>CreatePeer should be called on a node that does not contain any peer of the consensus group,
+   * to avoid one node having more than one replica.
+   *
+   * @param regionId region id, means the given consensus group
+   * @param destDataNode dest data node where the peer creates
    * @return status
    */
-  public TSStatus addNewNodeToRegionConsensusGroup(
-      TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
+  public TSStatus createPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
     TSStatus status;
     List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
     if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+      LOGGER.warn("Not find region replica nodes in createPeer, regionId: {}", regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("not find region replica nodes, region: " + regionId);
+      status.setMessage("Not find region replica nodes in createPeer, regionId: " + regionId);
       return status;
     }
 
     List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionReplicaNodes);
     currentPeerNodes.add(destDataNode);
     String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
-    TAddConsensusGroup req = new TAddConsensusGroup(regionId, currentPeerNodes, storageGroup);
+    TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
     // TODO replace with real ttl
     req.setTtl(Long.MAX_VALUE);
 
     status =
         SyncDataNodeClientPool.getInstance()
             .sendSyncRequestToDataNodeWithRetry(
-                destDataNode.getInternalEndPoint(),
-                req,
-                DataNodeRequestType.ADD_REGION_CONSENSUS_GROUP);
+                destDataNode.getInternalEndPoint(), req, DataNodeRequestType.CREATE_PEER);
 
-    LOGGER.info("send add region {} consensus group to {}", regionId, destDataNode);
+    LOGGER.info("Send create peer for regionId {} on data node {}", regionId, destDataNode);
     if (isFailed(status)) {
       LOGGER.error(
-          "add new node {} to region {} consensus group failed,  result: {}",
-          destDataNode,
+          "Send create peer for regionId {} on data node {},  result: {}",
           regionId,
+          destDataNode,
           status);
     }
     return status;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
index dd44b0ac33..937130a636 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
@@ -78,11 +78,10 @@ public class RegionMigrateProcedure
     try {
       switch (state) {
         case REGION_MIGRATE_PREPARE:
-          setNextState(RegionTransitionState.ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP);
+          setNextState(RegionTransitionState.CREATE_PEER);
           break;
-        case ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP:
-          env.getDataNodeRemoveHandler()
-              .addNewNodeToRegionConsensusGroup(consensusGroupId, destDataNode);
+        case CREATE_PEER:
+          env.getDataNodeRemoveHandler().createPeer(consensusGroupId, destDataNode);
           setNextState(RegionTransitionState.ADD_REGION_PEER);
           break;
         case ADD_REGION_PEER:
@@ -111,12 +110,12 @@ public class RegionMigrateProcedure
           } else {
             throw new ProcedureException("Failed to remove region peer");
           }
-          setNextState(RegionTransitionState.REMOVE_REGION_CONSENSUS_GROUP);
+          setNextState(RegionTransitionState.DELETE_PEER);
           break;
-        case REMOVE_REGION_CONSENSUS_GROUP:
+        case DELETE_PEER:
           tsStatus =
               env.getDataNodeRemoveHandler()
-                  .removeRegionConsensusGroup(originalDataNode, destDataNode, consensusGroupId);
+                  .deletePeer(originalDataNode, destDataNode, consensusGroupId);
           if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             waitForOneMigrationStepFinished(consensusGroupId);
             LOG.info("Wait for region {}  remove consensus group finished", consensusGroupId);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
index fb0ebe4312..71aacde1f2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.confignode.procedure.state;
 
 public enum RegionTransitionState {
   REGION_MIGRATE_PREPARE,
-  ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP,
+  CREATE_PEER,
   ADD_REGION_PEER,
   CHANGE_REGION_LEADER,
   REMOVE_REGION_PEER,
-  REMOVE_REGION_CONSENSUS_GROUP,
+  DELETE_PEER,
   UPDATE_REGION_LOCATION_CACHE
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 0a14bda8e5..046c3b1967 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -77,13 +77,13 @@ import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.type.Gauge;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-import org.apache.iotdb.mpp.rpc.thrift.TAddConsensusGroup;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
+import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
@@ -624,7 +624,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus addToRegionConsensusGroup(TAddConsensusGroup req) throws TException {
+  public TSStatus createPeerToConsensusGroup(TCreatePeerReq req) throws TException {
     ConsensusGroupId regionId =
         ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
     List<Peer> peers =
@@ -662,7 +662,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSStatus removeToRegionConsensusGroup(TMigrateRegionReq req) throws TException {
+  public TSStatus deletePeerToConsensusGroup(TMigrateRegionReq req) throws TException {
     TConsensusGroupId regionId = req.getRegionId();
     String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
     boolean submitSucceed =
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 3f74fdac49..dc26980368 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -47,7 +47,7 @@ struct TRegionLeaderChangeReq {
     2: required common.TDataNodeLocation newLeaderNode
 }
 
-struct TAddConsensusGroup {
+struct TCreatePeerReq {
     1: required common.TConsensusGroupId regionId
     2: required list<common.TDataNodeLocation> regionLocations
     3: required string storageGroup
@@ -273,10 +273,10 @@ service IDataNodeRPCService {
   common.TSStatus changeRegionLeader(TRegionLeaderChangeReq req);
 
   /**
-   * Config node will add Data nodes to the region consensus group
-   * @param region id and it's expect locations
+   * Create new peer in the given data node for region consensus group
+   * @param region id and it's expected locations
    */
-  common.TSStatus addToRegionConsensusGroup(TAddConsensusGroup req);
+  common.TSStatus createPeerToConsensusGroup(TCreatePeerReq req);
 
   /**
    * Config node will add a region peer to a region group
@@ -291,11 +291,11 @@ service IDataNodeRPCService {
   common.TSStatus removeRegionPeer(TMigrateRegionReq req);
 
   /**
-   * Config node will remove a region group from this node to newNode. Usually a region group has
+   * Delete the datanode peer for the given consensus group. Usually a region group has
    * multiple replicas, thus relates to multiple nodes.
-   * @param remove consensus group req which region from one node to other node
+   * @param TMigrateRegionReq which contains the dest datanode to be removed
   */
-  common.TSStatus removeToRegionConsensusGroup(TMigrateRegionReq req);
+  common.TSStatus deletePeerToConsensusGroup(TMigrateRegionReq req);
 
   /**
   * Config node will disable the Data node, the Data node will not accept read/write request when disabled