You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/10/28 03:16:47 UTC
[iotdb] branch master updated: [IOTDB-4644] Support modifying DataNode's RPC IP:Port (#7620)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a78085d313 [IOTDB-4644] Support modifying DataNode's RPC IP:Port (#7620)
a78085d313 is described below
commit a78085d313ac5ea1c081595bdefe7d04efd4d92b
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Fri Oct 28 11:16:42 2022 +0800
[IOTDB-4644] Support modifying DataNode's RPC IP:Port (#7620)
---
.../consensus/request/ConfigPhysicalPlan.java | 4 +
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
.../request/write/datanode/UpdateDataNodePlan.java | 71 ++++++++++
.../consensus/response/DataNodeRegisterResp.java | 3 +-
.../iotdb/confignode/manager/ConfigManager.java | 23 ++++
.../apache/iotdb/confignode/manager/IManager.java | 9 ++
.../iotdb/confignode/manager/node/NodeManager.java | 45 +++++++
.../persistence/executor/ConfigPlanExecutor.java | 3 +
.../confignode/persistence/node/NodeInfo.java | 22 ++-
.../thrift/ConfigNodeRPCServiceProcessor.java | 14 ++
.../request/ConfigPhysicalPlanSerDeTest.java | 17 +++
integration-test/checkstyle.xml | 2 +-
integration-test/import-control.xml | 1 +
.../apache/iotdb/it/env/AbstractNodeWrapper.java | 20 ++-
.../org/apache/iotdb/it/env/ConfigNodeWrapper.java | 20 +++
.../org/apache/iotdb/it/env/DataNodeWrapper.java | 42 +++++-
.../confignode/it/IoTDBClusterPartitionIT.java | 19 +--
.../iotdb/confignode/it/IoTDBClusterRestartIT.java | 148 ++++++++++++++++++++-
.../iotdb/confignode/it/IoTDBConfigNodeIT.java | 61 +--------
.../confignode/it/IoTDBConfigNodeSnapshotIT.java | 18 +--
.../confignode/it/utils/ConfigNodeTestUtils.java | 115 ++++++++++++++++
.../apache/iotdb/commons/conf/IoTDBConstant.java | 2 -
.../apache/iotdb/db/client/ConfigNodeClient.java | 17 +++
.../org/apache/iotdb/db/conf/IoTDBStartCheck.java | 101 ++++++++++++++
.../java/org/apache/iotdb/db/service/DataNode.java | 21 +--
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +-
.../src/main/thrift/confignode.thrift | 14 ++
27 files changed, 703 insertions(+), 113 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index b3261bf166..828bd3db0d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfi
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
@@ -135,6 +136,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case RegisterDataNode:
req = new RegisterDataNodePlan();
break;
+ case UpdateDataNodeConfiguration:
+ req = new UpdateDataNodePlan();
+ break;
case RemoveDataNode:
req = new RemoveDataNodePlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 45a5e74ae4..dc64311f05 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -99,5 +99,6 @@ public enum ConfigPhysicalPlanType {
GetTransferringTriggers,
GetTriggerLocation,
GetTemplateSetInfo,
+ UpdateDataNodeConfiguration,
GetFunctionTable
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/datanode/UpdateDataNodePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/datanode/UpdateDataNodePlan.java
new file mode 100644
index 0000000000..9b2c006535
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/datanode/UpdateDataNodePlan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus.request.write.datanode;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Consensus plan of type {@link ConfigPhysicalPlanType#UpdateDataNodeConfiguration} that carries
+ * the {@link TDataNodeLocation} of a DataNode.
+ */
+public class UpdateDataNodePlan extends ConfigPhysicalPlan {
+
+  /** Stays {@code null} after the no-arg constructor until {@link #deserializeImpl} sets it. */
+  private TDataNodeLocation dataNodeLocation;
+
+  /** Creates a plan without a location, e.g. as the target of a later deserialization. */
+  public UpdateDataNodePlan() {
+    super(ConfigPhysicalPlanType.UpdateDataNodeConfiguration);
+  }
+
+  /**
+   * Creates a plan for the given location.
+   *
+   * @param datanodeLocation the location carried by this plan
+   */
+  public UpdateDataNodePlan(TDataNodeLocation datanodeLocation) {
+    this();
+    this.dataNodeLocation = datanodeLocation;
+  }
+
+  /** @return the location carried by this plan, or {@code null} if none has been set yet */
+  public TDataNodeLocation getDataNodeLocation() {
+    return dataNodeLocation;
+  }
+
+  /** Writes the plan type ordinal followed by the Thrift-serialized location. */
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeInt(ConfigPhysicalPlanType.UpdateDataNodeConfiguration.ordinal());
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation, stream);
+  }
+
+  /** Reads the location from the buffer; the plan type is not read here. */
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) {
+    dataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    UpdateDataNodePlan that = (UpdateDataNodePlan) o;
+    // Null-safe: a plan built with the no-arg constructor has no location yet.
+    return Objects.equals(dataNodeLocation, that.dataNodeLocation);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(dataNodeLocation);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
index 5c4881ca77..562145610b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
@@ -86,7 +86,8 @@ public class DataNodeRegisterResp implements DataSet {
resp.setConfigNodeList(configNodeList);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- || status.getCode() == TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()) {
+ || status.getCode() == TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()
+ || status.getCode() == TSStatusCode.DATANODE_NOT_EXIST.getStatusCode()) {
resp.setDataNodeId(dataNodeId);
resp.setGlobalConfig(globalConfig);
resp.setTemplateInfo(templateInfo);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 7644299f30..d66d339dd8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -55,6 +55,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan;
@@ -264,6 +265,28 @@ public class ConfigManager implements IManager {
}
}
+  /**
+   * Updates a DataNode through the NodeManager and attaches the template and trigger information
+   * to the response. When leadership confirmation does not return SUCCESS_STATUS, the response
+   * only carries that status and the registered ConfigNode list.
+   */
+  @Override
+  public DataSet updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
+    final TSStatus leaderStatus = confirmLeader();
+    if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      final DataNodeRegisterResp rejected = new DataNodeRegisterResp();
+      rejected.setStatus(leaderStatus);
+      rejected.setConfigNodeList(nodeManager.getRegisteredConfigNodes());
+      return rejected;
+    }
+
+    // The trigger table lock is held for the whole update and released in the finally clause.
+    triggerManager.getTriggerInfo().acquireTriggerTableLock();
+    try {
+      final DataNodeRegisterResp updated =
+          (DataNodeRegisterResp) nodeManager.updateDataNode(updateDataNodePlan);
+      updated.setTemplateInfo(clusterSchemaManager.getAllTemplateSetInfo());
+      updated.setTriggerInformation(
+          triggerManager.getTriggerTable(false).getAllTriggerInformation());
+      return updated;
+    } finally {
+      triggerManager.getTriggerInfo().releaseTriggerTableLock();
+    }
+  }
+
@Override
public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index a8e02dbf37..ef8976367e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan;
@@ -171,6 +172,14 @@ public interface IManager {
*/
DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan);
+ /**
+ * Update a DataNode with the location carried by the given plan
+ *
+ * @param updateDataNodePlan UpdateDataNodePlan carrying the DataNode's new location
+ * @return the update result as a DataSet (ConfigManager's implementation returns a
+ * DataNodeRegisterResp)
+ */
+ DataSet updateDataNode(UpdateDataNodePlan updateDataNodePlan);
+
/**
* DataNode report region migrate result to ConfigNode when remove DataNode
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 57e82c553e..01e3254736 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfi
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
@@ -293,6 +294,50 @@ public class NodeManager {
return dataSet;
}
+  /**
+   * Update the specified DataNode's location.
+   *
+   * <p>The plan is written through the consensus layer only when a DataNode with the same id is
+   * already registered. The outcome of that write is not inspected here.
+   *
+   * @param updateDataNodePlan UpdateDataNodePlan
+   * @return a DataNodeRegisterResp whose status is SUCCESS_STATUS when the DataNode is registered
+   *     and DATANODE_NOT_EXIST otherwise. The DataNode id, the registered ConfigNode list, the
+   *     global config and the ratis config are filled in both cases.
+   */
+  public DataSet updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
+    LOGGER.info("NodeManager start to update DataNode {}", updateDataNodePlan);
+
+    final int dataNodeId = updateDataNodePlan.getDataNodeLocation().getDataNodeId();
+
+    // check if node is already exist
+    boolean found = false;
+    for (TDataNodeConfiguration configuration : getRegisteredDataNodes()) {
+      if (configuration.getLocation().getDataNodeId() == dataNodeId) {
+        found = true;
+        break;
+      }
+    }
+
+    TSStatus status;
+    if (found) {
+      getConsensusManager().write(updateDataNodePlan);
+      // The message template must go through String.format, otherwise "%d" is sent verbatim.
+      status =
+          new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+              .setMessage(String.format("updateDataNode(nodeId=%d) success.", dataNodeId));
+    } else {
+      status =
+          new TSStatus(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode())
+              .setMessage(
+                  String.format("The specified DataNode(nodeId=%d) doesn't exist", dataNodeId));
+    }
+
+    DataNodeRegisterResp dataSet = new DataNodeRegisterResp();
+    dataSet.setStatus(status);
+    dataSet.setDataNodeId(dataNodeId);
+    dataSet.setConfigNodeList(getRegisteredConfigNodes());
+    setGlobalConfig(dataSet);
+    setRatisConfig(dataSet);
+    return dataSet;
+  }
+
public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
// Check global configuration
TSStatus status = configManager.getConsensusManager().confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 5d0b429b6c..c52bd02766 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfi
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
@@ -217,6 +218,8 @@ public class ConfigPlanExecutor {
return nodeInfo.registerDataNode((RegisterDataNodePlan) physicalPlan);
case RemoveDataNode:
return nodeInfo.removeDataNode((RemoveDataNodePlan) physicalPlan);
+ case UpdateDataNodeConfiguration:
+ return nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan);
case SetStorageGroup:
TSStatus status = clusterSchemaInfo.setStorageGroup((SetStorageGroupPlan) physicalPlan);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 2154d9bf5c..8f51f4f213 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfi
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.statistics.UpdateLoadStatisticsPlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -165,7 +166,7 @@ public class NodeInfo implements SnapshotProcessor {
/**
* Persist Information about remove dataNode
*
- * @param req RemoveDataNodeReq
+ * @param req RemoveDataNodePlan
* @return TSStatus
*/
public TSStatus removeDataNode(RemoveDataNodePlan req) {
@@ -191,6 +192,25 @@ public class NodeInfo implements SnapshotProcessor {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+  /**
+   * Update the specified DataNode's location
+   *
+   * @param updateDataNodePlan UpdateDataNodePlan
+   * @return SUCCESS_STATUS if the DataNode's location was replaced, DATANODE_NOT_EXIST if no
+   *     DataNode with the given id is registered
+   */
+  public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
+    dataNodeInfoReadWriteLock.writeLock().lock();
+    try {
+      final int dataNodeId = updateDataNodePlan.getDataNodeLocation().getDataNodeId();
+      // Guard against an unknown id (e.g. the node was removed before this plan is applied)
+      // instead of failing with a NullPointerException while holding the write lock.
+      if (!registeredDataNodes.containsKey(dataNodeId)) {
+        return new TSStatus(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode())
+            .setMessage(
+                String.format("The specified DataNode(nodeId=%d) doesn't exist", dataNodeId));
+      }
+      registeredDataNodes
+          .get(dataNodeId)
+          .setLocation(updateDataNodePlan.getDataNodeLocation());
+    } finally {
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
/**
* Get DataNodeConfiguration
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 8af4f1df9e..7a385760c1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetStorageGroupPlan;
@@ -82,6 +83,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeUpdateReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
@@ -196,6 +198,18 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return resp;
}
+  /**
+   * Wraps the requested location in an UpdateDataNodePlan, hands it to the ConfigManager and
+   * converts the result to its RPC form. The request and the result are both logged.
+   */
+  @Override
+  public TDataNodeRegisterResp updateDataNode(TDataNodeUpdateReq req) {
+    LOGGER.info("ConfigNode RPC Service start to update DataNode, req: {}", req);
+    final DataNodeRegisterResp updateResult =
+        (DataNodeRegisterResp)
+            configManager.updateDataNode(new UpdateDataNodePlan(req.getDataNodeLocation()));
+    final TDataNodeRegisterResp rpcResp = updateResult.convertToRpcDataNodeRegisterResp();
+    LOGGER.info(
+        "ConfigNode RPC Service finished to update DataNode, req: {}, result: {}", req, rpcResp);
+    return rpcResp;
+  }
+
@Override
public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeID) {
GetDataNodeConfigurationPlan queryReq = new GetDataNodeConfigurationPlan(dataNodeID);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index cc272ae79e..52527f3ef2 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -67,6 +67,7 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfi
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
@@ -158,6 +159,22 @@ public class ConfigPhysicalPlanSerDeTest {
Assert.assertEquals(plan0, plan1);
}
+  /** An UpdateDataNodePlan must equal the plan rebuilt from its serialized bytes. */
+  @Test
+  public void UpdateDataNodePlanTest() throws IOException {
+    final String host = "0.0.0.0";
+    final TDataNodeLocation location = new TDataNodeLocation();
+    location.setDataNodeId(1);
+    location.setClientRpcEndPoint(new TEndPoint(host, 6667));
+    location.setInternalEndPoint(new TEndPoint(host, 9003));
+    location.setMPPDataExchangeEndPoint(new TEndPoint(host, 8777));
+    location.setDataRegionConsensusEndPoint(new TEndPoint(host, 40010));
+    location.setSchemaRegionConsensusEndPoint(new TEndPoint(host, 50010));
+
+    final UpdateDataNodePlan expected = new UpdateDataNodePlan(location);
+    final UpdateDataNodePlan roundTripped =
+        (UpdateDataNodePlan) ConfigPhysicalPlan.Factory.create(expected.serializeToByteBuffer());
+    Assert.assertEquals(expected, roundTripped);
+  }
+
@Test
public void QueryDataNodeInfoPlanTest() throws IOException {
GetDataNodeConfigurationPlan plan0 = new GetDataNodeConfigurationPlan(-1);
diff --git a/integration-test/checkstyle.xml b/integration-test/checkstyle.xml
index 0375e5dabe..aacd96124d 100644
--- a/integration-test/checkstyle.xml
+++ b/integration-test/checkstyle.xml
@@ -25,7 +25,7 @@
<property name="severity" value="error"/>
<property name="fileExtensions" value="java"/>
<module name="BeforeExecutionExclusionFileFilter">
- <property name="fileNamePattern" value="^.*([\\/]src[\\/]main|[\\/]src[\\/]test[\\/]java[\\/]org[\\/]apache[\\/]iotdb[\\/]db[\\/]it[\\/](env|utils))[\\/].*$"/>
+ <property name="fileNamePattern" value="^.*([\\/]src[\\/]main|[\\/]src[\\/]test[\\/]java[\\/]org[\\/]apache[\\/]iotdb[\\/](db|confignode)[\\/]it[\\/](env|utils))[\\/].*$"/>
</module>
<module name="TreeWalker">
<module name="ImportControl">
diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index c8abec26eb..532062face 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -71,6 +71,7 @@
<allow pkg="org\.apache\.thrift.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.db\.qp\.logical\.sys.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.trigger\.api\.enums.*" regex="true" />
+ <allow pkg="org\.apache\.iotdb\.confignode\.it\.utils\.ConfigNodeTestUtils.*" regex="true"/>
</subpackage>
<subpackage name="session.it">
<allow class="org.apache.iotdb.commons.conf.IoTDBConstant" />
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractNodeWrapper.java
index a55777c7d5..a2ef694f0d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractNodeWrapper.java
@@ -86,11 +86,15 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
protected final int jmxPort;
private final String TAB = " ";
private Process instance;
+ private String node_address;
+ private int node_port;
public AbstractNodeWrapper(String testClassName, String testMethodName, int[] portList) {
this.testClassName = testClassName;
this.testMethodName = testMethodName;
this.portList = portList;
+ this.node_address = "127.0.0.1";
+ this.node_port = portList[0];
jmxPort = this.portList[portList.length - 1];
}
@@ -233,12 +237,20 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
@Override
public final String getIp() {
- return "127.0.0.1";
+ return this.node_address;
+ }
+
+ /** Replaces the address that getIp() returns for this node. */
+ public void setIp(String ip) {
+ this.node_address = ip;
}
@Override
public final int getPort() {
- return portList[0];
+ return this.node_port;
+ }
+
+ /** Replaces the port that getPort() returns for this node (initially portList[0]). */
+ public void setPort(int port) {
+ this.node_port = port;
}
@Override
@@ -260,7 +272,7 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
return getLogDirPath() + File.separator + getId() + ".log";
}
- private String getLogDirPath() {
+ protected String getLogDirPath() {
return System.getProperty("user.dir")
+ File.separator
+ "target"
@@ -356,4 +368,6 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
}
return testClassName + "_" + testMethodName;
}
+
+ /**
+ * Renames this node's files after its identity has changed.
+ *
+ * <p>NOTE(review): the visible implementations move the log file and the node directory that
+ * are named after portList[0] to the names derived from getId() and getNodePath() - confirm
+ * this is the intended contract for every subclass.
+ */
+ protected abstract void renameFile();
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java
index f0c973e94a..a1a7e8e3f1 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java
@@ -93,6 +93,26 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
"-s"));
}
+  /**
+   * Moves the log file and the node directory named after this ConfigNode's first port to the
+   * locations derived from {@code getId()} and {@code getNodePath()}. The results of both renames
+   * are ignored.
+   */
+  @Override
+  protected void renameFile() {
+    final String nodeKind = isSeed ? "SeedConfigNode" : "ConfigNode";
+
+    // rename log file
+    final File previousLog =
+        new File(getLogDirPath() + File.separator + nodeKind + portList[0] + ".log");
+    final File currentLog = new File(getLogDirPath() + File.separator + getId() + ".log");
+    previousLog.renameTo(currentLog);
+
+    // rename node dir
+    final String targetDir = System.getProperty("user.dir") + File.separator + "target";
+    final File previousNodeDir = new File(targetDir + File.separator + nodeKind + portList[0]);
+    previousNodeDir.renameTo(new File(getNodePath()));
+  }
+
public int getConsensusPort() {
return consensusPort;
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
index b11b829219..3b4abb184c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
@@ -28,8 +28,9 @@ import java.util.Properties;
public class DataNodeWrapper extends AbstractNodeWrapper {
private final String targetConfigNode;
- private final int mppDataExchangePort;
- private final int internalPort;
+ private int mppDataExchangePort;
+ private int internalPort;
+ private String internal_address;
private final int dataRegionConsensusPort;
private final int schemaRegionConsensusPort;
private final int mqttPort;
@@ -38,6 +39,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
String targetConfigNode, String testClassName, String testMethodName, int[] portList) {
super(testClassName, testMethodName, portList);
this.targetConfigNode = targetConfigNode;
+ this.internal_address = super.getIp();
this.mppDataExchangePort = portList[1];
this.internalPort = portList[2];
this.dataRegionConsensusPort = portList[3];
@@ -48,10 +50,10 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
@Override
protected void updateConfig(Properties properties) {
properties.setProperty(IoTDBConstant.RPC_ADDRESS, super.getIp());
- properties.setProperty(IoTDBConstant.INTERNAL_ADDRESS, super.getIp());
- properties.setProperty(IoTDBConstant.RPC_PORT, String.valueOf(getPort()));
- properties.setProperty("mpp_data_exchange_port", String.valueOf(this.mppDataExchangePort));
+ properties.setProperty(IoTDBConstant.RPC_PORT, String.valueOf(super.getPort()));
+ properties.setProperty(IoTDBConstant.INTERNAL_ADDRESS, this.internal_address);
properties.setProperty(IoTDBConstant.INTERNAL_PORT, String.valueOf(this.internalPort));
+ properties.setProperty("mpp_data_exchange_port", String.valueOf(this.mppDataExchangePort));
properties.setProperty(
"data_region_consensus_port", String.valueOf(this.dataRegionConsensusPort));
properties.setProperty(
@@ -91,6 +93,28 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
"-s"));
}
+  /**
+   * Moves the log file and the node directory named after this DataNode's first port to the
+   * locations derived from {@code getId()} and {@code getNodePath()}. The results of both renames
+   * are ignored.
+   */
+  @Override
+  public void renameFile() {
+    final String previousName = "DataNode" + portList[0];
+
+    // rename log file
+    final File previousLog = new File(getLogDirPath() + File.separator + previousName + ".log");
+    final File currentLog = new File(getLogDirPath() + File.separator + getId() + ".log");
+    previousLog.renameTo(currentLog);
+
+    // rename node dir
+    final File previousNodeDir =
+        new File(
+            System.getProperty("user.dir")
+                + File.separator
+                + "target"
+                + File.separator
+                + previousName);
+    final File currentNodeDir = new File(getNodePath());
+    previousNodeDir.renameTo(currentNodeDir);
+  }
+
protected String mainClassName() {
return "org.apache.iotdb.db.service.DataNode";
}
@@ -99,10 +123,18 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
return mppDataExchangePort;
}
+ /** Sets the port that updateConfig writes to the "mpp_data_exchange_port" property. */
+ public void setMppDataExchangePort(int mppDataExchangePort) {
+ this.mppDataExchangePort = mppDataExchangePort;
+ }
+
public int getInternalPort() {
return internalPort;
}
+ /** Sets the port that updateConfig writes to the IoTDBConstant.INTERNAL_PORT property. */
+ public void setInternalPort(int internalPort) {
+ this.internalPort = internalPort;
+ }
+
public int getDataRegionConsensusPort() {
return dataRegionConsensusPort;
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java
index 1eddbb6569..d13b506d69 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterPartitionIT.java
@@ -28,8 +28,6 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -54,7 +52,6 @@ import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.thrift.TException;
import org.junit.After;
@@ -74,6 +71,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.generatePatternTreeBuffer;
+
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBClusterPartitionIT {
@@ -137,20 +136,6 @@ public class IoTDBClusterPartitionIT {
ConfigFactory.getConfig().setTimePartitionIntervalForRouting(originalTimePartitionInterval);
}
- /** Generate a PatternTree and serialize it into a ByteBuffer */
- private ByteBuffer generatePatternTreeBuffer(String[] paths)
- throws IllegalPathException, IOException {
- PathPatternTree patternTree = new PathPatternTree();
- for (String path : paths) {
- patternTree.appendPathPattern(new PartialPath(path));
- }
- patternTree.constructTree();
-
- PublicBAOS baos = new PublicBAOS();
- patternTree.serialize(baos);
- return ByteBuffer.wrap(baos.toByteArray());
- }
-
@Test
public void testGetAndCreateSchemaPartition()
throws TException, IOException, IllegalPathException, InterruptedException {
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
index 3a870acaf8..5d69dec2e3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
@@ -18,36 +18,88 @@
*/
package org.apache.iotdb.confignode.it;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.AbstractEnv;
import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.ConfigNodeWrapper;
+import org.apache.iotdb.it.env.DataNodeWrapper;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.EnvUtils;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.checkNodeConfig;
+import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.generatePatternTreeBuffer;
+import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.getClusterNodeInfos;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBClusterRestartIT {
- protected static String originalConfigNodeConsensusProtocolClass;
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClusterRestartIT.class);
+ private static final String ratisConsensusProtocolClass =
+ "org.apache.iotdb.consensus.ratis.RatisConsensus";
private static final int testConfigNodeNum = 3;
private static final int testDataNodeNum = 3;
+ private static final int testReplicationFactor = 3;
+ private static final long testTimePartitionInterval = 604800000;
+ protected static String originalConfigNodeConsensusProtocolClass;
+ protected static String originalSchemaRegionConsensusProtocolClass;
+ protected static String originalDataRegionConsensusProtocolClass;
+ protected static int originSchemaReplicationFactor;
+ protected static int originalDataReplicationFactor;
+ protected static long originalTimePartitionInterval;
@Before
public void setUp() throws Exception {
originalConfigNodeConsensusProtocolClass =
ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
+ originalSchemaRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
+ originalDataRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
+ ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(ratisConsensusProtocolClass);
+ ConfigFactory.getConfig().setSchemaRegionConsensusProtocolClass(ratisConsensusProtocolClass);
+ ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(ratisConsensusProtocolClass);
ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(ConsensusFactory.RatisConsensus);
+ originSchemaReplicationFactor = ConfigFactory.getConfig().getSchemaReplicationFactor();
+ originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
+ ConfigFactory.getConfig().setSchemaReplicationFactor(testReplicationFactor);
+ ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
+
+ originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
+ ConfigFactory.getConfig().setTimePartitionIntervalForRouting(testTimePartitionInterval);
// Init 3C3D cluster environment
EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
}
@@ -57,6 +109,11 @@ public class IoTDBClusterRestartIT {
EnvFactory.getEnv().cleanAfterClass();
ConfigFactory.getConfig()
.setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+ ConfigFactory.getConfig()
+ .setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+ ConfigFactory.getConfig()
+ .setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+ ConfigFactory.getConfig().setTimePartitionIntervalForRouting(originalTimePartitionInterval);
}
@Test
@@ -83,5 +140,94 @@ public class IoTDBClusterRestartIT {
((AbstractEnv) EnvFactory.getEnv()).testWorking();
}
+ /**
+ * Shuts down every DataNode, assigns each one a new client RPC port, internal port and MPP
+ * data exchange port, restarts them, and then checks that the cluster reports the new node
+ * locations and can still create schema partitions. Currently disabled via {@code @Ignore}.
+ *
+ * @throws InterruptedException if the pause between shutdown and restart is interrupted
+ */
+ @Test
+ @Ignore
+ public void clusterRestartAfterUpdateDataNodeTest() throws InterruptedException {
+ TShowClusterResp clusterNodes;
+ final String sg0 = "root.sg0";
+
+ final String d00 = sg0 + ".d0.s";
+ final String d01 = sg0 + ".d1.s";
+ // Shutdown all data nodes
+ for (int i = 0; i < testDataNodeNum; i++) {
+ EnvFactory.getEnv().shutdownDataNode(i);
+ }
+
+ // Sleep 1s before restart
+ TimeUnit.SECONDS.sleep(1);
+
+ // Modify data node config
+ List<DataNodeWrapper> dataNodeWrapperList = EnvFactory.getEnv().getDataNodeWrapperList();
+ List<ConfigNodeWrapper> configNodeWrappersList = EnvFactory.getEnv().getConfigNodeWrapperList();
+ for (int i = 0; i < testDataNodeNum; i++) {
+ int[] portList = EnvUtils.searchAvailablePorts();
+ dataNodeWrapperList.get(i).setPort(portList[0]);
+ dataNodeWrapperList.get(i).setInternalPort(portList[1]);
+ dataNodeWrapperList.get(i).setMppDataExchangePort(portList[2]);
+
+ // update data node files' names
+ dataNodeWrapperList.get(i).renameFile();
+ }
+
+ for (int i = 0; i < testDataNodeNum; i++) {
+ dataNodeWrapperList.get(i).changeConfig(ConfigFactory.getConfig().getEngineProperties());
+ EnvFactory.getEnv().startDataNode(i);
+ }
+
+ ((AbstractEnv) EnvFactory.getEnv()).testWorking();
+
+ // check nodeInfo in cluster
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ // check the number and status of nodes
+ clusterNodes = getClusterNodeInfos(client, testConfigNodeNum, testDataNodeNum);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), clusterNodes.getStatus().getCode());
+
+ // check the configuration of nodes
+ List<TConfigNodeLocation> configNodeLocationList = clusterNodes.getConfigNodeList();
+ List<TDataNodeLocation> dataNodeLocationList = clusterNodes.getDataNodeList();
+ checkNodeConfig(
+ configNodeLocationList,
+ dataNodeLocationList,
+ configNodeWrappersList,
+ dataNodeWrapperList);
+
+ // check whether the cluster is working by testing GetAndCreateSchemaPartition
+ TSStatus status;
+ ByteBuffer buffer;
+ TSchemaPartitionReq schemaPartitionReq;
+ TSchemaPartitionTableResp schemaPartitionTableResp;
+ Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable;
+
+ // Set StorageGroups
+ status = client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg0)));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ // Test getSchemaPartition, the result should be empty
+ buffer = generatePatternTreeBuffer(new String[] {d00, d01});
+ schemaPartitionReq = new TSchemaPartitionReq(buffer);
+ schemaPartitionTableResp = client.getSchemaPartitionTable(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ schemaPartitionTableResp.getStatus().getCode());
+ Assert.assertEquals(0, schemaPartitionTableResp.getSchemaPartitionTableSize());
+
+ // Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartitions and return
+ buffer = generatePatternTreeBuffer(new String[] {d00, d01});
+ schemaPartitionReq.setPathPatternTree(buffer);
+ schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ schemaPartitionTableResp.getStatus().getCode());
+ Assert.assertEquals(1, schemaPartitionTableResp.getSchemaPartitionTableSize());
+ schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable();
+ Assert.assertTrue(schemaPartitionTable.containsKey(sg0));
+ Assert.assertEquals(2, schemaPartitionTable.get(sg0).size());
+ } catch (Exception e) {
+ // Pass the throwable so the stack trace is logged; the message alone may be null.
+ LOGGER.error(e.getMessage(), e);
+ fail(e.getMessage());
+ }
+ }
+
// TODO: Add persistence tests in the future
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeIT.java
index 18ceba53c5..d92b2a4a34 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeIT.java
@@ -68,6 +68,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.checkNodeConfig;
+import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.getClusterNodeInfos;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@@ -80,7 +82,6 @@ public class IoTDBConfigNodeIT {
protected static String originalConfigNodeConsensusProtocolClass;
protected static String originalSchemaRegionConsensusProtocolClass;
protected static String originalDataRegionConsensusProtocolClass;
-
private final int retryNum = 30;
@Before
@@ -111,64 +112,6 @@ public class IoTDBConfigNodeIT {
.setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
}
- private TShowClusterResp getClusterNodeInfos(
- IConfigNodeRPCService.Iface client, int expectedConfigNodeNum, int expectedDataNodeNum)
- throws TException, InterruptedException {
- TShowClusterResp clusterNodes = null;
- for (int i = 0; i < retryNum; i++) {
- clusterNodes = client.showCluster();
- if (clusterNodes.getConfigNodeListSize() == expectedConfigNodeNum
- && clusterNodes.getDataNodeListSize() == expectedDataNodeNum) {
- break;
- }
- Thread.sleep(1000);
- }
-
- assertEquals(expectedConfigNodeNum, clusterNodes.getConfigNodeListSize());
- assertEquals(expectedDataNodeNum, clusterNodes.getDataNodeListSize());
-
- return clusterNodes;
- }
-
- private void checkNodeConfig(
- List<TConfigNodeLocation> configNodeList,
- List<TDataNodeLocation> dataNodeList,
- List<ConfigNodeWrapper> configNodeWrappers,
- List<DataNodeWrapper> dataNodeWrappers) {
- // check ConfigNode
- for (TConfigNodeLocation configNodeLocation : configNodeList) {
- boolean found = false;
- for (ConfigNodeWrapper configNodeWrapper : configNodeWrappers) {
- if (configNodeWrapper.getIp().equals(configNodeLocation.getInternalEndPoint().getIp())
- && configNodeWrapper.getPort() == configNodeLocation.getInternalEndPoint().getPort()
- && configNodeWrapper.getConsensusPort()
- == configNodeLocation.getConsensusEndPoint().getPort()) {
- found = true;
- break;
- }
- }
- assertTrue(found);
- }
-
- // check DataNode
- for (TDataNodeLocation dataNodeLocation : dataNodeList) {
- boolean found = false;
- for (DataNodeWrapper dataNodeWrapper : dataNodeWrappers) {
- if (dataNodeWrapper.getIp().equals(dataNodeLocation.getClientRpcEndPoint().getIp())
- && dataNodeWrapper.getPort() == dataNodeLocation.getClientRpcEndPoint().getPort()
- && dataNodeWrapper.getInternalPort() == dataNodeLocation.getInternalEndPoint().getPort()
- && dataNodeWrapper.getSchemaRegionConsensusPort()
- == dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort()
- && dataNodeWrapper.getDataRegionConsensusPort()
- == dataNodeLocation.getDataRegionConsensusEndPoint().getPort()) {
- found = true;
- break;
- }
- }
- assertTrue(found);
- }
- }
-
@Test
public void removeAndStopConfigNodeTest() {
TShowClusterResp clusterNodes;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index 320ba373d9..42d0b6d88a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
-import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
@@ -45,7 +44,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import org.apache.iotdb.trigger.api.enums.TriggerType;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.thrift.TException;
@@ -65,6 +63,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.generatePatternTreeBuffer;
+
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBConfigNodeSnapshotIT {
@@ -105,17 +105,6 @@ public class IoTDBConfigNodeSnapshotIT {
ConfigFactory.getConfig().setTimePartitionIntervalForRouting(originalTimePartitionInterval);
}
- private ByteBuffer generatePatternTreeBuffer(String path)
- throws IllegalPathException, IOException {
- PathPatternTree patternTree = new PathPatternTree();
- patternTree.appendPathPattern(new PartialPath(path));
- patternTree.constructTree();
-
- PublicBAOS baos = new PublicBAOS();
- patternTree.serialize(baos);
- return ByteBuffer.wrap(baos.toByteArray());
- }
-
@Test
public void testPartitionInfoSnapshot()
throws IOException, IllegalPathException, TException, InterruptedException {
@@ -140,7 +129,8 @@ public class IoTDBConfigNodeSnapshotIT {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
// Create SchemaPartition
- ByteBuffer patternTree = generatePatternTreeBuffer(storageGroup + ".d" + j + ".s");
+ ByteBuffer patternTree =
+ generatePatternTreeBuffer(new String[] {storageGroup + ".d" + j + ".s"});
TSchemaPartitionReq schemaPartitionReq = new TSchemaPartitionReq(patternTree);
TSchemaPartitionTableResp schemaPartitionTableResp =
client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
new file mode 100644
index 0000000000..4a09ffaeb3
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it.utils;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.it.env.ConfigNodeWrapper;
+import org.apache.iotdb.it.env.DataNodeWrapper;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Static helpers shared by the ConfigNode integration tests. */
+public class ConfigNodeTestUtils {
+ // Maximum number of showCluster polls performed by getClusterNodeInfos
+ private static final int retryNum = 30;
+
+ /**
+ * Polls {@code client.showCluster()} until the reported ConfigNode and DataNode counts equal
+ * the expected ones, sleeping 1000 ms between polls and giving up after {@code retryNum}
+ * polls. The counts of the last response are then asserted to match.
+ *
+ * @param client ConfigNode RPC client used to query the cluster
+ * @param expectedConfigNodeNum expected size of the ConfigNode list
+ * @param expectedDataNodeNum expected size of the DataNode list
+ * @return the last showCluster response
+ * @throws TException if the showCluster call fails
+ * @throws InterruptedException if the sleep between polls is interrupted
+ */
+ public static TShowClusterResp getClusterNodeInfos(
+ IConfigNodeRPCService.Iface client, int expectedConfigNodeNum, int expectedDataNodeNum)
+ throws TException, InterruptedException {
+ TShowClusterResp clusterNodes = null;
+ for (int i = 0; i < retryNum; i++) {
+ clusterNodes = client.showCluster();
+ if (clusterNodes.getConfigNodeListSize() == expectedConfigNodeNum
+ && clusterNodes.getDataNodeListSize() == expectedDataNodeNum) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ assertEquals(expectedConfigNodeNum, clusterNodes.getConfigNodeListSize());
+ assertEquals(expectedDataNodeNum, clusterNodes.getDataNodeListSize());
+
+ return clusterNodes;
+ }
+
+ /**
+ * Asserts that every reported node location is matched by at least one of the given wrappers.
+ *
+ * <p>A ConfigNode matches on internal endpoint ip and port plus consensus port. A DataNode
+ * matches on client RPC ip and port, internal port, and the schema region and data region
+ * consensus ports. Only the reported locations are iterated; wrappers with no corresponding
+ * location are not detected by this check.
+ *
+ * @param configNodeList ConfigNode locations reported by the cluster
+ * @param dataNodeList DataNode locations reported by the cluster
+ * @param configNodeWrappers ConfigNode wrappers to match against
+ * @param dataNodeWrappers DataNode wrappers to match against
+ */
+ public static void checkNodeConfig(
+ List<TConfigNodeLocation> configNodeList,
+ List<TDataNodeLocation> dataNodeList,
+ List<ConfigNodeWrapper> configNodeWrappers,
+ List<DataNodeWrapper> dataNodeWrappers) {
+ // check ConfigNode
+ for (TConfigNodeLocation configNodeLocation : configNodeList) {
+ boolean found = false;
+ for (ConfigNodeWrapper configNodeWrapper : configNodeWrappers) {
+ if (configNodeWrapper.getIp().equals(configNodeLocation.getInternalEndPoint().getIp())
+ && configNodeWrapper.getPort() == configNodeLocation.getInternalEndPoint().getPort()
+ && configNodeWrapper.getConsensusPort()
+ == configNodeLocation.getConsensusEndPoint().getPort()) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue(found);
+ }
+
+ // check DataNode
+ for (TDataNodeLocation dataNodeLocation : dataNodeList) {
+ boolean found = false;
+ for (DataNodeWrapper dataNodeWrapper : dataNodeWrappers) {
+ if (dataNodeWrapper.getIp().equals(dataNodeLocation.getClientRpcEndPoint().getIp())
+ && dataNodeWrapper.getPort() == dataNodeLocation.getClientRpcEndPoint().getPort()
+ && dataNodeWrapper.getInternalPort() == dataNodeLocation.getInternalEndPoint().getPort()
+ && dataNodeWrapper.getSchemaRegionConsensusPort()
+ == dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort()
+ && dataNodeWrapper.getDataRegionConsensusPort()
+ == dataNodeLocation.getDataRegionConsensusEndPoint().getPort()) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue(found);
+ }
+ }
+
+ /**
+ * Builds a PathPatternTree from the given paths and serializes it into a ByteBuffer.
+ *
+ * @param paths path patterns appended to the tree, each parsed as a PartialPath
+ * @return a ByteBuffer wrapping the serialized tree
+ * @throws IllegalPathException if a path cannot be parsed into a PartialPath
+ * @throws IOException if serializing the tree fails
+ */
+ public static ByteBuffer generatePatternTreeBuffer(String[] paths)
+ throws IllegalPathException, IOException {
+ PathPatternTree patternTree = new PathPatternTree();
+ for (String path : paths) {
+ patternTree.appendPathPattern(new PartialPath(path));
+ }
+ patternTree.constructTree();
+
+ PublicBAOS baos = new PublicBAOS();
+ patternTree.serialize(baos);
+ return ByteBuffer.wrap(baos.toByteArray());
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index fe349f364a..ff27133d88 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -50,8 +50,6 @@ public class IoTDBConstant {
public static final String IOTDB_CONF = "IOTDB_CONF";
public static final String GLOBAL_DB_NAME = "IoTDB";
- public static final String CONFIG_NODE_ID = "config_node_id";
- public static final String DATA_NODE_ID = "data_node_id";
public static final String RPC_ADDRESS = "rpc_address";
public static final String RPC_PORT = "rpc_port";
public static final String INTERNAL_ADDRESS = "internal_address";
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index c6df2ef36c..75399d0071 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeUpdateReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
@@ -331,6 +332,22 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ /**
+ * Sends an updateDataNode request, making up to RETRY_NUM attempts.
+ *
+ * <p>The response is returned as soon as {@code updateConfigNodeLeader(resp.status)} yields
+ * false. Otherwise, or when the call throws a TException (after which configLeader is
+ * cleared), the client reconnects and tries again.
+ *
+ * @param req the update request forwarded to the ConfigNode
+ * @return the first response for which updateConfigNodeLeader returns false
+ * @throws TException with MSG_RECONNECTION_FAIL once every attempt has been used up
+ */
+ @Override
+ public TDataNodeRegisterResp updateDataNode(TDataNodeUpdateReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TDataNodeRegisterResp resp = client.updateDataNode(req);
+ if (!updateConfigNodeLeader(resp.status)) {
+ return resp;
+ }
+ } catch (TException e) {
+ // Forget the cached leader before reconnecting
+ configLeader = null;
+ }
+ reconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeId) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index c2336ac7fc..3f7a63689b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.conf;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -119,6 +120,29 @@ public class IoTDBStartCheck {
private static final String IOTDB_VERSION_STRING = "iotdb_version";
+ private static final String INTERNAL_ADDRESS = "internal_address";
+ private static final String internalAddress = config.getInternalAddress();
+
+ private static final String INTERNAL_PORT = "internal_port";
+ private static final String internalPort = String.valueOf(config.getInternalPort());
+
+ private static final String RPC_ADDRESS = "rpc_address";
+ private static final String rpcAddress = config.getRpcAddress();
+
+ private static final String RPC_PORT = "rpc_port";
+ private static final String rpcPort = String.valueOf(config.getRpcPort());
+
+ private static final String MPP_DATA_EXCHANGE_PORT = "mpp_data_exchange_port";
+ private static final String mppDataExchangePort = String.valueOf(config.getMppDataExchangePort());
+
+ private static final String SCHEMA_REGION_CONSENSUS_PORT = "schema_region_consensus_port";
+ private static final String schemaRegionConsensusPort =
+ String.valueOf(config.getSchemaRegionConsensusPort());
+
+ private static final String DATA_REGION_CONSENSUS_PORT = "data_region_consensus_port";
+ private static final String dataRegionConsensusPort =
+ String.valueOf(config.getDataRegionConsensusPort());
+
public static IoTDBStartCheck getInstance() {
return IoTDBConfigCheckHolder.INSTANCE;
}
@@ -176,6 +200,13 @@ public class IoTDBStartCheck {
systemProperties.put(ENABLE_ID_TABLE, enableIDTable);
systemProperties.put(ENABLE_ID_TABLE_LOG_FILE, enableIdTableLogFile);
systemProperties.put(SCHEMA_ENGINE_MODE, schemaEngineMode);
+ systemProperties.put(INTERNAL_ADDRESS, internalAddress);
+ systemProperties.put(INTERNAL_PORT, internalPort);
+ systemProperties.put(RPC_ADDRESS, rpcAddress);
+ systemProperties.put(RPC_PORT, rpcPort);
+ systemProperties.put(MPP_DATA_EXCHANGE_PORT, mppDataExchangePort);
+ systemProperties.put(SCHEMA_REGION_CONSENSUS_PORT, schemaRegionConsensusPort);
+ systemProperties.put(DATA_REGION_CONSENSUS_PORT, dataRegionConsensusPort);
}
/**
@@ -500,6 +531,30 @@ public class IoTDBStartCheck {
FileUtils.moveFile(tmpPropertiesFile, propertiesFile);
}
+ /**
+ * Writes the endpoints of the given DataNode location into the system properties file.
+ *
+ * <p>The in-memory properties are reloaded and updated first. The file is opened for writing
+ * only after every value has been read from {@code dataNodeLocation}, so an exception thrown
+ * while reading the location cannot leave a truncated properties file behind.
+ *
+ * @param dataNodeLocation location whose internal, client RPC, MPP data exchange and consensus
+ * endpoints are stored
+ * @throws IOException if reloading or storing the properties fails
+ */
+ public void serializeNewDataNode(TDataNodeLocation dataNodeLocation) throws IOException {
+ reloadProperties();
+
+ properties.setProperty(INTERNAL_ADDRESS, dataNodeLocation.getInternalEndPoint().getIp());
+ properties.setProperty(
+ INTERNAL_PORT, String.valueOf(dataNodeLocation.getInternalEndPoint().getPort()));
+ properties.setProperty(
+ RPC_ADDRESS, String.valueOf(dataNodeLocation.getClientRpcEndPoint().getIp()));
+ properties.setProperty(
+ RPC_PORT, String.valueOf(dataNodeLocation.getClientRpcEndPoint().getPort()));
+ properties.setProperty(
+ MPP_DATA_EXCHANGE_PORT,
+ String.valueOf(dataNodeLocation.getMPPDataExchangeEndPoint().getPort()));
+ properties.setProperty(
+ SCHEMA_REGION_CONSENSUS_PORT,
+ String.valueOf(dataNodeLocation.getSchemaRegionConsensusEndPoint().getPort()));
+ properties.setProperty(
+ DATA_REGION_CONSENSUS_PORT,
+ String.valueOf(dataNodeLocation.getDataRegionConsensusEndPoint().getPort()));
+
+ // new FileOutputStream(file) truncates the file, so open it only once all values are set.
+ try (FileOutputStream fileOutputStream = new FileOutputStream(propertiesFile)) {
+ properties.store(fileOutputStream, SYSTEM_PROPERTIES_STRING);
+ }
+ }
+
public boolean checkConsensusProtocolExists(TConsensusGroupType type) {
if (type == TConsensusGroupType.DataRegion) {
return properties.containsKey(DATA_REGION_CONSENSUS_PROTOCOL);
@@ -510,4 +565,50 @@ public class IoTDBStartCheck {
logger.error("Unexpected consensus group type");
return false;
}
+
+ /**
+ * Compares the internal port, RPC address, RPC port and MPP data exchange port stored in
+ * {@code properties} with the values of the current configuration, logging every difference.
+ *
+ * <p>NOTE(review): Properties.getProperty returns null for an absent key, so a properties
+ * file that lacks one of these keys would make the corresponding equals call throw a
+ * NullPointerException; confirm the keys are always present before this is called.
+ *
+ * @return true if at least one of the four values differs, false otherwise
+ */
+ public boolean isIpPortUpdated() {
+ boolean isUpdated = false;
+ // check the modifiable parts of configuration
+ if (!(properties.getProperty(INTERNAL_PORT).equals(internalPort))) {
+ isUpdated = true;
+ logger.info(
+ "Internal port is updated from {} to {}",
+ properties.getProperty(INTERNAL_PORT),
+ internalPort);
+ }
+ if (!(properties.getProperty(RPC_ADDRESS).equals(rpcAddress))) {
+ isUpdated = true;
+ logger.info(
+ "RPC address is updated from {} to {}", properties.getProperty(RPC_ADDRESS), rpcAddress);
+ }
+ if (!(properties.getProperty(RPC_PORT).equals(rpcPort))) {
+ isUpdated = true;
+ logger.info("RPC port is updated from {} to {}", properties.getProperty(RPC_PORT), rpcPort);
+ }
+ if (!(properties.getProperty(MPP_DATA_EXCHANGE_PORT).equals(mppDataExchangePort))) {
+ isUpdated = true;
+ logger.info(
+ "MPP data exchange port is updated from {} to {}",
+ properties.getProperty(MPP_DATA_EXCHANGE_PORT),
+ mppDataExchangePort);
+ }
+ return isUpdated;
+ }
+
+ /**
+ * Checks whether any non-modifiable item (internal address, schema region consensus port,
+ * data region consensus port) stored in {@code properties} differs from the current
+ * configuration. The first mismatch found is logged as an error.
+ *
+ * <p>Note the polarity: the result is true when a non-modifiable item WAS changed.
+ *
+ * <p>NOTE(review): as getProperty returns null for an absent key, a properties file lacking
+ * one of these keys would raise a NullPointerException here; confirm the keys always exist.
+ *
+ * @return true if one of the three values differs, false if all of them match
+ */
+ public boolean checkNonModifiableConfiguration() {
+ // check the non-modifiable parts of configuration
+ if (!(properties.getProperty(INTERNAL_ADDRESS).equals(internalAddress))) {
+ logger.error("Internal address is not allowed to be updated");
+ return true;
+ }
+ if (!(properties.getProperty(SCHEMA_REGION_CONSENSUS_PORT).equals(schemaRegionConsensusPort))) {
+ logger.error("Schema region consensus port is not allowed to be updated");
+ return true;
+ }
+ if (!(properties.getProperty(DATA_REGION_CONSENSUS_PORT).equals(dataRegionConsensusPort))) {
+ logger.error("Data region consensus port is not allowed to be updated");
+ return true;
+ }
+ return false;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index ea9e8284bf..7580b3f675 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -188,8 +188,8 @@ public class DataNode implements DataNodeMBean {
ConfigNodeInfo.getInstance().updateConfigNodeList(config.getTargetConfigNodeList());
while (retry > 0) {
- logger.info("Start registering to the cluster.");
try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+ logger.info("Start registering to the cluster.");
TDataNodeRegisterReq req = new TDataNodeRegisterReq();
req.setDataNodeConfiguration(generateDataNodeConfiguration());
TDataNodeRegisterResp dataNodeRegisterResp = configNodeClient.registerDataNode(req);
@@ -382,13 +382,7 @@ public class DataNode implements DataNodeMBean {
initProtocols();
}
- /**
- * generate dataNodeConfiguration
- *
- * @return TDataNodeConfiguration
- */
- private TDataNodeConfiguration generateDataNodeConfiguration() {
- // Set DataNodeLocation
+ private TDataNodeLocation generateDataNodeLocation() {
TDataNodeLocation location = new TDataNodeLocation();
location.setDataNodeId(config.getDataNodeId());
location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
@@ -400,6 +394,17 @@ public class DataNode implements DataNodeMBean {
new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
location.setSchemaRegionConsensusEndPoint(
new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
+ return location;
+ }
+
+ /**
+ * generate dataNodeConfiguration
+ *
+ * @return TDataNodeConfiguration
+ */
+ private TDataNodeConfiguration generateDataNodeConfiguration() {
+ // Set DataNodeLocation
+ TDataNodeLocation location = generateDataNodeLocation();
// Set NodeResource
TNodeResource resource = new TNodeResource();
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d3acdfa73c..c6155942a1 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -174,7 +174,8 @@ public enum TSStatusCode {
OVERLAP_WITH_EXISTING_TASK(920),
NOT_AVAILABLE_REGION_GROUP(921),
CREATE_TRIGGER_ERROR(922),
- DROP_TRIGGER_ERROR(923);
+ DROP_TRIGGER_ERROR(923),
+ UPDATE_DATANODE_FAILED(924);
private int statusCode;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 3a5be47ff4..770f4baf49 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -89,6 +89,11 @@ struct TRatisConfig {
26: required i64 firstElectionTimeoutMax
}
+// Request of updateDataNode: carries the DataNode location to be updated
+struct TDataNodeUpdateReq{
+ 1: required common.TDataNodeLocation dataNodeLocation
+}
+
+
struct TDataNodeRemoveReq {
1: required list<common.TDataNodeLocation> dataNodeLocations
}
@@ -581,6 +586,15 @@ service IConfigNodeRPCService {
*/
TDataNodeRemoveResp removeDataNode(TDataNodeRemoveReq req)
+ /**
+ * Update the specified DataNode's location in the cluster when it restarts
+ *
+ * @return SUCCESS_STATUS if the DataNode updated successfully
+ * DATANODE_NOT_EXIST if the DataNode in the TDataNodeUpdateReq doesn't exist in the cluster
+ * UPDATE_DATANODE_FAILED if failed to update the DataNode
+ */
+ TDataNodeRegisterResp updateDataNode(TDataNodeUpdateReq req)
+
/**
* Get one or more DataNodes' configuration
*