You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/10/26 12:18:47 UTC
[iotdb] 01/01: add ADD_NEW_NODE impl for ConfigNodeProcedure
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/confignode_ratis_addNewNodeToExistedGroup
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 03aad5ff0d0faf221fdb7f78787a27f81bfe1a51
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Oct 26 20:18:33 2022 +0800
add ADD_NEW_NODE impl for ConfigNodeProcedure
---
.../confignode/client/ConfigNodeRequestType.java | 1 -
.../client/sync/SyncConfigNodeClientPool.java | 3 --
.../iotdb/confignode/manager/ConsensusManager.java | 45 +++++++++++++--------
.../procedure/env/ConfigNodeProcedureEnv.java | 36 +++++------------
.../impl/node/AddConfigNodeProcedure.java | 40 ++++++------------
.../procedure/state/AddConfigNodeState.java | 3 +-
server/logtest.test | Bin 0 -> 75 bytes
7 files changed, 52 insertions(+), 76 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
index d9ee56ae25..d22f3f276c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.client;
public enum ConfigNodeRequestType {
- ADD_CONSENSUS_GROUP,
NOTIFY_REGISTER_SUCCESS,
REGISTER_CONFIG_NODE,
REMOVE_CONFIG_NODE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index 30d41a1d7b..7f69bf36fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
-import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.rpc.RpcUtils;
@@ -74,8 +73,6 @@ public class SyncConfigNodeClientPool {
case REGISTER_CONFIG_NODE:
// Only use registerConfigNode when the ConfigNode is first startup.
return client.registerConfigNode((TConfigNodeRegisterReq) req);
- case ADD_CONSENSUS_GROUP:
- return client.addConsensusGroup((TAddConsensusGroupReq) req);
case NOTIFY_REGISTER_SUCCESS:
client.notifyRegisterSuccess();
return null;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index bc266bd89e..3ad6fa2d77 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -51,6 +52,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/** ConsensusManager maintains consensus class, request will redirect to consensus layer */
public class ConsensusManager {
@@ -206,24 +208,35 @@ public class ConsensusManager {
}
/**
- * Add new ConfigNode Peer into PartitionRegion
+ * Tell the group to [create a new Peer on new node] and [add this member to join the group].
*
- * @param configNodeLocation The new ConfigNode
- * @throws AddPeerException When addPeer doesn't success
+ * <p>Use this method in place of `createPeer` and `addPeer`.
+ *
+ * @param originalConfigNodes the original members of the existing group
+ * @param newConfigNode the new member
*/
- public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws AddPeerException {
- boolean result =
- consensusImpl
- .addPeer(
- consensusGroupId,
- new Peer(
- consensusGroupId,
- configNodeLocation.getConfigNodeId(),
- configNodeLocation.getConsensusEndPoint()))
- .isSuccess();
-
- if (!result) {
- throw new AddPeerException(configNodeLocation);
+ public void addNewNodeToExistedGroup(
+ List<TConfigNodeLocation> originalConfigNodes, TConfigNodeLocation newConfigNode)
+ throws AddPeerException {
+ Peer newPeer =
+ new Peer(
+ consensusGroupId,
+ newConfigNode.getConfigNodeId(),
+ newConfigNode.getConsensusEndPoint());
+
+ List<Peer> originalPeers =
+ originalConfigNodes.stream()
+ .map(
+ node ->
+ new Peer(consensusGroupId, node.getConfigNodeId(), node.getConsensusEndPoint()))
+ .collect(Collectors.toList());
+
+ ConsensusGenericResponse response =
+ consensusImpl.addNewNodeToExistedGroup(consensusGroupId, newPeer, originalPeers);
+ if (!response.isSuccess()) {
+ LOGGER.error(
+ "Execute addNewNodeToExistedGroup for ConfigNode failed, response: {}", response);
+ throw new AddPeerException(newConfigNode);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 1467c1156d..2b12431398 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
-import org.apache.iotdb.confignode.exception.AddConsensusGroupException;
import org.apache.iotdb.confignode.exception.AddPeerException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
@@ -51,7 +50,6 @@ import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
-import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -171,35 +169,19 @@ public class ConfigNodeProcedureEnv {
}
/**
- * Let the remotely new ConfigNode build the ConsensusGroup
+ * Only the ConfigNode leader invokes this method. Adds the new ConfigNode Peer into the
+ * PartitionRegionGroup.
*
- * @param tConfigNodeLocation New ConfigNode's location
+ * @param newConfigNode The new ConfigNode
+ * @throws AddPeerException When addNewNodeToExistedGroup does not succeed
*/
- public void addConsensusGroup(TConfigNodeLocation tConfigNodeLocation)
- throws AddConsensusGroupException {
- List<TConfigNodeLocation> configNodeLocations =
+ public void addNewNodeToExistedGroup(TConfigNodeLocation newConfigNode) throws AddPeerException {
+ List<TConfigNodeLocation> originalConfigNodes =
new ArrayList<>(configManager.getNodeManager().getRegisteredConfigNodes());
- configNodeLocations.add(tConfigNodeLocation);
- TSStatus status =
- (TSStatus)
- SyncConfigNodeClientPool.getInstance()
- .sendSyncRequestToConfigNodeWithRetry(
- tConfigNodeLocation.getInternalEndPoint(),
- new TAddConsensusGroupReq(configNodeLocations),
- ConfigNodeRequestType.ADD_CONSENSUS_GROUP);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new AddConsensusGroupException(tConfigNodeLocation);
- }
- }
- /**
- * Leader will add the new ConfigNode Peer into PartitionRegion
- *
- * @param configNodeLocation The new ConfigNode
- * @throws AddPeerException When addPeer doesn't success
- */
- public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws AddPeerException {
- configManager.getConsensusManager().addConfigNodePeer(configNodeLocation);
+ configManager
+ .getConsensusManager()
+ .addNewNodeToExistedGroup(originalConfigNodes, newConfigNode);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 9d7d0ab760..4dedc6072d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -58,19 +58,13 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
try {
switch (state) {
case ADD_CONFIG_NODE_PREPARE:
- setNextState(AddConfigNodeState.CREATE_PEER);
+ setNextState(AddConfigNodeState.ADD_NEW_NODE);
break;
- case CREATE_PEER:
- LOG.info("Executing createPeerForConsensusGroup on {}...", tConfigNodeLocation);
- env.addConsensusGroup(tConfigNodeLocation);
- setNextState(AddConfigNodeState.ADD_PEER);
- LOG.info("Successfully createPeerForConsensusGroup on {}", tConfigNodeLocation);
- break;
- case ADD_PEER:
- LOG.info("Executing addPeer {}...", tConfigNodeLocation);
- env.addConfigNodePeer(tConfigNodeLocation);
+ case ADD_NEW_NODE:
+ LOG.info("Executing addNewNodeToExistedGroup {}...", tConfigNodeLocation);
+ env.addNewNodeToExistedGroup(tConfigNodeLocation);
setNextState(AddConfigNodeState.REGISTER_SUCCESS);
- LOG.info("Successfully addPeer {}", tConfigNodeLocation);
+ LOG.info("Successfully addNewNodeToExistedGroup {}", tConfigNodeLocation);
break;
case REGISTER_SUCCESS:
env.notifyRegisterSuccess(tConfigNodeLocation);
@@ -81,7 +75,7 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
}
} catch (Exception e) {
if (isRollbackSupported(state)) {
- setFailure(new ProcedureException("Add Config Node failed " + state));
+ setFailure(new ProcedureException("Add ConfigNode failed " + state));
} else {
LOG.error(
"Retrievable error trying to add config node {}, state {}",
@@ -99,26 +93,18 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
@Override
protected void rollbackState(ConfigNodeProcedureEnv env, AddConfigNodeState state)
throws ProcedureException {
- switch (state) {
- case CREATE_PEER:
- env.deleteConfigNodePeer(tConfigNodeLocation);
- LOG.info("Rollback add consensus group:{}", tConfigNodeLocation);
- break;
- case ADD_PEER:
- env.removeConfigNodePeer(tConfigNodeLocation);
- LOG.info("Rollback remove peer:{}", tConfigNodeLocation);
- break;
+ if (state == AddConfigNodeState.ADD_NEW_NODE) {
+ LOG.info("Rollback in AddConfigNodeProcedure, execute RemovePeer: {}", tConfigNodeLocation);
+ env.removeConfigNodePeer(tConfigNodeLocation);
+
+ LOG.info("Rollback in AddConfigNodeProcedure, execute DeletePeer: {}", tConfigNodeLocation);
+ env.deleteConfigNodePeer(tConfigNodeLocation);
}
}
@Override
protected boolean isRollbackSupported(AddConfigNodeState state) {
- switch (state) {
- case CREATE_PEER:
- case ADD_PEER:
- return true;
- }
- return false;
+ return state == AddConfigNodeState.ADD_NEW_NODE;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
index a7f1912609..31c94e300b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddConfigNodeState.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.procedure.state;
public enum AddConfigNodeState {
ADD_CONFIG_NODE_PREPARE,
- CREATE_PEER,
- ADD_PEER,
+ ADD_NEW_NODE,
REGISTER_SUCCESS
}
diff --git a/server/logtest.test b/server/logtest.test
new file mode 100644
index 0000000000..a9ee656a7e
Binary files /dev/null and b/server/logtest.test differ