You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/07/02 01:58:51 UTC
[iotdb] branch master updated: [IOTDB-3528] Filter DataNode which may not be ready in ConfigNode (#6539)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 708a08c9b7 [IOTDB-3528] Filter DataNode which may not be ready in ConfigNode (#6539)
708a08c9b7 is described below
commit 708a08c9b72101f3eb4585ab410fdbeb8850a238
Author: imquanke <39...@users.noreply.github.com>
AuthorDate: Sat Jul 2 09:58:46 2022 +0800
[IOTDB-3528] Filter DataNode which may not be ready in ConfigNode (#6539)
---
.../consensus/request/ConfigPhysicalPlan.java | 4 ++
.../consensus/request/ConfigPhysicalPlanType.java | 1 +
.../request/write/ActivateDataNodePlan.java | 71 +++++++++++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 10 +++
.../apache/iotdb/confignode/manager/IManager.java | 9 +++
.../iotdb/confignode/manager/NodeManager.java | 44 ++++++++-----
.../iotdb/confignode/persistence/NodeInfo.java | 28 ++++++++-
.../persistence/executor/ConfigPlanExecutor.java | 3 +
.../thrift/ConfigNodeRPCServiceProcessor.java | 4 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 22 +++++++
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 73 +++++++++++++++++-----
.../java/org/apache/iotdb/db/service/DataNode.java | 65 +++++++++----------
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +-
.../src/main/thrift/confignode.thrift | 3 +-
14 files changed, 265 insertions(+), 75 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 900126c325..55b4289994 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountPlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
@@ -101,6 +102,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case RegisterDataNode:
req = new RegisterDataNodePlan();
break;
+ case ActivateDataNode:
+ req = new ActivateDataNodePlan();
+ break;
case GetDataNodeInfo:
req = new GetDataNodeInfoPlan();
break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 1475228a48..0a9f458db1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.consensus.request;
public enum ConfigPhysicalPlanType {
RegisterDataNode,
+ ActivateDataNode,
GetDataNodeInfo,
SetStorageGroup,
SetTTL,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ActivateDataNodePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ActivateDataNodePlan.java
new file mode 100644
index 0000000000..d1c8513102
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ActivateDataNodePlan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class ActivateDataNodePlan extends ConfigPhysicalPlan {
+
+ private TDataNodeInfo info;
+
+ public ActivateDataNodePlan() {
+ super(ConfigPhysicalPlanType.ActivateDataNode);
+ }
+
+ public ActivateDataNodePlan(TDataNodeInfo info) {
+ this();
+ this.info = info;
+ }
+
+ public TDataNodeInfo getInfo() {
+ return info;
+ }
+
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeInt(ConfigPhysicalPlanType.ActivateDataNode.ordinal());
+ ThriftCommonsSerDeUtils.serializeTDataNodeInfo(info, stream);
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) {
+ info = ThriftCommonsSerDeUtils.deserializeTDataNodeInfo(buffer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ActivateDataNodePlan that = (ActivateDataNodePlan) o;
+ return info.equals(that.info);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(info);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 9d56895b03..d87b748455 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorPlan;
@@ -177,6 +178,15 @@ public class ConfigManager implements IManager {
}
}
+ @Override
+ public TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return nodeManager.activateDataNode(activateDataNodePlan);
+ }
+ return status;
+ }
+
@Override
public DataSet getDataNodeInfo(GetDataNodeInfoPlan getDataNodeInfoPlan) {
TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 8ea24c841e..10106b3e6f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorPlan;
@@ -112,6 +113,14 @@ public interface IManager {
*/
DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan);
+ /**
+ * activate DataNode
+ *
+ * @param activateDataNodePlan ActivateDataNodePlan
+ * @return TSStatus
+ */
+ TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan);
+
/**
* Get DataNode info
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 1bce931ae8..0b80adfae8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.handlers.FlushHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
@@ -98,31 +99,44 @@ public class NodeManager {
*/
public DataSet registerDataNode(RegisterDataNodePlan req) {
DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
-
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ status.setMessage("registerDataNode success.");
if (nodeInfo.isOnlineDataNode(req.getInfo().getLocation())) {
- // Reset client
- AsyncDataNodeClientPool.getInstance()
- .resetClient(req.getInfo().getLocation().getInternalEndPoint());
-
- TSStatus status = new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
+ status.setCode(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
status.setMessage("DataNode already registered.");
- dataSet.setStatus(status);
- } else {
- // Persist DataNodeInfo
+ } else if (req.getInfo().getLocation().getDataNodeId() < 0) {
+ // only when new dataNode is registered, generate new dataNodeId
req.getInfo().getLocation().setDataNodeId(nodeInfo.generateNextNodeId());
- ConsensusWriteResponse resp = getConsensusManager().write(req);
- dataSet.setStatus(resp.getStatus());
-
- // Adjust the maximum RegionGroup number of each StorageGroup
- getClusterSchemaManager().adjustMaxRegionGroupCount();
+ status = getConsensusManager().write(req).getStatus();
}
-
+ dataSet.setStatus(status);
dataSet.setDataNodeId(req.getInfo().getLocation().getDataNodeId());
dataSet.setConfigNodeList(nodeInfo.getRegisteredConfigNodes());
setGlobalConfig(dataSet);
return dataSet;
}
+ /**
+ * Active DataNode
+ *
+ * @param req ActiveDataNodeReq
+ * @return TSStatus The TSStatus will be set to SUCCESS_STATUS when active success, and
+ * DATANODE_ALREADY_REGISTERED when the DataNode is already exist.
+ */
+ public TSStatus activateDataNode(ActivateDataNodePlan req) {
+ TSStatus status = new TSStatus();
+ if (nodeInfo.isOnlineDataNode(req.getInfo().getLocation())) {
+ status.setCode(TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode());
+ status.setMessage("DataNode already activated.");
+ } else {
+ ConsensusWriteResponse resp = getConsensusManager().write(req);
+ status = resp.getStatus();
+ // Adjust the maximum RegionGroup number of each StorageGroup
+ getClusterSchemaManager().adjustMaxRegionGroupCount();
+ }
+ return status;
+ }
+
/**
* Get DataNode info
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index ca5c4fa6aa..a4769da54e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
@@ -128,7 +129,7 @@ public class NodeInfo implements SnapshotProcessor {
public boolean isOnlineDataNode(TDataNodeLocation info) {
boolean result = false;
dataNodeInfoReadWriteLock.readLock().lock();
-
+ int originalDataNodeId = info.getDataNodeId();
try {
for (Map.Entry<Integer, TDataNodeInfo> entry : onlineDataNodes.entrySet()) {
info.setDataNodeId(entry.getKey());
@@ -136,6 +137,7 @@ public class NodeInfo implements SnapshotProcessor {
result = true;
break;
}
+ info.setDataNodeId(originalDataNodeId);
}
} finally {
dataNodeInfoReadWriteLock.readLock().unlock();
@@ -155,7 +157,6 @@ public class NodeInfo implements SnapshotProcessor {
TDataNodeInfo info = registerDataNodePlan.getInfo();
dataNodeInfoReadWriteLock.writeLock().lock();
try {
- onlineDataNodes.put(info.getLocation().getDataNodeId(), info);
// To ensure that the nextNodeId is updated correctly when
// the ConfigNode-followers concurrently processes RegisterDataNodePlan,
@@ -181,6 +182,25 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
+ /**
+ * add dataNode to onlineDataNodes
+ *
+ * @param activateDataNodePlan ActivateDataNodePlan
+ * @return SUCCESS_STATUS
+ */
+ public TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan) {
+ TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ result.setMessage("activateDataNode success.");
+ TDataNodeInfo info = activateDataNodePlan.getInfo();
+ dataNodeInfoReadWriteLock.writeLock().lock();
+ try {
+ onlineDataNodes.put(info.getLocation().getDataNodeId(), info);
+ } finally {
+ dataNodeInfoReadWriteLock.writeLock().unlock();
+ }
+ return result;
+ }
+
/**
* Get DataNode info
*
@@ -199,7 +219,9 @@ public class NodeInfo implements SnapshotProcessor {
result.setDataNodeInfoMap(new HashMap<>(onlineDataNodes));
} else {
result.setDataNodeInfoMap(
- Collections.singletonMap(dataNodeId, onlineDataNodes.get(dataNodeId)));
+ onlineDataNodes.get(dataNodeId) == null
+ ? new HashMap<>(0)
+ : Collections.singletonMap(dataNodeId, onlineDataNodes.get(dataNodeId)));
}
} finally {
dataNodeInfoReadWriteLock.readLock().unlock();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index a081e5c526..dc2e5c6e68 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionP
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountPlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
@@ -145,6 +146,8 @@ public class ConfigPlanExecutor {
switch (req.getType()) {
case RegisterDataNode:
return nodeInfo.registerDataNode((RegisterDataNodePlan) req);
+ case ActivateDataNode:
+ return nodeInfo.activateDataNode((ActivateDataNodePlan) req);
case SetStorageGroup:
TSStatus status = clusterSchemaInfo.setStorageGroup((SetStorageGroupPlan) req);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index a5012a695d..1f738ed078 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorPlan;
@@ -142,8 +143,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
@Override
public TSStatus activeDataNode(TDataNodeActiveReq req) throws TException {
- // TODO: implement active data node
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ return configManager.activateDataNode(new ActivateDataNodePlan(req.getDataNodeInfo()));
}
@Override
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 46bf937f4d..86c9a4067f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountPlan;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
@@ -98,6 +99,27 @@ public class ConfigPhysicalPlanSerDeTest {
Assert.assertEquals(plan0, plan1);
}
+ @Test
+ public void ActivateDataNodePlanTest() throws IOException {
+ TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+ dataNodeLocation.setDataNodeId(1);
+ dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+ dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
+ dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
+
+ TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
+ dataNodeInfo.setLocation(dataNodeLocation);
+ dataNodeInfo.setCpuCoreNum(16);
+ dataNodeInfo.setMaxMemory(34359738368L);
+
+ ActivateDataNodePlan plan0 = new ActivateDataNodePlan(dataNodeInfo);
+ ActivateDataNodePlan plan1 =
+ (ActivateDataNodePlan) ConfigPhysicalPlan.Factory.create(plan0.serializeToByteBuffer());
+ Assert.assertEquals(plan0, plan1);
+ }
+
@Test
public void QueryDataNodeInfoPlanTest() throws IOException {
GetDataNodeInfoPlan plan0 = new GetDataNodeInfoPlan(-1);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 3be5967836..945c591506 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeActiveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -142,9 +143,10 @@ public class ConfigNodeRPCServiceProcessorTest {
globalConfig.getSeriesPartitionExecutorClass());
}
- private void registerDataNodes() throws TException {
+ private void registerAndActivateDataNodes() throws TException {
for (int i = 0; i < 3; i++) {
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+ dataNodeLocation.setDataNodeId(-1);
dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
@@ -162,33 +164,64 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Assert.assertEquals(i, resp.getDataNodeId());
checkGlobalConfig(resp.getGlobalConfig());
+ // activate dataNode
+ dataNodeLocation.setDataNodeId(resp.getDataNodeId());
+ TDataNodeActiveReq dataNodeActiveReq = new TDataNodeActiveReq(dataNodeInfo);
+ TSStatus activeDataNodeRsp = processor.activeDataNode(dataNodeActiveReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), activeDataNodeRsp.getCode());
}
}
@Test
public void testRegisterAndQueryDataNode() throws TException {
- registerDataNodes();
+ registerAndActivateDataNodes();
+ TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+ TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
+ dataNodeInfo.setLocation(dataNodeLocation);
+ dataNodeInfo.setCpuCoreNum(8);
+ dataNodeInfo.setMaxMemory(1024 * 1024);
+
+ TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeInfo);
+ TDataNodeRegisterResp resp;
+
+ // test only register not activate
+ dataNodeLocation.setDataNodeId(-1);
+ dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6670));
+ dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9007));
+ dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8781));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40014));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50014));
+ resp = processor.registerDataNode(req);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
+ Assert.assertEquals(3, resp.getDataNodeId());
// test success re-register
- TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+ dataNodeLocation.setDataNodeId(1);
dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6668));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8778));
dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
- TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
- dataNodeInfo.setLocation(dataNodeLocation);
- dataNodeInfo.setCpuCoreNum(8);
- dataNodeInfo.setMaxMemory(1024 * 1024);
-
- TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeInfo);
- TDataNodeRegisterResp resp = processor.registerDataNode(req);
+ resp = processor.registerDataNode(req);
Assert.assertEquals(
TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode(), resp.getStatus().getCode());
Assert.assertEquals(1, resp.getDataNodeId());
checkGlobalConfig(resp.getGlobalConfig());
+ // test success re-activated
+ dataNodeLocation.setDataNodeId(1);
+ dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6668));
+ dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
+ dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8778));
+ dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+ dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
+
+ TDataNodeActiveReq activateReq = new TDataNodeActiveReq(dataNodeInfo);
+ TSStatus activateRlt = processor.activeDataNode(activateReq);
+ Assert.assertEquals(
+ TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode(), activateRlt.getCode());
+
// test query DataNodeInfo
TDataNodeInfoResp infoResp = processor.getDataNodeInfo(-1);
Assert.assertEquals(
@@ -207,6 +240,12 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertEquals(dataNodeLocation, infoList.get(i).getValue().getLocation());
}
+ infoResp = processor.getDataNodeInfo(3);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), infoResp.getStatus().getCode());
+ infoMap = infoResp.getDataNodeInfoMap();
+ Assert.assertEquals(0, infoMap.size());
+
infoResp = processor.getDataNodeInfo(0);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), infoResp.getStatus().getCode());
@@ -224,7 +263,7 @@ public class ConfigNodeRPCServiceProcessorTest {
@Test
public void getAllClusterNodeInfosTest() throws TException {
- registerDataNodes();
+ registerAndActivateDataNodes();
TClusterNodeInfos clusterNodes = processor.getAllClusterNodeInfos();
@@ -249,7 +288,7 @@ public class ConfigNodeRPCServiceProcessorTest {
final String sg1 = "root.sg1";
// register DataNodes
- registerDataNodes();
+ registerAndActivateDataNodes();
// set StorageGroup0 by default values
TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
@@ -383,7 +422,7 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertNull(schemaPartitionResp.getSchemaRegionMap());
// register DataNodes
- registerDataNodes();
+ registerAndActivateDataNodes();
// Test getSchemaPartition, the result should be empty
buffer = generatePatternTreeBuffer(new String[] {d00, d01, allSg1});
@@ -582,7 +621,7 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertNull(dataPartitionResp.getDataPartitionMap());
// register DataNodes
- registerDataNodes();
+ registerAndActivateDataNodes();
// Test getDataPartition, the result should be empty
dataPartitionReq = new TDataPartitionReq(partitionSlotsMap0);
@@ -959,7 +998,7 @@ public class ConfigNodeRPCServiceProcessorTest {
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
// register DataNodes
- registerDataNodes();
+ registerAndActivateDataNodes();
ConfigNodeProcedureEnv.setSkipForTest(true);
TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
// set StorageGroup0 by default values
@@ -985,7 +1024,7 @@ public class ConfigNodeRPCServiceProcessorTest {
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
// register DataNodes
- registerDataNodes();
+ registerAndActivateDataNodes();
ConfigNodeProcedureEnv.setSkipForTest(true);
ConfigNodeProcedureEnv.setInvalidCacheResult(false);
TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
@@ -1070,7 +1109,7 @@ public class ConfigNodeRPCServiceProcessorTest {
TSchemaNodeManagementResp nodeManagementResp;
// register DataNodes
- registerDataNodes();
+ registerAndActivateDataNodes();
// set StorageGroups
for (int i = 0; i < storageGroupNum; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7036b738a1..82416df4a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -175,27 +175,8 @@ public class DataNode implements DataNodeMBean {
while (retry > 0) {
logger.info("start registering to the cluster.");
try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
- // Set DataNodeLocation
- TDataNodeLocation location = new TDataNodeLocation();
- location.setDataNodeId(config.getDataNodeId());
- location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
- location.setInternalEndPoint(
- new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
- location.setMPPDataExchangeEndPoint(
- new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
- location.setDataRegionConsensusEndPoint(
- new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
- location.setSchemaRegionConsensusEndPoint(
- new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
-
- // Set DataNodeInfo
- TDataNodeInfo info = new TDataNodeInfo();
- info.setLocation(location);
- info.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
- info.setMaxMemory(Runtime.getRuntime().totalMemory());
-
TDataNodeRegisterReq req = new TDataNodeRegisterReq();
- req.setDataNodeInfo(info);
+ req.setDataNodeInfo(generateDataNodeInfo());
TDataNodeRegisterResp dataNodeRegisterResp = configNodeClient.registerDataNode(req);
// store config node lists from resp
@@ -349,23 +330,11 @@ public class DataNode implements DataNodeMBean {
while (retry > 0) {
logger.info("start joining the cluster.");
try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
- // Set DataNodeLocation
- TDataNodeLocation location = new TDataNodeLocation();
- location.setDataNodeId(config.getDataNodeId());
- location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
- location.setInternalEndPoint(
- new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
- location.setMPPDataExchangeEndPoint(
- new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
- location.setDataRegionConsensusEndPoint(
- new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
- location.setSchemaRegionConsensusEndPoint(
- new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
TDataNodeActiveReq req = new TDataNodeActiveReq();
- req.setLocation(location);
- req.setDataNodeId(config.getDataNodeId());
+ req.setDataNodeInfo(generateDataNodeInfo());
TSStatus status = configNodeClient.activeDataNode(req);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || status.getCode() == TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode()) {
logger.info("Joined the cluster successfully");
return;
}
@@ -402,6 +371,32 @@ public class DataNode implements DataNodeMBean {
initProtocols();
}
+ /**
+ * generate dataNodeInfo
+ *
+ * @return TDataNodeInfo
+ */
+ private TDataNodeInfo generateDataNodeInfo() {
+ // Set DataNodeLocation
+ TDataNodeLocation location = new TDataNodeLocation();
+ location.setDataNodeId(config.getDataNodeId());
+ location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
+ location.setInternalEndPoint(
+ new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
+ location.setMPPDataExchangeEndPoint(
+ new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
+ location.setDataRegionConsensusEndPoint(
+ new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
+ location.setSchemaRegionConsensusEndPoint(
+ new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
+ // Set DataNodeInfo
+ TDataNodeInfo info = new TDataNodeInfo();
+ info.setLocation(location);
+ info.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
+ info.setMaxMemory(Runtime.getRuntime().totalMemory());
+ return info;
+ }
+
private void registerUdfServices() throws StartupException {
registerManager.register(TemporaryQueryDataFileService.getInstance());
registerManager.register(
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 47d6625146..0fcc1a8ada 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -136,7 +136,8 @@ public enum TSStatusCode {
REGISTER_CONFIGNODE_FAILED(907),
REMOVE_CONFIGNODE_FAILED(908),
REMOVE_CONFIGNODE_DUPLICATION(909),
- STOP_CONOFIGNODE_FAILED(910);
+ STOP_CONOFIGNODE_FAILED(910),
+ DATANODE_ALREADY_ACTIVATED(911);
private int statusCode;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 21a3b6dff9..75059ddf08 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -30,8 +30,7 @@ struct TDataNodeRegisterReq {
}
struct TDataNodeActiveReq {
- 1: required common.TDataNodeLocation location
- 2: required i32 dataNodeId
+ 1: required common.TDataNodeInfo dataNodeInfo
}
struct TGlobalConfig {