You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/02/28 04:19:37 UTC
[iotdb] branch master updated: [IOTDB-5555] Enable modify external RPC EndPoint of DataNode (#9155)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7959c1d769 [IOTDB-5555] Enable modify external RPC EndPoint of DataNode (#9155)
7959c1d769 is described below
commit 7959c1d769e66e9aa4a7b1945f49c44e46db5f41
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Feb 28 12:19:31 2023 +0800
[IOTDB-5555] Enable modify external RPC EndPoint of DataNode (#9155)
---
.../request/write/datanode/UpdateDataNodePlan.java | 32 +++---
.../iotdb/confignode/manager/ConfigManager.java | 17 +--
.../apache/iotdb/confignode/manager/IManager.java | 9 --
.../manager/node/ClusterNodeStartUtils.java | 88 +++++++++-------
.../iotdb/confignode/manager/node/NodeManager.java | 55 ++--------
.../confignode/persistence/node/NodeInfo.java | 6 +-
.../iotdb/confignode/service/ConfigNode.java | 4 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 18 +---
.../request/ConfigPhysicalPlanSerDeTest.java | 12 ++-
.../apache/iotdb/it/env/cluster/AbstractEnv.java | 8 +-
.../iotdb/it/env/cluster/AbstractNodeWrapper.java | 6 +-
.../iotdb/it/env/cluster/ConfigNodeWrapper.java | 42 +++++---
.../iotdb/it/env/cluster/DataNodeWrapper.java | 65 +++++++-----
.../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 7 +-
.../it/cluster/IoTDBClusterRestartIT.java | 117 +++++----------------
.../confignode/it/utils/ConfigNodeTestUtils.java | 27 +----
.../commons/utils/ThriftCommonsSerDeUtils.java | 19 ++++
.../commons/utils/ThriftCommonsSerDeUtilsTest.java | 30 ++++++
.../apache/iotdb/db/client/ConfigNodeClient.java | 17 ---
.../org/apache/iotdb/db/conf/IoTDBStartCheck.java | 22 +++-
.../java/org/apache/iotdb/db/service/DataNode.java | 9 +-
.../src/main/thrift/confignode.thrift | 21 +---
22 files changed, 286 insertions(+), 345 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/datanode/UpdateDataNodePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/datanode/UpdateDataNodePlan.java
index 367b09279b..de9152777f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/datanode/UpdateDataNodePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/datanode/UpdateDataNodePlan.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.consensus.request.write.datanode;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
@@ -30,42 +31,49 @@ import java.util.Objects;
public class UpdateDataNodePlan extends ConfigPhysicalPlan {
- private TDataNodeLocation dataNodeLocation;
+ private TDataNodeConfiguration dataNodeConfiguration;
public UpdateDataNodePlan() {
super(ConfigPhysicalPlanType.UpdateDataNodeConfiguration);
}
- public UpdateDataNodePlan(TDataNodeLocation datanodeLocation) {
+ public UpdateDataNodePlan(TDataNodeConfiguration dataNodeConfiguration) {
this();
- this.dataNodeLocation = datanodeLocation;
+ this.dataNodeConfiguration = dataNodeConfiguration;
}
- public TDataNodeLocation getDataNodeLocation() {
- return dataNodeLocation;
+ public TDataNodeConfiguration getDataNodeConfiguration() {
+ return dataNodeConfiguration;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
- ThriftCommonsSerDeUtils.serializeTDataNodeLocation(dataNodeLocation, stream);
+ ThriftCommonsSerDeUtils.serializeTDataNodeConfiguration(dataNodeConfiguration, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) {
- dataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer);
+ dataNodeConfiguration = ThriftCommonsSerDeUtils.deserializeTDataNodeConfiguration(buffer);
}
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!getType().equals(((UpdateDataNodePlan) o).getType())) {
+ return false;
+ }
UpdateDataNodePlan that = (UpdateDataNodePlan) o;
- return dataNodeLocation.equals(that.dataNodeLocation);
+ return dataNodeConfiguration.equals(that.dataNodeConfiguration);
}
@Override
public int hashCode() {
- return Objects.hash(dataNodeLocation);
+ return Objects.hash(getType(), dataNodeConfiguration);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6d46b9b162..6b24df7bb9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -59,7 +59,6 @@ import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoLi
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan;
@@ -349,7 +348,7 @@ public class ConfigManager implements IManager {
req.getDataNodeConfiguration().getLocation(),
this);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return nodeManager.restartDataNode(req.getDataNodeConfiguration().getLocation());
+ return nodeManager.updateDataNodeIfNecessary(req.getDataNodeConfiguration());
}
}
@@ -370,20 +369,6 @@ public class ConfigManager implements IManager {
}
}
- @Override
- public DataSet updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
- TSStatus status = confirmLeader();
- DataNodeRegisterResp dataSet;
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataSet = (DataNodeRegisterResp) nodeManager.updateDataNode(updateDataNodePlan);
- } else {
- dataSet = new DataNodeRegisterResp();
- dataSet.setStatus(status);
- dataSet.setConfigNodeList(nodeManager.getRegisteredConfigNodes());
- }
- return dataSet;
- }
-
@Override
public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index dfcaff1149..b82577df94 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotL
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan;
@@ -223,14 +222,6 @@ public interface IManager {
*/
DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan);
- /**
- * Update DataNode
- *
- * @param updateDataNodePlan UpdateDataNodePlan
- * @return DataNodeConfigurationDataSet
- */
- DataSet updateDataNode(UpdateDataNodePlan updateDataNodePlan);
-
/**
* Report that the specified DataNode will be shutdown.
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java
index 699b7026d6..c69903464e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java
@@ -195,30 +195,38 @@ public class ClusterNodeStartUtils {
return status;
}
- boolean isTEndPointUpdated;
+ boolean acceptRestart = true;
+ Set<Integer> updatedTEndPoints;
switch (nodeType) {
case ConfigNode:
- isTEndPointUpdated =
- isTEndPointsOfTConfigNodeLocationUpdated(
+ updatedTEndPoints =
+ checkUpdatedTEndPointOfConfigNode(
(TConfigNodeLocation) nodeLocation, (TConfigNodeLocation) matchedNodeLocation);
+ if (!updatedTEndPoints.isEmpty()) {
+ // TODO: Accept internal TEndPoints
+ acceptRestart = false;
+ }
break;
case DataNode:
default:
- isTEndPointUpdated =
- isTEndPointsOfTDataNodeLocationUpdated(
+ updatedTEndPoints =
+ checkUpdatedTEndPointOfDataNode(
(TDataNodeLocation) nodeLocation, (TDataNodeLocation) matchedNodeLocation);
+ if (updatedTEndPoints.stream().max(Integer::compare).orElse(-1) > 0) {
+ // TODO: Accept internal TEndPoints
+ acceptRestart = false;
+ }
break;
}
- if (isTEndPointUpdated) {
- /* Reject restart because some TEndPoints have changed */
- // TODO: @Itami-Sho, enable this
+ if (!acceptRestart) {
+ /* Reject restart because some internal TEndPoints have been changed */
status.setCode(TSStatusCode.REJECT_NODE_START.getStatusCode());
status.setMessage(
String.format(
- "Reject %s restart. Because some TEndPoints of this %s have been changed."
+ "Reject %s restart. Because the internal TEndPoints of this %s can't be modified."
+ POSSIBLE_SOLUTIONS
- + "\t1. Please delete 'data' dir and retry start. This Node will be registered as a new Node, so don't forget to remove the old one.",
+ + "\t1. Please keep the internal TEndPoints of this Node the same as before.",
nodeType.getNodeType(),
nodeType.getNodeType()));
return status;
@@ -336,47 +344,53 @@ public class ClusterNodeStartUtils {
/**
* Check if some TEndPoints of the specified ConfigNode have updated.
*
- * @return True if some TEndPoints of the specified ConfigNode have updated, false otherwise.
+ * @param restartLocation The location of restart ConfigNode
+ * @param recordLocation The record ConfigNode location
+ * @return The set of TEndPoints that have modified.
*/
- public static boolean isTEndPointsOfTConfigNodeLocationUpdated(
- TConfigNodeLocation configNodeLocationA, TConfigNodeLocation configNodeLocationB) {
- if (!configNodeLocationA
- .getInternalEndPoint()
- .equals(configNodeLocationB.getInternalEndPoint())) {
- return true;
+ public static Set<Integer> checkUpdatedTEndPointOfConfigNode(
+ TConfigNodeLocation restartLocation, TConfigNodeLocation recordLocation) {
+ Set<Integer> updatedTEndPoints = new HashSet<>();
+ if (!recordLocation.getInternalEndPoint().equals(restartLocation.getInternalEndPoint())) {
+ updatedTEndPoints.add(0);
}
- return !configNodeLocationA
- .getConsensusEndPoint()
- .equals(configNodeLocationB.getConsensusEndPoint());
+ if (!recordLocation.getConsensusEndPoint().equals(restartLocation.getConsensusEndPoint())) {
+ updatedTEndPoints.add(1);
+ }
+ return updatedTEndPoints;
}
/**
* Check if some TEndPoints of the specified DataNode have updated.
*
- * @return True if some TEndPoints of the specified DataNode have updated, false otherwise.
+ * @param restartLocation The location of restart DataNode
+ * @param recordLocation The record DataNode location
+ * @return The set of TEndPoints that have modified.
*/
- public static boolean isTEndPointsOfTDataNodeLocationUpdated(
- TDataNodeLocation dataNodeLocationA, TDataNodeLocation dataNodeLocationB) {
- if (!dataNodeLocationA
- .getClientRpcEndPoint()
- .equals(dataNodeLocationB.getClientRpcEndPoint())) {
- return true;
+ public static Set<Integer> checkUpdatedTEndPointOfDataNode(
+ TDataNodeLocation restartLocation, TDataNodeLocation recordLocation) {
+ Set<Integer> updatedTEndPoints = new HashSet<>();
+ if (!recordLocation.getClientRpcEndPoint().equals(restartLocation.getClientRpcEndPoint())) {
+ updatedTEndPoints.add(0);
}
- if (!dataNodeLocationA.getInternalEndPoint().equals(dataNodeLocationB.getInternalEndPoint())) {
- return true;
+ if (!recordLocation.getInternalEndPoint().equals(restartLocation.getInternalEndPoint())) {
+ updatedTEndPoints.add(1);
}
- if (!dataNodeLocationA
+ if (!recordLocation
.getMPPDataExchangeEndPoint()
- .equals(dataNodeLocationB.getMPPDataExchangeEndPoint())) {
- return true;
+ .equals(restartLocation.getMPPDataExchangeEndPoint())) {
+ updatedTEndPoints.add(2);
}
- if (!dataNodeLocationA
+ if (!recordLocation
.getSchemaRegionConsensusEndPoint()
- .equals(dataNodeLocationB.getSchemaRegionConsensusEndPoint())) {
- return true;
+ .equals(restartLocation.getSchemaRegionConsensusEndPoint())) {
+ updatedTEndPoints.add(3);
}
- return !dataNodeLocationA
+ if (!recordLocation
.getDataRegionConsensusEndPoint()
- .equals(dataNodeLocationB.getDataRegionConsensusEndPoint());
+ .equals(restartLocation.getDataRegionConsensusEndPoint())) {
+ updatedTEndPoints.add(4);
+ }
+ return updatedTEndPoints;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 9812a2455a..fb822a9abb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.manager.node;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -278,8 +279,16 @@ public class NodeManager {
return resp;
}
- public TDataNodeRestartResp restartDataNode(TDataNodeLocation dataNodeLocation) {
- // TODO: @Itami-Sho update peer if necessary
+ public TDataNodeRestartResp updateDataNodeIfNecessary(
+ TDataNodeConfiguration dataNodeConfiguration) {
+ TDataNodeConfiguration recordConfiguration =
+ getRegisteredDataNode(dataNodeConfiguration.getLocation().getDataNodeId());
+ if (!recordConfiguration.equals(dataNodeConfiguration)) {
+ // Update DataNodeConfiguration when modified during restart
+ UpdateDataNodePlan updateDataNodePlan = new UpdateDataNodePlan(dataNodeConfiguration);
+ getConsensusManager().write(updateDataNodePlan);
+ }
+
TDataNodeRestartResp resp = new TDataNodeRestartResp();
resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
resp.setConfigNodeList(getRegisteredConfigNodes());
@@ -337,48 +346,6 @@ public class NodeManager {
return dataSet;
}
- /**
- * Update the specified DataNode‘s location
- *
- * @param updateDataNodePlan UpdateDataNodePlan
- * @return TSStatus. The TSStatus will be set to SUCCESS_STATUS when update success, and
- * DATANODE_NOT_EXIST when some datanode is not exist, UPDATE_DATANODE_FAILED when update
- * failed.
- */
- public DataSet updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
- LOGGER.info("NodeManager start to update DataNode {}", updateDataNodePlan);
-
- DataNodeRegisterResp dataSet = new DataNodeRegisterResp();
- TSStatus status;
- // check if node is already exist
- boolean found = false;
- List<TDataNodeConfiguration> configurationList = getRegisteredDataNodes();
- for (TDataNodeConfiguration configuration : configurationList) {
- if (configuration.getLocation().getDataNodeId()
- == updateDataNodePlan.getDataNodeLocation().getDataNodeId()) {
- found = true;
- break;
- }
- }
- if (found) {
- getConsensusManager().write(updateDataNodePlan);
- status =
- new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
- .setMessage("updateDataNode(nodeId=%d) success.");
- } else {
- status =
- new TSStatus(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode())
- .setMessage(
- String.format(
- "The specified DataNode(nodeId=%d) doesn't exist",
- updateDataNodePlan.getDataNodeLocation().getDataNodeId()));
- }
- dataSet.setStatus(status);
- dataSet.setDataNodeId(updateDataNodePlan.getDataNodeLocation().getDataNodeId());
- dataSet.setConfigNodeList(getRegisteredConfigNodes());
- return dataSet;
- }
-
public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
int nodeId = nodeInfo.generateNextNodeId();
req.getConfigNodeLocation().setConfigNodeId(nodeId);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index 9183869042..67c5526ff8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.persistence.node;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -170,9 +171,8 @@ public class NodeInfo implements SnapshotProcessor {
public TSStatus updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
dataNodeInfoReadWriteLock.writeLock().lock();
try {
- registeredDataNodes
- .get(updateDataNodePlan.getDataNodeLocation().getDataNodeId())
- .setLocation(updateDataNodePlan.getDataNodeLocation());
+ TDataNodeConfiguration newConfiguration = updateDataNodePlan.getDataNodeConfiguration();
+ registeredDataNodes.replace(newConfiguration.getLocation().getDataNodeId(), newConfiguration);
} finally {
dataNodeInfoReadWriteLock.writeLock().unlock();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 198d272a4c..e0d85b9b46 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -237,11 +237,11 @@ public class ConfigNode implements ConfigNodeMBean {
private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
TConfigNodeRegisterReq req =
new TConfigNodeRegisterReq(
+ configManager.getClusterParameters(),
new TConfigNodeLocation(
INIT_NON_SEED_CONFIG_NODE_ID,
new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
- new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())),
- configManager.getClusterParameters());
+ new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
TEndPoint targetConfigNode = CONF.getTargetConfigNode();
if (targetConfigNode == null) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index d63d587df7..30a96f2fab 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -48,7 +48,6 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotL
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan;
@@ -88,7 +87,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeUpdateReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -212,7 +210,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
.convertToRpcDataNodeRegisterResp();
// Print log to record the ConfigNode that performs the RegisterDatanodeRequest
- LOGGER.info("Execute RegisterDatanodeRequest {} with result {}", req, resp);
+ LOGGER.info("Execute RegisterDataNodeRequest {} with result {}", req, resp);
return resp;
}
@@ -222,7 +220,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
TDataNodeRestartResp resp = configManager.restartDataNode(req);
// Print log to record the ConfigNode that performs the RestartDatanodeRequest
- LOGGER.info("Execute RestartDatanodeRequest {} with result {}", req, resp);
+ LOGGER.info("Execute RestartDataNodeRequest {} with result {}", req, resp);
return resp;
}
@@ -239,18 +237,6 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return resp;
}
- @Override
- public TDataNodeRegisterResp updateDataNode(TDataNodeUpdateReq req) {
- LOGGER.info("ConfigNode RPC Service start to update DataNode, req: {}", req);
- UpdateDataNodePlan updateDataNodePlan = new UpdateDataNodePlan(req.getDataNodeLocation());
- TDataNodeRegisterResp resp =
- ((DataNodeRegisterResp) configManager.updateDataNode(updateDataNodePlan))
- .convertToRpcDataNodeRegisterResp();
- LOGGER.info(
- "ConfigNode RPC Service finished to update DataNode, req: {}, result: {}", req, resp);
- return resp;
- }
-
@Override
public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
return configManager.reportDataNodeShutdown(dataNodeLocation);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 2355c53cd6..b63d8e6a4b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -167,14 +167,22 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void UpdateDataNodePlanTest() throws IOException {
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
- dataNodeLocation.setDataNodeId(1);
+ dataNodeLocation.setDataNodeId(0);
dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740));
dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
- UpdateDataNodePlan plan0 = new UpdateDataNodePlan(dataNodeLocation);
+ TNodeResource dataNodeResource = new TNodeResource();
+ dataNodeResource.setCpuCoreNum(16);
+ dataNodeResource.setMaxMemory(2022213861);
+
+ TDataNodeConfiguration dataNodeConfiguration = new TDataNodeConfiguration();
+ dataNodeConfiguration.setLocation(dataNodeLocation);
+ dataNodeConfiguration.setResource(dataNodeResource);
+
+ UpdateDataNodePlan plan0 = new UpdateDataNodePlan(dataNodeConfiguration);
UpdateDataNodePlan plan1 =
(UpdateDataNodePlan) ConfigPhysicalPlan.Factory.create(plan0.serializeToByteBuffer());
Assert.assertEquals(plan0, plan1);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
index c0b36c0049..a660063358 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractEnv.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.it.env.cluster;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
@@ -706,11 +707,14 @@ public abstract class AbstractEnv implements BaseEnv {
for (int j = 0; j < nodes.size(); j++) {
String endpoint = nodes.get(j).getIpAndPortString();
if (!nodeIds.containsKey(endpoint)) {
- throw new IllegalStateException(
- "The node " + nodes.get(j).getIpAndPortString() + " is not found!");
+ // Node not exist
+ // Notice: Never modify this line, since the NodeLocation might be modified in IT
+ errorMessages.add("The node " + nodes.get(j).getIpAndPortString() + " is not found!");
+ continue;
}
String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
if (!targetStatus.get(j).getStatus().equals(status)) {
+ // Error status
errorMessages.add(
String.format(
"Node %s is in status %s, but expected %s",
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
index d462e3f19d..a7bd8513cb 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/AbstractNodeWrapper.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.it.env.cluster;
import org.apache.iotdb.it.env.EnvFactory;
@@ -217,6 +218,7 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
MppBaseConfig outputNodeConfig = nodeConfig.emptyClone();
// 2. Override by values which are hardcoded in mutable properties fields.
+ reloadMutableFields();
outputCommonConfig.updateProperties(mutableCommonProperties);
outputNodeConfig.updateProperties(mutableNodeProperties);
@@ -424,7 +426,9 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
return testClassName + "_" + testMethodName;
}
- /* Abstract methods, which must be implemented in ConfigNode and Datanode. */
+ /* Abstract methods, which must be implemented in ConfigNode and DataNode. */
+ protected abstract void reloadMutableFields();
+
protected abstract void renameFile();
protected abstract String getTargetNodeConfigPath();
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ConfigNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ConfigNodeWrapper.java
index 3d8cd5ca43..7b30281e84 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ConfigNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ConfigNodeWrapper.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.it.env.cluster;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -50,25 +51,9 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
}
// initialize mutable properties
- mutableCommonProperties.setProperty(
- propertyKeyConfigNodeConsensusProtocolClass,
- "org.apache.iotdb.consensus.simple.SimpleConsensus");
- mutableCommonProperties.setProperty(
- propertyKeySchemaRegionConsensusProtocolClass,
- "org.apache.iotdb.consensus.simple.SimpleConsensus");
- mutableCommonProperties.setProperty(
- propertyKeyDataRegionConsensusProtocolClass,
- "org.apache.iotdb.consensus.simple.SimpleConsensus");
- mutableCommonProperties.setProperty(propertyKeySchemaReplicationFactor, "1");
- mutableCommonProperties.setProperty(propertyKeyDataReplicationFactor, "1");
-
- mutableNodeProperties.put("cn_connection_timeout_ms", "30000");
+ reloadMutableFields();
// initialize immutable properties
- immutableNodeProperties.setProperty(IoTDBConstant.CN_INTERNAL_ADDRESS, super.getIp());
- immutableNodeProperties.setProperty(IoTDBConstant.CN_INTERNAL_PORT, String.valueOf(getPort()));
- immutableNodeProperties.setProperty(
- IoTDBConstant.CN_CONSENSUS_PORT, String.valueOf(this.consensusPort));
immutableNodeProperties.setProperty(
IoTDBConstant.CN_TARGET_CONFIG_NODE_LIST, targetConfigNodes);
immutableNodeProperties.setProperty("cn_system_dir", MppBaseConfig.NULL_VALUE);
@@ -137,6 +122,29 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
"-s"));
}
+ @Override
+ protected void reloadMutableFields() {
+ mutableCommonProperties.setProperty(
+ propertyKeyConfigNodeConsensusProtocolClass,
+ "org.apache.iotdb.consensus.simple.SimpleConsensus");
+ mutableCommonProperties.setProperty(
+ propertyKeySchemaRegionConsensusProtocolClass,
+ "org.apache.iotdb.consensus.simple.SimpleConsensus");
+ mutableCommonProperties.setProperty(
+ propertyKeyDataRegionConsensusProtocolClass,
+ "org.apache.iotdb.consensus.simple.SimpleConsensus");
+
+ mutableCommonProperties.setProperty(propertyKeySchemaReplicationFactor, "1");
+ mutableCommonProperties.setProperty(propertyKeyDataReplicationFactor, "1");
+
+ mutableNodeProperties.put("cn_connection_timeout_ms", "30000");
+
+ mutableNodeProperties.setProperty(IoTDBConstant.CN_INTERNAL_ADDRESS, super.getIp());
+ mutableNodeProperties.setProperty(IoTDBConstant.CN_INTERNAL_PORT, String.valueOf(getPort()));
+ mutableNodeProperties.setProperty(
+ IoTDBConstant.CN_CONSENSUS_PORT, String.valueOf(this.consensusPort));
+ }
+
@Override
protected void renameFile() {
String configNodeName = isSeed ? "SeedConfigNode" : "ConfigNode";
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/DataNodeWrapper.java
index 7ba28d9c8d..d7aeae485d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/DataNodeWrapper.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.it.env.cluster;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -48,40 +49,13 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
this.mqttPort = portList[5];
// initialize mutable properties
- mutableCommonProperties.setProperty(
- propertyKeyConfigNodeConsensusProtocolClass,
- "org.apache.iotdb.consensus.simple.SimpleConsensus");
- mutableCommonProperties.setProperty(
- propertyKeySchemaRegionConsensusProtocolClass,
- "org.apache.iotdb.consensus.simple.SimpleConsensus");
- mutableCommonProperties.setProperty(
- propertyKeyDataRegionConsensusProtocolClass,
- "org.apache.iotdb.consensus.simple.SimpleConsensus");
- mutableCommonProperties.setProperty(propertyKeySchemaReplicationFactor, "1");
- mutableCommonProperties.setProperty(propertyKeyDataReplicationFactor, "1");
- mutableCommonProperties.put("max_tsblock_size_in_bytes", "1024");
- mutableCommonProperties.put("page_size_in_byte", "1024");
-
- mutableNodeProperties.put("dn_join_cluster_retry_interval_ms", "1000");
- mutableNodeProperties.put("dn_connection_timeout_ms", "30000");
- mutableNodeProperties.put("dn_metric_internal_reporter_type", "MEMORY");
+ reloadMutableFields();
// initialize immutable properties
// Override mqtt properties of super class
immutableCommonProperties.setProperty("mqtt_host", super.getIp());
immutableCommonProperties.setProperty("mqtt_port", String.valueOf(this.mqttPort));
- immutableNodeProperties.setProperty(IoTDBConstant.DN_RPC_ADDRESS, super.getIp());
- immutableNodeProperties.setProperty(IoTDBConstant.DN_RPC_PORT, String.valueOf(super.getPort()));
- immutableNodeProperties.setProperty(IoTDBConstant.DN_INTERNAL_ADDRESS, this.internalAddress);
- immutableNodeProperties.setProperty(
- IoTDBConstant.DN_INTERNAL_PORT, String.valueOf(this.internalPort));
- immutableNodeProperties.setProperty(
- "dn_mpp_data_exchange_port", String.valueOf(this.mppDataExchangePort));
- immutableNodeProperties.setProperty(
- "dn_data_region_consensus_port", String.valueOf(this.dataRegionConsensusPort));
- immutableNodeProperties.setProperty(
- "dn_schema_region_consensus_port", String.valueOf(this.schemaRegionConsensusPort));
immutableNodeProperties.setProperty(IoTDBConstant.DN_TARGET_CONFIG_NODE_LIST, targetConfigNode);
immutableNodeProperties.setProperty("dn_system_dir", MppBaseConfig.NULL_VALUE);
immutableNodeProperties.setProperty("dn_data_dirs", MppBaseConfig.NULL_VALUE);
@@ -148,6 +122,41 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
"-s"));
}
+ @Override
+ protected void reloadMutableFields() {
+ mutableCommonProperties.setProperty(
+ propertyKeyConfigNodeConsensusProtocolClass,
+ "org.apache.iotdb.consensus.simple.SimpleConsensus");
+ mutableCommonProperties.setProperty(
+ propertyKeySchemaRegionConsensusProtocolClass,
+ "org.apache.iotdb.consensus.simple.SimpleConsensus");
+ mutableCommonProperties.setProperty(
+ propertyKeyDataRegionConsensusProtocolClass,
+ "org.apache.iotdb.consensus.simple.SimpleConsensus");
+
+ mutableCommonProperties.setProperty(propertyKeySchemaReplicationFactor, "1");
+ mutableCommonProperties.setProperty(propertyKeyDataReplicationFactor, "1");
+
+ mutableCommonProperties.put("max_tsblock_size_in_bytes", "1024");
+ mutableCommonProperties.put("page_size_in_byte", "1024");
+
+ mutableNodeProperties.put("dn_join_cluster_retry_interval_ms", "1000");
+ mutableNodeProperties.put("dn_connection_timeout_ms", "30000");
+ mutableNodeProperties.put("dn_metric_internal_reporter_type", "MEMORY");
+
+ mutableNodeProperties.setProperty(IoTDBConstant.DN_RPC_ADDRESS, super.getIp());
+ mutableNodeProperties.setProperty(IoTDBConstant.DN_RPC_PORT, String.valueOf(super.getPort()));
+ mutableNodeProperties.setProperty(IoTDBConstant.DN_INTERNAL_ADDRESS, this.internalAddress);
+ mutableNodeProperties.setProperty(
+ IoTDBConstant.DN_INTERNAL_PORT, String.valueOf(this.internalPort));
+ mutableNodeProperties.setProperty(
+ "dn_mpp_data_exchange_port", String.valueOf(this.mppDataExchangePort));
+ mutableNodeProperties.setProperty(
+ "dn_data_region_consensus_port", String.valueOf(this.dataRegionConsensusPort));
+ mutableNodeProperties.setProperty(
+ "dn_schema_region_consensus_port", String.valueOf(this.schemaRegionConsensusPort));
+ }
+
@Override
public void renameFile() {
String dataNodeName = "DataNode";
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
index c92bc04c2b..26e57fab73 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
@@ -241,7 +241,7 @@ public class IoTDBClusterNodeErrorStartUpIT {
Arrays.asList(NodeStatus.Unknown, NodeStatus.Unknown));
/* Restart and updatePeer */
- // TODO: @Itami-sho, enable this test and delete it
+ // TODO: Delete this IT after enable modify internal TEndPoints
int registeredConfigNodeId = -1;
TShowClusterResp showClusterResp = client.showCluster();
for (TConfigNodeLocation configNodeLocation : showClusterResp.getConfigNodeList()) {
@@ -260,7 +260,7 @@ public class IoTDBClusterNodeErrorStartUpIT {
configNodeRestartStatus = client.restartConfigNode(configNodeRestartReq);
Assert.assertEquals(
TSStatusCode.REJECT_NODE_START.getStatusCode(), configNodeRestartStatus.getCode());
- Assert.assertTrue(configNodeRestartStatus.getMessage().contains("have been changed"));
+ Assert.assertTrue(configNodeRestartStatus.getMessage().contains("the internal TEndPoints"));
registeredConfigNodeWrapper.setConsensusPort(originPort);
int registeredDataNodeId = -1;
@@ -282,7 +282,8 @@ public class IoTDBClusterNodeErrorStartUpIT {
Assert.assertEquals(
TSStatusCode.REJECT_NODE_START.getStatusCode(),
dataNodeRestartResp.getStatus().getCode());
- Assert.assertTrue(dataNodeRestartResp.getStatus().getMessage().contains("have been changed"));
+ Assert.assertTrue(
+ dataNodeRestartResp.getStatus().getMessage().contains("the internal TEndPoints"));
registeredDataNodeWrapper.setInternalPort(originPort);
// Restart and check
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
index 66e44b9362..ff75c5be94 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
@@ -19,55 +19,38 @@
package org.apache.iotdb.confignode.it.cluster;
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.AbstractEnv;
-import org.apache.iotdb.it.env.cluster.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.DataNodeWrapper;
import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.env.cluster.MppBaseConfig;
import org.apache.iotdb.it.env.cluster.MppCommonConfig;
+import org.apache.iotdb.it.env.cluster.MppJVMConfig;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
-import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.thrift.TException;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
-import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.checkNodeConfig;
-import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.generatePatternTreeBuffer;
-import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.getClusterNodeInfos;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBClusterRestartIT {
- private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClusterRestartIT.class);
-
private static final String ratisConsensusProtocolClass =
"org.apache.iotdb.consensus.ratis.RatisConsensus";
private static final int testConfigNodeNum = 2;
@@ -85,6 +68,7 @@ public class IoTDBClusterRestartIT {
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaReplicationFactor(testReplicationFactor)
.setDataReplicationFactor(testReplicationFactor);
+
// Init 2C2D cluster environment
EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
}
@@ -119,96 +103,51 @@ public class IoTDBClusterRestartIT {
}
@Test
- @Ignore
- public void clusterRestartAfterUpdateDataNodeTest() throws InterruptedException {
- TShowClusterResp clusterNodes;
- final String sg0 = "root.sg0";
-
- final String d00 = sg0 + ".d0.s";
- final String d01 = sg0 + ".d1.s";
- // Shutdown all data nodes
+ public void clusterRestartAfterUpdateDataNodeTest()
+ throws InterruptedException, ClientManagerException, IOException, TException {
+ // Shutdown all DataNodes
for (int i = 0; i < testDataNodeNum; i++) {
EnvFactory.getEnv().shutdownDataNode(i);
}
-
- // Sleep 1s before restart
TimeUnit.SECONDS.sleep(1);
- // Modify data node config
List<DataNodeWrapper> dataNodeWrapperList = EnvFactory.getEnv().getDataNodeWrapperList();
- List<ConfigNodeWrapper> configNodeWrappersList = EnvFactory.getEnv().getConfigNodeWrapperList();
for (int i = 0; i < testDataNodeNum; i++) {
+ // Modify DataNode clientRpcEndPoint
int[] portList = EnvUtils.searchAvailablePorts();
dataNodeWrapperList.get(i).setPort(portList[0]);
- dataNodeWrapperList.get(i).setInternalPort(portList[1]);
- dataNodeWrapperList.get(i).setMppDataExchangePort(portList[2]);
-
- // update data node files'names
+ // Update DataNode files' names
dataNodeWrapperList.get(i).renameFile();
}
+ // Restart DataNodes
for (int i = 0; i < testDataNodeNum; i++) {
dataNodeWrapperList
.get(i)
.changeConfig(
(MppBaseConfig) EnvFactory.getEnv().getConfig().getDataNodeConfig(),
(MppCommonConfig) EnvFactory.getEnv().getConfig().getDataNodeCommonConfig(),
- null);
+ (MppJVMConfig) EnvFactory.getEnv().getConfig().getDataNodeJVMConfig());
EnvFactory.getEnv().startDataNode(i);
}
- ((AbstractEnv) EnvFactory.getEnv()).testWorking();
+ // Check DataNode status
+ EnvFactory.getEnv()
+ .ensureNodeStatus(
+ Arrays.asList(
+ EnvFactory.getEnv().getDataNodeWrapper(0),
+ EnvFactory.getEnv().getDataNodeWrapper(1)),
+ Arrays.asList(NodeStatus.Running, NodeStatus.Running));
- // check nodeInfo in cluster
+ // Check DataNode EndPoint
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
- // check the number and status of nodes
- clusterNodes = getClusterNodeInfos(client, testConfigNodeNum, testDataNodeNum);
- assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), clusterNodes.getStatus().getCode());
-
- // check the configuration of nodes
- List<TConfigNodeLocation> configNodeLocationList = clusterNodes.getConfigNodeList();
- List<TDataNodeLocation> dataNodeLocationList = clusterNodes.getDataNodeList();
- checkNodeConfig(
- configNodeLocationList,
- dataNodeLocationList,
- configNodeWrappersList,
+ TShowClusterResp showClusterResp = client.showCluster();
+ ConfigNodeTestUtils.checkNodeConfig(
+ showClusterResp.getConfigNodeList(),
+ showClusterResp.getDataNodeList(),
+ EnvFactory.getEnv().getConfigNodeWrapperList(),
dataNodeWrapperList);
-
- // check whether the cluster is working by testing GetAndCreateSchemaPartition
- TSStatus status;
- ByteBuffer buffer;
- TSchemaPartitionReq schemaPartitionReq;
- TSchemaPartitionTableResp schemaPartitionTableResp;
- Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable;
-
- // Set StorageGroups
- status = client.setDatabase((new TDatabaseSchema(sg0)));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
- // Test getSchemaPartition, the result should be empty
- buffer = generatePatternTreeBuffer(new String[] {d00, d01});
- schemaPartitionReq = new TSchemaPartitionReq(buffer);
- schemaPartitionTableResp = client.getSchemaPartitionTable(schemaPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- schemaPartitionTableResp.getStatus().getCode());
- Assert.assertEquals(0, schemaPartitionTableResp.getSchemaPartitionTableSize());
-
- // Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartitions and return
- buffer = generatePatternTreeBuffer(new String[] {d00, d01});
- schemaPartitionReq.setPathPatternTree(buffer);
- schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- schemaPartitionTableResp.getStatus().getCode());
- Assert.assertEquals(1, schemaPartitionTableResp.getSchemaPartitionTableSize());
- schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable();
- Assert.assertTrue(schemaPartitionTable.containsKey(sg0));
- Assert.assertEquals(2, schemaPartitionTable.get(sg0).size());
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- fail(e.getMessage());
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
index 64046dd76d..735082c5d7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
@@ -29,19 +29,16 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
-import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.it.env.cluster.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.DataNodeWrapper;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
-import org.apache.thrift.TException;
import org.junit.Assert;
import java.io.IOException;
@@ -51,32 +48,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ConfigNodeTestUtils {
- private static final int retryNum = 30;
-
- public static TShowClusterResp getClusterNodeInfos(
- IConfigNodeRPCService.Iface client, int expectedConfigNodeNum, int expectedDataNodeNum)
- throws TException, InterruptedException {
- TShowClusterResp clusterNodes = null;
- for (int i = 0; i < retryNum; i++) {
- clusterNodes = client.showCluster();
- if (clusterNodes.getConfigNodeListSize() == expectedConfigNodeNum
- && clusterNodes.getDataNodeListSize() == expectedDataNodeNum) {
- break;
- }
- Thread.sleep(1000);
- }
-
- assertEquals(expectedConfigNodeNum, clusterNodes.getConfigNodeListSize());
- assertEquals(expectedDataNodeNum, clusterNodes.getDataNodeListSize());
-
- return clusterNodes;
- }
-
public static void checkNodeConfig(
List<TConfigNodeLocation> configNodeList,
List<TDataNodeLocation> dataNodeList,
@@ -275,7 +250,7 @@ public class ConfigNodeTestUtils {
public static TDataNodeRegisterReq generateTDataNodeRegisterReq(
String clusterName, DataNodeWrapper dataNodeWrapper) {
return new TDataNodeRegisterReq(
- generateTDataNodeConfiguration(-1, dataNodeWrapper), clusterName);
+ clusterName, generateTDataNodeConfiguration(-1, dataNodeWrapper));
}
public static TDataNodeRestartReq generateTDataNodeRestartReq(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
index ffa4b5cd01..d615bea6ad 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
@@ -83,6 +83,25 @@ public class ThriftCommonsSerDeUtils {
return endPoint;
}
+ public static void serializeTDataNodeConfiguration(
+ TDataNodeConfiguration dataNodeConfiguration, DataOutputStream stream) {
+ try {
+ dataNodeConfiguration.write(generateWriteProtocol(stream));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TDataNodeConfiguration failed: ", e);
+ }
+ }
+
+ public static TDataNodeConfiguration deserializeTDataNodeConfiguration(ByteBuffer buffer) {
+ TDataNodeConfiguration dataNodeConfiguration = new TDataNodeConfiguration();
+ try {
+ dataNodeConfiguration.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TDataNodeConfiguration failed: ", e);
+ }
+ return dataNodeConfiguration;
+ }
+
public static void serializeTDataNodeLocation(
TDataNodeLocation dataNodeLocation, DataOutputStream stream) {
try {
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
index 18a679ed2a..d5c1e4f65b 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtilsTest.java
@@ -20,8 +20,10 @@ package org.apache.iotdb.commons.utils;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeResource;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -59,6 +61,34 @@ public class ThriftCommonsSerDeUtilsTest {
}
}
+ @Test
+ public void readWriteTDataNodeConfigurationTest() throws IOException {
+ TDataNodeLocation dataNodeLocation0 = new TDataNodeLocation();
+ dataNodeLocation0.setDataNodeId(0);
+ dataNodeLocation0.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+ dataNodeLocation0.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
+ dataNodeLocation0.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740));
+ dataNodeLocation0.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
+ dataNodeLocation0.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
+
+ TNodeResource dataNodeResource0 = new TNodeResource();
+ dataNodeResource0.setCpuCoreNum(16);
+ dataNodeResource0.setMaxMemory(2022213861);
+
+ TDataNodeConfiguration dataNodeConfiguration0 = new TDataNodeConfiguration();
+ dataNodeConfiguration0.setLocation(dataNodeLocation0);
+ dataNodeConfiguration0.setResource(dataNodeResource0);
+
+ try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+ ThriftCommonsSerDeUtils.serializeTDataNodeConfiguration(dataNodeConfiguration0, outputStream);
+ TDataNodeConfiguration dataNodeConfiguration1 =
+ ThriftCommonsSerDeUtils.deserializeTDataNodeConfiguration(
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()));
+ Assert.assertEquals(dataNodeConfiguration0, dataNodeConfiguration1);
+ }
+ }
+
@Test
public void readWriteTDataNodeLocationTest() throws IOException {
TDataNodeLocation dataNodeLocation0 = new TDataNodeLocation();
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 66b090cd76..c525d5cd75 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -53,7 +53,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeUpdateReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -393,22 +392,6 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
throw new TException(MSG_RECONNECTION_FAIL);
}
- @Override
- public TDataNodeRegisterResp updateDataNode(TDataNodeUpdateReq req) throws TException {
- for (int i = 0; i < RETRY_NUM; i++) {
- try {
- TDataNodeRegisterResp resp = client.updateDataNode(req);
- if (!updateConfigNodeLeader(resp.status)) {
- return resp;
- }
- } catch (TException e) {
- configLeader = null;
- }
- waitAndReconnect();
- }
- throw new TException(MSG_RECONNECTION_FAIL);
- }
-
@Override
public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index 48b4e30f6f..27b04be1bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -83,6 +83,7 @@ public class IoTDBStartCheck {
private static final String SCHEMA_ENGINE_MODE = "schema_engine_mode";
private static final String TIME_ENCODER_KEY = "time_encoder";
+ // Immutable system parameters
private static final Map<String, Supplier<String>> constantParamValueTable = new HashMap<>();
static {
@@ -116,6 +117,7 @@ public class IoTDBStartCheck {
private static final String MPP_DATA_EXCHANGE_PORT = "dn_mpp_data_exchange_port";
private static final String SCHEMA_REGION_CONSENSUS_PORT = "dn_schema_region_consensus_port";
private static final String DATA_REGION_CONSENSUS_PORT = "dn_data_region_consensus_port";
+ // Mutable system parameters
private static final Map<String, Supplier<String>> variableParamValueTable = new HashMap<>();
static {
@@ -327,7 +329,7 @@ public class IoTDBStartCheck {
checkWALNotExists();
upgradePropertiesFile();
}
- checkProperties();
+ checkImmutableSystemProperties();
}
}
@@ -412,7 +414,7 @@ public class IoTDBStartCheck {
}
/** Check all immutable properties */
- private void checkProperties() throws ConfigurationException, IOException {
+ private void checkImmutableSystemProperties() throws ConfigurationException, IOException {
for (Entry<String, Supplier<String>> entry : systemProperties.entrySet()) {
if (!properties.containsKey(entry.getKey())) {
upgradePropertiesFileFromBrokenFile();
@@ -526,4 +528,20 @@ public class IoTDBStartCheck {
logger.error("Unexpected consensus group type");
return false;
}
+
+ public void serializeMutableSystemPropertiesIfNecessary() throws IOException {
+ boolean needsSerialize = false;
+ for (String param : variableParamValueTable.keySet()) {
+ if (!(properties.getProperty(param).equals(getVal(param)))) {
+ needsSerialize = true;
+ }
+ }
+
+ if (needsSerialize) {
+ try (FileOutputStream outputStream = new FileOutputStream(propertiesFile)) {
+ systemProperties.forEach((k, v) -> properties.setProperty(k, v.get()));
+ properties.store(outputStream, SYSTEM_PROPERTIES_STRING);
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 658ff62d09..69a609ac09 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -174,6 +174,9 @@ public class DataNode implements DataNodeMBean {
// Setup metric service
setUpMetricService();
+ // Serialize mutable system properties
+ IoTDBStartCheck.getInstance().serializeMutableSystemPropertiesIfNecessary();
+
logger.info("IoTDB configuration: " + config.getConfigMessage());
logger.info("Congratulation, IoTDB DataNode is set up successfully. Now, enjoy yourself!");
@@ -368,7 +371,8 @@ public class DataNode implements DataNodeMBean {
if (dataNodeRegisterResp == null) {
// All tries failed
logger.error(
- "Cannot register into cluster after {} retries. Please check dn_target_config_node_list in iotdb-datanode.properties.",
+ "Cannot register into cluster after {} retries. "
+ + "Please check dn_target_config_node_list in iotdb-datanode.properties.",
DEFAULT_RETRY);
throw new StartupException("Cannot register into the cluster.");
}
@@ -427,7 +431,8 @@ public class DataNode implements DataNodeMBean {
if (dataNodeRestartResp == null) {
// All tries failed
logger.error(
- "Cannot send restart DataNode request to ConfigNode-leader after {} retries. Please check dn_target_config_node_list in iotdb-datanode.properties.",
+ "Cannot send restart DataNode request to ConfigNode-leader after {} retries. "
+ + "Please check dn_target_config_node_list in iotdb-datanode.properties.",
DEFAULT_RETRY);
throw new StartupException("Cannot send restart DataNode request to ConfigNode-leader.");
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index dd475411b8..f27532fac5 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -93,8 +93,8 @@ struct TRuntimeConfiguration {
}
struct TDataNodeRegisterReq {
- 1: required common.TDataNodeConfiguration dataNodeConfiguration
- 2: required string clusterName
+ 1: required string clusterName
+ 2: required common.TDataNodeConfiguration dataNodeConfiguration
}
struct TDataNodeRegisterResp {
@@ -115,10 +115,6 @@ struct TDataNodeRestartResp {
3: optional TRuntimeConfiguration runtimeConfiguration
}
-struct TDataNodeUpdateReq {
- 1: required common.TDataNodeLocation dataNodeLocation
-}
-
struct TDataNodeRemoveReq {
1: required list<common.TDataNodeLocation> dataNodeLocations
}
@@ -343,10 +339,10 @@ struct TClusterParameters {
}
struct TConfigNodeRegisterReq {
- 1: required common.TConfigNodeLocation configNodeLocation
// The Non-Seed-ConfigNode must ensure that the following
// fields are consistent with the Seed-ConfigNode
- 2: required TClusterParameters clusterParameters
+ 1: required TClusterParameters clusterParameters
+ 2: required common.TConfigNodeLocation configNodeLocation
}
struct TConfigNodeRegisterResp {
@@ -753,15 +749,6 @@ service IConfigNodeRPCService {
*/
TDataNodeRemoveResp removeDataNode(TDataNodeRemoveReq req)
- /**
- * Update the specified DataNode‘s location in the cluster when restart
- *
- * @return SUCCESS_STATUS if the DataNode updated successfully
- * DATANODE_NOT_EXIST if one of the DataNodes in the TDataNodeUpdateReq doesn't exist in the cluster
- * UPDATE_DATANODE_FAILED if failed to update the DataNode
- */
- TDataNodeRegisterResp updateDataNode(TDataNodeUpdateReq req)
-
/**
* Report that the specified DataNode will be shutdown.
* The ConfigNode-leader will mark it as Unknown.