You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/10/26 12:18:46 UTC

[iotdb] branch beyyes/confignode_ratis_addNewNodeToExistedGroup created (now 03aad5ff0d)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/confignode_ratis_addNewNodeToExistedGroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 03aad5ff0d add ADD_NEW_NODE impl for ConfigNodeProcedure

This branch includes the following new commits:

     new 03aad5ff0d add ADD_NEW_NODE impl for ConfigNodeProcedure

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add ADD_NEW_NODE impl for ConfigNodeProcedure

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/confignode_ratis_addNewNodeToExistedGroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 03aad5ff0d0faf221fdb7f78787a27f81bfe1a51
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Oct 26 20:18:33 2022 +0800

    add ADD_NEW_NODE impl for ConfigNodeProcedure
---
 .../confignode/client/ConfigNodeRequestType.java   |   1 -
 .../client/sync/SyncConfigNodeClientPool.java      |   3 --
 .../iotdb/confignode/manager/ConsensusManager.java |  45 +++++++++++++--------
 .../procedure/env/ConfigNodeProcedureEnv.java      |  36 +++++------------
 .../impl/node/AddConfigNodeProcedure.java          |  40 ++++++------------
 .../procedure/state/AddConfigNodeState.java        |   3 +-
 server/logtest.test                                | Bin 0 -> 75 bytes
 7 files changed, 52 insertions(+), 76 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
