You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2024/02/28 11:03:02 UTC
(iotdb) branch master updated: Cache clusterId in IoTDBConfig (#12053)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c3c48a2e321 Cache clusterId in IoTDBConfig (#12053)
c3c48a2e321 is described below
commit c3c48a2e321b0c1077068c04169ecc130a844694
Author: Li Yu Heng <li...@126.com>
AuthorDate: Wed Feb 28 19:02:56 2024 +0800
Cache clusterId in IoTDBConfig (#12053)
---
.../confignode/it/cluster/IoTDBClusterStartIT.java | 28 ++++++++++++----
.../iotdb/confignode/manager/ClusterManager.java | 23 ++++++++++++++
.../iotdb/confignode/manager/node/NodeManager.java | 37 ++++++++++++++++++----
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 ++++++++
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 21 ------------
.../protocol/airgap/IoTDBAirGapConnector.java | 3 +-
.../async/IoTDBThriftAsyncClientManager.java | 4 +--
.../thrift/sync/IoTDBThriftSyncClientManager.java | 4 +--
.../receiver/thrift/IoTDBThriftReceiverV1.java | 13 ++------
.../config/executor/ClusterConfigTaskExecutor.java | 14 ++------
.../config/metadata/ShowClusterIdTask.java | 5 +--
.../java/org/apache/iotdb/db/service/DataNode.java | 3 ++
.../src/main/thrift/confignode.thrift | 1 +
13 files changed, 103 insertions(+), 67 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterStartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterStartIT.java
index dd4be3b4756..d1572855dbb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterStartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterStartIT.java
@@ -38,6 +38,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
@@ -47,7 +51,7 @@ import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
public class IoTDBClusterStartIT {
private static final Logger logger = LoggerFactory.getLogger(IoTDBClusterStartIT.class);
- private static final int testConfigNodeNum = 3, testDataNodeNum = 0;
+ private static final int testConfigNodeNum = 3, testDataNodeNum = 1;
@Before
public void setUp() {
@@ -67,27 +71,39 @@ public class IoTDBClusterStartIT {
}
@Test
- public void clusterIdTest() throws ClientManagerException, IOException, InterruptedException {
+ public void clusterIdTest()
+ throws ClientManagerException, IOException, InterruptedException, SQLException {
final long maxTestTime = TimeUnit.SECONDS.toMillis(30);
final long testInterval = TimeUnit.SECONDS.toMillis(1);
try (SyncConfigNodeIServiceClient client =
- (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection();
+ Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
long startTime = System.currentTimeMillis();
+ boolean testPass = false;
+ String clusterIdFromConfigNode = null;
while (System.currentTimeMillis() - startTime < maxTestTime) {
try {
TGetClusterIdResp resp = client.getClusterId();
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) {
Assert.assertNotNull(resp.getClusterId());
Assert.assertNotEquals("", resp.getClusterId());
- return;
+ clusterIdFromConfigNode = resp.getClusterId();
+ testPass = true;
+ break;
}
} catch (TException e) {
logger.error("TException:", e);
}
Thread.sleep(testInterval);
}
- String errorMessage = String.format("Cluster ID failed to generate in %d ms.", maxTestTime);
- Assert.fail(errorMessage);
+ if (!testPass) {
+ String errorMessage = String.format("Cluster ID failed to generate in %d ms.", maxTestTime);
+ Assert.fail(errorMessage);
+ }
+ ResultSet resultSet = statement.executeQuery("show clusterid");
+ resultSet.next();
+ Assert.assertEquals(clusterIdFromConfigNode, resultSet.getString(1));
}
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
index 1a8df13681a..6f77bbc5936 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
public class ClusterManager {
@@ -55,9 +56,31 @@ public class ClusterManager {
return clusterInfo.getClusterId();
}
+ public String getClusterIdWithRetry(long maxWaitTime) {
+ long startTime = System.currentTimeMillis();
+ while (clusterInfo.getClusterId() == null
+ && System.currentTimeMillis() - startTime < maxWaitTime) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return clusterInfo.getClusterId();
+ }
+
private void generateClusterId() {
String clusterId = String.valueOf(UUID.randomUUID());
UpdateClusterIdPlan updateClusterIdPlan = new UpdateClusterIdPlan(clusterId);
+ while (configManager.getConsensusManager() == null) {
+ try {
+ LOGGER.info("consensus layer is not ready, sleep 100ms...");
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Unexpected interruption during waiting for consensus layer ready.");
+ }
+ }
try {
configManager.getConsensusManager().write(updateClusterIdPlan);
} catch (ConsensusException e) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index d18f701d1fb..307da61e91c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -79,6 +79,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -257,8 +258,20 @@ public class NodeManager {
* TSStatusCode#SUCCESS_STATUS} when register success.
*/
public DataSet registerDataNode(TDataNodeRegisterReq req) {
- int dataNodeId = nodeInfo.generateNextNodeId();
+ DataNodeRegisterResp resp = new DataNodeRegisterResp();
+ final String clusterId =
+ configManager
+ .getClusterManager()
+ .getClusterIdWithRetry(
+ IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
+ if (clusterId == null) {
+ resp.setStatus(
+ new TSStatus(TSStatusCode.GET_CLUSTER_ID_ERROR.getStatusCode())
+ .setMessage("clusterId has not generated"));
+ return resp;
+ }
+ int dataNodeId = nodeInfo.generateNextNodeId();
RegisterDataNodePlan registerDataNodePlan =
new RegisterDataNodePlan(req.getDataNodeConfiguration());
// Register new DataNode
@@ -292,17 +305,29 @@ public class NodeManager {
// Adjust the maximum RegionGroup number of each StorageGroup
getClusterSchemaManager().adjustMaxRegionGroupNum();
- DataNodeRegisterResp resp = new DataNodeRegisterResp();
-
resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
resp.setConfigNodeList(getRegisteredConfigNodes());
resp.setDataNodeId(
registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
- resp.setRuntimeConfiguration(getRuntimeConfiguration());
+ resp.setRuntimeConfiguration(getRuntimeConfiguration().setClusterId(clusterId));
return resp;
}
public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
+ final String clusterId =
+ configManager
+ .getClusterManager()
+ .getClusterIdWithRetry(
+ IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
+ TDataNodeRestartResp resp = new TDataNodeRestartResp();
+ resp.setConfigNodeList(getRegisteredConfigNodes());
+ if (clusterId == null) {
+ resp.setStatus(
+ new TSStatus(TSStatusCode.GET_CLUSTER_ID_ERROR.getStatusCode())
+ .setMessage("clusterId has not generated"));
+ return resp;
+ }
+
int nodeId = req.getDataNodeConfiguration().getLocation().getDataNodeId();
TDataNodeConfiguration dataNodeConfiguration = getRegisteredDataNode(nodeId);
if (!req.getDataNodeConfiguration().equals(dataNodeConfiguration)) {
@@ -327,10 +352,8 @@ public class NodeManager {
}
}
- TDataNodeRestartResp resp = new TDataNodeRestartResp();
resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
- resp.setConfigNodeList(getRegisteredConfigNodes());
- resp.setRuntimeConfiguration(getRuntimeConfiguration());
+ resp.setRuntimeConfiguration(getRuntimeConfiguration().setClusterId(clusterId));
return resp;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8aba6ac18b3..75db93e3d07 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -602,6 +602,12 @@ public class IoTDBConfig {
*/
private String clusterName = "defaultCluster";
+ /**
+ * The cluster ID that this DataNode joined in the cluster mode. DataNode will fetch cluster ID
+ * from ConfigNode and cache it here when first time use it.
+ */
+ private String clusterId = "";
+
/**
* The DataNodeId of this DataNode for cluster mode. The default value -1 will be changed after
* join cluster
@@ -3015,6 +3021,14 @@ public class IoTDBConfig {
this.clusterName = clusterName;
}
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+
public int getDataNodeId() {
return dataNodeId;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 1f2eca0e5e8..4fb5051bff3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -32,9 +32,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.progress.assigner.SimpleConsensusProgressIndexAssigner;
import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
-import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -44,7 +41,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
public class PipeRuntimeAgent implements IService {
@@ -52,7 +48,6 @@ public class PipeRuntimeAgent implements IService {
private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
- private final AtomicReference<String> clusterId = new AtomicReference<>(null);
private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
new SimpleConsensusProgressIndexAssigner();
@@ -108,22 +103,6 @@ public class PipeRuntimeAgent implements IService {
return ServiceType.PIPE_RUNTIME_AGENT;
}
- public String getClusterIdIfPossible() {
- if (clusterId.get() == null) {
- synchronized (clusterId) {
- if (clusterId.get() == null) {
- try (final ConfigNodeClient configNodeClient =
- ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- clusterId.set(configNodeClient.getClusterId().getClusterId());
- } catch (Exception e) {
- LOGGER.warn("Unable to get clusterId, because: {}", e.getMessage(), e);
- }
- }
- }
- }
- return clusterId.get();
- }
-
////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////
public void assignSimpleProgressIndexIfNeeded(InsertNode insertNode) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index b018ff0bb80..1ea01204c4a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.IoTDBConnect
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
@@ -200,7 +199,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
final HashMap<String, String> params = new HashMap<>();
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
- PipeAgent.runtime().getClusterIdIfPossible());
+ IoTDBDescriptor.getInstance().getConfig().getClusterId());
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
index 59cedeb9d3c..8c00195a694 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
@@ -177,7 +177,7 @@ public class IoTDBThriftAsyncClientManager extends IoTDBThriftClientManager {
final HashMap<String, String> params = new HashMap<>();
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
- PipeAgent.runtime().getClusterIdIfPossible());
+ IoTDBDescriptor.getInstance().getConfig().getClusterId());
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
index db7844a213b..a1eb144f53e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBThriftSyncConnectorClient;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
@@ -150,7 +150,7 @@ public class IoTDBThriftSyncClientManager extends IoTDBThriftClientManager imple
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
- PipeAgent.runtime().getClusterIdIfPossible());
+ IoTDBDescriptor.getInstance().getConfig().getClusterId());
// Try to handshake by PipeTransferHandshakeV2Req.
TPipeTransferResp resp =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index da6f888bc57..22c3b96c71e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
@@ -254,15 +253,7 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
private TPipeTransferResp handleTransferHandshakeV2(PipeTransferHandshakeV2Req req)
throws IOException {
// Reject to handshake if the receiver can not take clusterId from config node.
- final String clusterIdFromConfigNode = PipeAgent.runtime().getClusterIdIfPossible();
- if (clusterIdFromConfigNode == null) {
- final TSStatus status =
- RpcUtils.getStatus(
- TSStatusCode.PIPE_HANDSHAKE_ERROR,
- "Receiver can not get clusterId from config node.");
- LOGGER.warn("Handshake failed, response status = {}.", status);
- return new TPipeTransferResp(status);
- }
+ String clusterId = IoTDBDescriptor.getInstance().getConfig().getClusterId();
// Reject to handshake if the request does not contain sender's clusterId.
final String clusterIdFromHandshakeRequest =
@@ -276,7 +267,7 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
}
// Reject to handshake if the receiver and sender are from the same cluster.
- if (Objects.equals(clusterIdFromConfigNode, clusterIdFromHandshakeRequest)) {
+ if (Objects.equals(clusterId, clusterIdFromHandshakeRequest)) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index e230a70591d..406999f2b5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -67,7 +67,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGetClusterIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -1184,17 +1183,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> showClusterId() {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- TGetClusterIdResp getClusterIdResp = new TGetClusterIdResp();
- try (ConfigNodeClient client =
- CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- getClusterIdResp = client.getClusterId();
- } catch (ClientManagerException | TException e) {
- future.setException(e);
- }
-
- // build TSBlock
- ShowClusterIdTask.buildTSBlock(getClusterIdResp, future);
-
+ ShowClusterIdTask.buildTSBlock(
+ IoTDBDescriptor.getInstance().getConfig().getClusterId(), future);
return future;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterIdTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterIdTask.java
index 3e6ee3a97f8..ad7a939a495 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterIdTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowClusterIdTask.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.execution.config.metadata;
-import org.apache.iotdb.confignode.rpc.thrift.TGetClusterIdResp;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
@@ -40,15 +39,13 @@ import java.util.List;
import java.util.stream.Collectors;
public class ShowClusterIdTask implements IConfigTask {
- public static void buildTSBlock(
- TGetClusterIdResp getClusterIdResp, SettableFuture<ConfigTaskResult> future) {
+ public static void buildTSBlock(String clusterId, SettableFuture<ConfigTaskResult> future) {
List<TSDataType> outputDataTypes =
ColumnHeaderConstant.showClusterIdColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());
TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
- String clusterId = getClusterIdResp.getClusterId();
builder.getTimeColumnBuilder().writeLong(0L);
builder.getColumnBuilder(0).writeBinary(new Binary(clusterId, TSFileConfig.STRING_CHARSET));
builder.declarePosition();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 3241db3e313..628fa551678 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -370,6 +370,9 @@ public class DataNode implements DataNodeMBean {
/* Store ttl information */
StorageEngine.getInstance().updateTTLInfo(runtimeConfiguration.getAllTTLInformation());
+
+ /* Store cluster ID */
+ IoTDBDescriptor.getInstance().getConfig().setClusterId(runtimeConfiguration.getClusterId());
}
/**
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 989255c0cc8..38250d5ce2a 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -104,6 +104,7 @@ struct TRuntimeConfiguration {
3: required list<binary> allUDFInformation
4: required binary allTTLInformation
5: required list<binary> allPipeInformation
+ 6: optional string clusterId
}
struct TDataNodeRegisterReq {