You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/10/26 12:18:47 UTC

[iotdb] 01/01: add ADD_NEW_NODE impl for ConfigNodeProcedure

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/confignode_ratis_addNewNodeToExistedGroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 03aad5ff0d0faf221fdb7f78787a27f81bfe1a51
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Oct 26 20:18:33 2022 +0800

    add ADD_NEW_NODE impl for ConfigNodeProcedure
---
 .../confignode/client/ConfigNodeRequestType.java   |   1 -
 .../client/sync/SyncConfigNodeClientPool.java      |   3 --
 .../iotdb/confignode/manager/ConsensusManager.java |  45 +++++++++++++--------
 .../procedure/env/ConfigNodeProcedureEnv.java      |  36 +++++------------
 .../impl/node/AddConfigNodeProcedure.java          |  40 ++++++------------
 .../procedure/state/AddConfigNodeState.java        |   3 +-
 server/logtest.test                                | Bin 0 -> 75 bytes
 7 files changed, 52 insertions(+), 76 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
index d9ee56ae25..d22f3f276c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.confignode.client;
 
 public enum ConfigNodeRequestType {
-  ADD_CONSENSUS_GROUP,
   NOTIFY_REGISTER_SUCCESS,
   REGISTER_CONFIG_NODE,
   REMOVE_CONFIG_NODE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index 30d41a1d7b..7f69bf36fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -74,8 +73,6 @@ public class SyncConfigNodeClientPool {
           case REGISTER_CONFIG_NODE:
             // Only use registerConfigNode when the ConfigNode is first startup.
             return client.registerConfigNode((TConfigNodeRegisterReq) req);
-          case ADD_CONSENSUS_GROUP:
-            return client.addConsensusGroup((TAddConsensusGroupReq) req);
           case NOTIFY_REGISTER_SUCCESS:
             client.notifyRegisterSuccess();
             return null;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index bc266bd89e..3ad6fa2d77 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -51,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /** ConsensusManager maintains consensus class, request will redirect to consensus layer */
 public class ConsensusManager {
@@ -206,24 +208,35 @@ public class ConsensusManager {
   }
 
   /**
-   * Add new ConfigNode Peer into PartitionRegion
+   * Tell the group to [create a new Peer on new node] and [add this member to join the group].
    *
-   * @param configNodeLocation The new ConfigNode
-   * @throws AddPeerException When addPeer doesn't success
+   * <p>Using this method to replace `createPeer` and `addPeer`.
+   *
+   * @param originalConfigNodes the original members of the existed group
+   * @param newConfigNode the new member
    */
-  public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws AddPeerException {
-    boolean result =
-        consensusImpl
-            .addPeer(
-                consensusGroupId,
-                new Peer(
-                    consensusGroupId,
-                    configNodeLocation.getConfigNodeId(),
-                    configNodeLocation.getConsensusEndPoint()))
-            .isSuccess();
-
-    if (!result) {
-      throw new AddPeerException(configNodeLocation);
+  public void addNewNodeToExistedGroup(
+      List<TConfigNodeLocation> originalConfigNodes, TConfigNodeLocation newConfigNode)
+      throws AddPeerException {
+    Peer newPeer =
+        new Peer(
+            consensusGroupId,
+            newConfigNode.getConfigNodeId(),
+            newConfigNode.getConsensusEndPoint());
+
+    List<Peer> originalPeers =
+        originalConfigNodes.stream()
+            .map(
+                node ->
+                    new Peer(consensusGroupId, node.getConfigNodeId(), node.getConsensusEndPoint()))
+            .collect(Collectors.toList());
+
+    ConsensusGenericResponse response =
+        consensusImpl.addNewNodeToExistedGroup(consensusGroupId, newPeer, originalPeers);
+    if (!response.isSuccess()) {
+      LOGGER.error(
+          "Execute addNewNodeToExistedGroup for ConfigNode failed, response: {}", response);
+      throw new AddPeerException(newConfigNode);
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 1467c1156d..2b12431398 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
-import org.apache.iotdb.confignode.exception.AddConsensusGroupException;
 import org.apache.iotdb.confignode.exception.AddPeerException;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
@@ -51,7 +50,6 @@ import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
-import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -171,35 +169,19 @@ public class ConfigNodeProcedureEnv {
   }
 
   /**
-   * Let the remotely new ConfigNode build the ConsensusGroup
+   * Only ConfigNode leader will invoke this method. Add the new ConfigNode Peer into
+   * PartitionRegionGroup.
    *
-   * @param tConfigNodeLocation New ConfigNode's location
+   * @param newConfigNode The new ConfigNode
+   * @throws AddPeerException When addNewNodeToExistedGroup doesn't success
    */
-  public void addConsensusGroup(TConfigNodeLocation tConfigNodeLocation)
-      throws AddConsensusGroupException {
-    List<TConfigNodeLocation> configNodeLocations =
+  public void addNewNodeToExistedGroup(TConfigNodeLocation newConfigNode) throws AddPeerException {
+    List<TConfigNodeLocation> originalConfigNodes =
         new ArrayList<>(configManager.getNodeManager().getRegisteredConfigNodes());
-    configNodeLocations.add(tConfigNodeLocation);
-    TSStatus status =
-        (TSStatus)
-            SyncConfigNodeClientPool.getInstance()
-                .sendSyncRequestToConfigNodeWithRetry(
-                    tConfigNodeLocation.getInternalEndPoint(),
-                    new TAddConsensusGroupReq(configNodeLocations),
-                    ConfigNodeRequestType.ADD_CONSENSUS_GROUP);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new AddConsensusGroupException(tConfigNodeLocation);
-    }
-  }
 
-  /**
-   * Leader will add the new ConfigNode Peer into PartitionRegion
-   *
-   * @param configNodeLocation The new ConfigNode
-   * @throws AddPeerException When addPeer doesn't success
-   */
-  public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws AddPeerException {
-    configManager.getConsensusManager().addConfigNodePeer(configNodeLocation);
+    configManager
+        .getConsensusManager()
+        .addNewNodeToExistedGroup(originalConfigNodes, newConfigNode);
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 9d7d0ab760..4dedc6072d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -58,19 +58,13 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
     try {
       switch (state) {
         case ADD_CONFIG_NODE_PREPARE:
-          setNextState(AddConfigNodeState.CREATE_PEER);
+          setNextState(AddConfigNodeState.ADD_NEW_NODE);
           break;
-        case CREATE_PEER:
-          LOG.info("Executing createPeerForConsensusGroup on {}...", tConfigNodeLocation);
-          env.addConsensusGroup(tConfigNodeLocation);
-          setNextState(AddConfigNodeState.ADD_PEER);
-          LOG.info("Successfully createPeerForConsensusGroup on {}", tConfigNodeLocation);
-          break;
-        case ADD_PEER:
-          LOG.info("Executing addPeer {}...", tConfigNodeLocation);
-          env.addConfigNodePeer(tConfigNodeLocation);
+        case ADD_NEW_NODE:
+          LOG.info("Executing addNewNodeToExistedGroup {}...", tConfigNodeLocation);
+          env.addNewNodeToExistedGroup(tConfigNodeLocation);
           setNextState(AddConfigNodeState.REGISTER_SUCCESS);
-          LOG.info("Successfully addPeer {}", tConfigNodeLocation);
+          LOG.info("Successfully addNewNodeToExistedGroup {}", tConfigNodeLocation);
           break;
         case REGISTER_SUCCESS:
           env.notifyRegisterSuccess(tConfigNodeLocation);
@@ -81,7 +75,7 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
       }
     } catch (Exception e) {
       if (isRollbackSupported(state)) {
-        setFailure(new ProcedureException("Add Config Node failed " + state));
+        setFailure(new ProcedureException("Add ConfigNode failed " + state));
       } else {
         LOG.error(
             "Retrievable error trying to add config node {}, state {}",
@@ -99,26 +93,18 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, AddConfigNodeState state)
       throws ProcedureException {
-    switch (state) {
-      case CREATE_PEER:
-        env.deleteConfigNodePeer(tConfigNodeLocation);
-        LOG.info("Rollback add consensus group:{}", tConfigNodeLocation);
-        break;
-      case ADD_PEER:
-        env.removeConfigNodePeer(tConfigNodeLocation);
-        LOG.info("Rollback remove peer:{}", tConfigNodeLocation);
-        break;
+    if (state == AddConfigNodeState.ADD_NEW_NODE) {
+      LOG.info("Rollback in AddConfigNodeProcedure, execute RemovePeer: {}", tConfigNodeLocation);
+      env.removeConfigNodePeer(tConfigNodeLocation);
+
+      LOG.info("Rollback in AddConfigNodeProcedure, execute DeletePeer: {}", tConfigNodeLocation);
+      env.deleteConfigNodePeer(tConfigNodeLocation);
     }
   }
 
   @Override
   protected boolean isRollbackSupported(AddConfigNodeState state) {
-    switch (state) {
-      case CREATE_PEER:
-      case ADD_PEER:
-        return true;
-    }
-    return false;
+    return state == AddConfigNodeState.ADD_NEW_NODE;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
index a7f1912609..31c94e300b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.procedure.state;
 
 public enum AddConfigNodeState {
   ADD_CONFIG_NODE_PREPARE,
-  CREATE_PEER,
-  ADD_PEER,
+  ADD_NEW_NODE,
   REGISTER_SUCCESS
 }
diff --git a/server/logtest.test b/server/logtest.test
new file mode 100644
index 0000000000..a9ee656a7e
Binary files /dev/null and b/server/logtest.test differ