index d9ee56ae25..d22f3f276c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.confignode.client;
 
 public enum ConfigNodeRequestType {
-  ADD_CONSENSUS_GROUP,
   NOTIFY_REGISTER_SUCCESS,
   REGISTER_CONFIG_NODE,
   REMOVE_CONFIG_NODE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index 30d41a1d7b..7f69bf36fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -74,8 +73,6 @@ public class SyncConfigNodeClientPool {
           case REGISTER_CONFIG_NODE:
             // Only use registerConfigNode when the ConfigNode is first startup.
             return client.registerConfigNode((TConfigNodeRegisterReq) req);
-          case ADD_CONSENSUS_GROUP:
-            return client.addConsensusGroup((TAddConsensusGroupReq) req);
           case NOTIFY_REGISTER_SUCCESS:
             client.notifyRegisterSuccess();
             return null;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index bc266bd89e..3ad6fa2d77 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -51,6 +52,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /** ConsensusManager maintains consensus class, request will redirect to consensus layer */
 public class ConsensusManager {
@@ -206,24 +208,35 @@ public class ConsensusManager {
   }
 
   /**
-   * Add new ConfigNode Peer into PartitionRegion
+   * Tell the group to [create a new Peer on new node] and [add this member to join the group].
    *
-   * @param configNodeLocation The new ConfigNode
-   * @throws AddPeerException When addPeer doesn't success
+   * <p>Using this method to replace `createPeer` and `addPeer`.
+   *
+   * @param originalConfigNodes the original members of the existed group
+   * @param newConfigNode the new member
    */
-  public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws AddPeerException {
-    boolean result =
-        consensusImpl
-            .addPeer(
-                consensusGroupId,
-                new Peer(
-                    consensusGroupId,
-                    configNodeLocation.getConfigNodeId(),
-                    configNodeLocation.getConsensusEndPoint()))
-            .isSuccess();
-
-    if (!result) {
-      throw new AddPeerException(configNodeLocation);
+  public void addNewNodeToExistedGroup(
+      List<TConfigNodeLocation> originalConfigNodes, TConfigNodeLocation newConfigNode)
+      throws AddPeerException {
+    Peer newPeer =
+        new Peer(
+            consensusGroupId,
+            newConfigNode.getConfigNodeId(),
+            newConfigNode.getConsensusEndPoint());
+
+    List<Peer> originalPeers =
+        originalConfigNodes.stream()
+            .map(
+                node ->
+                    new Peer(consensusGroupId, node.getConfigNodeId(), node.getConsensusEndPoint()))
+            .collect(Collectors.toList());
+
+    ConsensusGenericResponse response =
+        consensusImpl.addNewNodeToExistedGroup(consensusGroupId, newPeer, originalPeers);
+    if (!response.isSuccess()) {
+      LOGGER.error(
+          "Execute addNewNodeToExistedGroup for ConfigNode failed, response: {}", response);
+      throw new AddPeerException(newConfigNode);
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 1467c1156d..2b12431398 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
 import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
-import org.apache.iotdb.confignode.exception.AddConsensusGroupException;
 import org.apache.iotdb.confignode.exception.AddPeerException;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
 import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
@@ -51,7 +50,6 @@ import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
-import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -171,35 +169,19 @@ public class ConfigNodeProcedureEnv {
   }
 
   /**
-   * Let the remotely new ConfigNode build the ConsensusGroup
+   * Only ConfigNode leader will invoke this method. Add the new ConfigNode Peer into
+   * PartitionRegionGroup.
    *
-   * @param tConfigNodeLocation New ConfigNode's location
+   * @param newConfigNode The new ConfigNode
+   * @throws AddPeerException When addNewNodeToExistedGroup doesn't success
    */
-  public void addConsensusGroup(TConfigNodeLocation tConfigNodeLocation)
-      throws AddConsensusGroupException {
-    List<TConfigNodeLocation> configNodeLocations =
+  public void addNewNodeToExistedGroup(TConfigNodeLocation newConfigNode) throws AddPeerException {
+    List<TConfigNodeLocation> originalConfigNodes =
         new ArrayList<>(configManager.getNodeManager().getRegisteredConfigNodes());
-    configNodeLocations.add(tConfigNodeLocation);
-    TSStatus status =
-        (TSStatus)
-            SyncConfigNodeClientPool.getInstance()
-                .sendSyncRequestToConfigNodeWithRetry(
-                    tConfigNodeLocation.getInternalEndPoint(),
-                    new TAddConsensusGroupReq(configNodeLocations),
-                    ConfigNodeRequestType.ADD_CONSENSUS_GROUP);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new AddConsensusGroupException(tConfigNodeLocation);
-    }
-  }
 
-  /**
-   * Leader will add the new ConfigNode Peer into PartitionRegion
-   *
-   * @param configNodeLocation The new ConfigNode
-   * @throws AddPeerException When addPeer doesn't success
-   */
-  public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws AddPeerException {
-    configManager.getConsensusManager().addConfigNodePeer(configNodeLocation);
+    configManager
+        .getConsensusManager()
+        .addNewNodeToExistedGroup(originalConfigNodes, newConfigNode);
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 9d7d0ab760..4dedc6072d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -58,19 +58,13 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
     try {
       switch (state) {
         case ADD_CONFIG_NODE_PREPARE:
-          setNextState(AddConfigNodeState.CREATE_PEER);
+          setNextState(AddConfigNodeState.ADD_NEW_NODE);
           break;
-        case CREATE_PEER:
-          LOG.info("Executing createPeerForConsensusGroup on {}...", tConfigNodeLocation);
-          env.addConsensusGroup(tConfigNodeLocation);
-          setNextState(AddConfigNodeState.ADD_PEER);
-          LOG.info("Successfully createPeerForConsensusGroup on {}", tConfigNodeLocation);
-          break;
-        case ADD_PEER:
-          LOG.info("Executing addPeer {}...", tConfigNodeLocation);
-          env.addConfigNodePeer(tConfigNodeLocation);
+        case ADD_NEW_NODE:
+          LOG.info("Executing addNewNodeToExistedGroup {}...", tConfigNodeLocation);
+          env.addNewNodeToExistedGroup(tConfigNodeLocation);
           setNextState(AddConfigNodeState.REGISTER_SUCCESS);
-          LOG.info("Successfully addPeer {}", tConfigNodeLocation);
+          LOG.info("Successfully addNewNodeToExistedGroup {}", tConfigNodeLocation);
           break;
         case REGISTER_SUCCESS:
           env.notifyRegisterSuccess(tConfigNodeLocation);
@@ -81,7 +75,7 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
       }
     } catch (Exception e) {
       if (isRollbackSupported(state)) {
-        setFailure(new ProcedureException("Add Config Node failed " + state));
+        setFailure(new ProcedureException("Add ConfigNode failed " + state));
       } else {
         LOG.error(
             "Retrievable error trying to add config node {}, state {}",
@@ -99,26 +93,18 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, AddConfigNodeState state)
       throws ProcedureException {
-    switch (state) {
-      case CREATE_PEER:
-        env.deleteConfigNodePeer(tConfigNodeLocation);
-        LOG.info("Rollback add consensus group:{}", tConfigNodeLocation);
-        break;
-      case ADD_PEER:
-        env.removeConfigNodePeer(tConfigNodeLocation);
-        LOG.info("Rollback remove peer:{}", tConfigNodeLocation);
-        break;
+    if (state == AddConfigNodeState.ADD_NEW_NODE) {
+      LOG.info("Rollback in AddConfigNodeProcedure, execute RemovePeer: {}", tConfigNodeLocation);
+      env.removeConfigNodePeer(tConfigNodeLocation);
+
+      LOG.info("Rollback in AddConfigNodeProcedure, execute DeletePeer: {}", tConfigNodeLocation);
+      env.deleteConfigNodePeer(tConfigNodeLocation);
     }
   }
 
   @Override
   protected boolean isRollbackSupported(AddConfigNodeState state) {
-    switch (state) {
-      case CREATE_PEER:
-      case ADD_PEER:
-        return true;
-    }
-    return false;
+    return state == AddConfigNodeState.ADD_NEW_NODE;
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
index a7f1912609..31c94e300b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.procedure.state;
 
 public enum AddConfigNodeState {
   ADD_CONFIG_NODE_PREPARE,
-  CREATE_PEER,
-  ADD_PEER,
+  ADD_NEW_NODE,
   REGISTER_SUCCESS
 }
diff --git a/server/logtest.test b/server/logtest.test
new file mode 100644
index 0000000000..a9ee656a7e
Binary files /dev/null and b/server/logtest.test differ