You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/07/02 01:58:51 UTC

[iotdb] branch master updated: [IOTDB-3528] Filter DataNode which may not be ready in ConfigNode (#6539)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 708a08c9b7 [IOTDB-3528] Filter DataNode which may not be ready in ConfigNode (#6539)
708a08c9b7 is described below

commit 708a08c9b72101f3eb4585ab410fdbeb8850a238
Author: imquanke <39...@users.noreply.github.com>
AuthorDate: Sat Jul 2 09:58:46 2022 +0800

    [IOTDB-3528] Filter DataNode which may not be ready in ConfigNode (#6539)
---
 .../consensus/request/ConfigPhysicalPlan.java      |  4 ++
 .../consensus/request/ConfigPhysicalPlanType.java  |  1 +
 .../request/write/ActivateDataNodePlan.java        | 71 +++++++++++++++++++++
 .../iotdb/confignode/manager/ConfigManager.java    | 10 +++
 .../apache/iotdb/confignode/manager/IManager.java  |  9 +++
 .../iotdb/confignode/manager/NodeManager.java      | 44 ++++++++-----
 .../iotdb/confignode/persistence/NodeInfo.java     | 28 ++++++++-
 .../persistence/executor/ConfigPlanExecutor.java   |  3 +
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  4 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       | 22 +++++++
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  | 73 +++++++++++++++++-----
 .../java/org/apache/iotdb/db/service/DataNode.java | 65 +++++++++----------
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  3 +-
 .../src/main/thrift/confignode.thrift              |  3 +-
 14 files changed, 265 insertions(+), 75 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index 900126c325..55b4289994 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
 import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountPlan;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
@@ -101,6 +102,9 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
         case RegisterDataNode:
           req = new RegisterDataNodePlan();
           break;
+        case ActivateDataNode:
+          req = new ActivateDataNodePlan();
+          break;
         case GetDataNodeInfo:
           req = new GetDataNodeInfoPlan();
           break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index 1475228a48..0a9f458db1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.consensus.request;
 
 public enum ConfigPhysicalPlanType {
   RegisterDataNode,
+  ActivateDataNode,
   GetDataNodeInfo,
   SetStorageGroup,
   SetTTL,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ActivateDataNodePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ActivateDataNodePlan.java
new file mode 100644
index 0000000000..d1c8513102
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ActivateDataNodePlan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.consensus.request.write;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class ActivateDataNodePlan extends ConfigPhysicalPlan {
+
+  private TDataNodeInfo info;
+
+  public ActivateDataNodePlan() {
+    super(ConfigPhysicalPlanType.ActivateDataNode);
+  }
+
+  public ActivateDataNodePlan(TDataNodeInfo info) {
+    this();
+    this.info = info;
+  }
+
+  public TDataNodeInfo getInfo() {
+    return info;
+  }
+
+  @Override
+  protected void serializeImpl(DataOutputStream stream) throws IOException {
+    stream.writeInt(ConfigPhysicalPlanType.ActivateDataNode.ordinal());
+    ThriftCommonsSerDeUtils.serializeTDataNodeInfo(info, stream);
+  }
+
+  @Override
+  protected void deserializeImpl(ByteBuffer buffer) {
+    info = ThriftCommonsSerDeUtils.deserializeTDataNodeInfo(buffer);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ActivateDataNodePlan that = (ActivateDataNodePlan) o;
+    return info.equals(that.info);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(info);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 9d56895b03..d87b748455 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
 import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorPlan;
@@ -177,6 +178,15 @@ public class ConfigManager implements IManager {
     }
   }
 
+  @Override
+  public TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return nodeManager.activateDataNode(activateDataNodePlan);
+    }
+    return status;
+  }
+
   @Override
   public DataSet getDataNodeInfo(GetDataNodeInfoPlan getDataNodeInfoPlan) {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 8ea24c841e..10106b3e6f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorPlan;
@@ -112,6 +113,14 @@ public interface IManager {
    */
   DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan);
 
+  /**
+   * activate DataNode
+   *
+   * @param activateDataNodePlan ActivateDataNodePlan
+   * @return TSStatus
+   */
+  TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan);
+
   /**
    * Get DataNode info
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 1bce931ae8..0b80adfae8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.handlers.FlushHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
@@ -98,31 +99,44 @@ public class NodeManager {
    */
   public DataSet registerDataNode(RegisterDataNodePlan req) {
     DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
-
+    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    status.setMessage("registerDataNode success.");
     if (nodeInfo.isOnlineDataNode(req.getInfo().getLocation())) {
-      // Reset client
-      AsyncDataNodeClientPool.getInstance()
-          .resetClient(req.getInfo().getLocation().getInternalEndPoint());
-
-      TSStatus status = new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
+      status.setCode(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
       status.setMessage("DataNode already registered.");
-      dataSet.setStatus(status);
-    } else {
-      // Persist DataNodeInfo
+    } else if (req.getInfo().getLocation().getDataNodeId() < 0) {
+      // only when new dataNode is registered, generate new dataNodeId
       req.getInfo().getLocation().setDataNodeId(nodeInfo.generateNextNodeId());
-      ConsensusWriteResponse resp = getConsensusManager().write(req);
-      dataSet.setStatus(resp.getStatus());
-
-      // Adjust the maximum RegionGroup number of each StorageGroup
-      getClusterSchemaManager().adjustMaxRegionGroupCount();
+      status = getConsensusManager().write(req).getStatus();
     }
-
+    dataSet.setStatus(status);
     dataSet.setDataNodeId(req.getInfo().getLocation().getDataNodeId());
     dataSet.setConfigNodeList(nodeInfo.getRegisteredConfigNodes());
     setGlobalConfig(dataSet);
     return dataSet;
   }
 
+  /**
+   * Active DataNode
+   *
+   * @param req ActiveDataNodeReq
+   * @return TSStatus The TSStatus will be set to SUCCESS_STATUS when active success, and
+   *     DATANODE_ALREADY_REGISTERED when the DataNode is already exist.
+   */
+  public TSStatus activateDataNode(ActivateDataNodePlan req) {
+    TSStatus status = new TSStatus();
+    if (nodeInfo.isOnlineDataNode(req.getInfo().getLocation())) {
+      status.setCode(TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode());
+      status.setMessage("DataNode already activated.");
+    } else {
+      ConsensusWriteResponse resp = getConsensusManager().write(req);
+      status = resp.getStatus();
+      // Adjust the maximum RegionGroup number of each StorageGroup
+      getClusterSchemaManager().adjustMaxRegionGroupCount();
+    }
+    return status;
+  }
+
   /**
    * Get DataNode info
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index ca5c4fa6aa..a4769da54e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
 import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
@@ -128,7 +129,7 @@ public class NodeInfo implements SnapshotProcessor {
   public boolean isOnlineDataNode(TDataNodeLocation info) {
     boolean result = false;
     dataNodeInfoReadWriteLock.readLock().lock();
-
+    int originalDataNodeId = info.getDataNodeId();
     try {
       for (Map.Entry<Integer, TDataNodeInfo> entry : onlineDataNodes.entrySet()) {
         info.setDataNodeId(entry.getKey());
@@ -136,6 +137,7 @@ public class NodeInfo implements SnapshotProcessor {
           result = true;
           break;
         }
+        info.setDataNodeId(originalDataNodeId);
       }
     } finally {
       dataNodeInfoReadWriteLock.readLock().unlock();
@@ -155,7 +157,6 @@ public class NodeInfo implements SnapshotProcessor {
     TDataNodeInfo info = registerDataNodePlan.getInfo();
     dataNodeInfoReadWriteLock.writeLock().lock();
     try {
-      onlineDataNodes.put(info.getLocation().getDataNodeId(), info);
 
       // To ensure that the nextNodeId is updated correctly when
       // the ConfigNode-followers concurrently processes RegisterDataNodePlan,
@@ -181,6 +182,25 @@ public class NodeInfo implements SnapshotProcessor {
     return result;
   }
 
+  /**
+   * add dataNode to onlineDataNodes
+   *
+   * @param activateDataNodePlan ActivateDataNodePlan
+   * @return SUCCESS_STATUS
+   */
+  public TSStatus activateDataNode(ActivateDataNodePlan activateDataNodePlan) {
+    TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    result.setMessage("activateDataNode success.");
+    TDataNodeInfo info = activateDataNodePlan.getInfo();
+    dataNodeInfoReadWriteLock.writeLock().lock();
+    try {
+      onlineDataNodes.put(info.getLocation().getDataNodeId(), info);
+    } finally {
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+    }
+    return result;
+  }
+
   /**
    * Get DataNode info
    *
@@ -199,7 +219,9 @@ public class NodeInfo implements SnapshotProcessor {
         result.setDataNodeInfoMap(new HashMap<>(onlineDataNodes));
       } else {
         result.setDataNodeInfoMap(
-            Collections.singletonMap(dataNodeId, onlineDataNodes.get(dataNodeId)));
+            onlineDataNodes.get(dataNodeId) == null
+                ? new HashMap<>(0)
+                : Collections.singletonMap(dataNodeId, onlineDataNodes.get(dataNodeId)));
       }
     } finally {
       dataNodeInfoReadWriteLock.readLock().unlock();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index a081e5c526..dc2e5c6e68 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetNodePathsPartitionP
 import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountPlan;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
@@ -145,6 +146,8 @@ public class ConfigPlanExecutor {
     switch (req.getType()) {
       case RegisterDataNode:
         return nodeInfo.registerDataNode((RegisterDataNodePlan) req);
+      case ActivateDataNode:
+        return nodeInfo.activateDataNode((ActivateDataNodePlan) req);
       case SetStorageGroup:
         TSStatus status = clusterSchemaInfo.setStorageGroup((SetStorageGroupPlan) req);
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index a5012a695d..1f738ed078 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateDataPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.SetDataReplicationFactorPlan;
@@ -142,8 +143,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
 
   @Override
   public TSStatus activeDataNode(TDataNodeActiveReq req) throws TException {
-    // TODO: implement active data node
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    return configManager.activateDataNode(new ActivateDataNodePlan(req.getDataNodeInfo()));
   }
 
   @Override
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 46bf937f4d..86c9a4067f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
 import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionPlan;
 import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.ActivateDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountPlan;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionPlan;
@@ -98,6 +99,27 @@ public class ConfigPhysicalPlanSerDeTest {
     Assert.assertEquals(plan0, plan1);
   }
 
+  @Test
+  public void ActivateDataNodePlanTest() throws IOException {
+    TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+    dataNodeLocation.setDataNodeId(1);
+    dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+    dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
+    dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));
+
+    TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
+    dataNodeInfo.setLocation(dataNodeLocation);
+    dataNodeInfo.setCpuCoreNum(16);
+    dataNodeInfo.setMaxMemory(34359738368L);
+
+    ActivateDataNodePlan plan0 = new ActivateDataNodePlan(dataNodeInfo);
+    ActivateDataNodePlan plan1 =
+        (ActivateDataNodePlan) ConfigPhysicalPlan.Factory.create(plan0.serializeToByteBuffer());
+    Assert.assertEquals(plan0, plan1);
+  }
+
   @Test
   public void QueryDataNodeInfoPlanTest() throws IOException {
     GetDataNodeInfoPlan plan0 = new GetDataNodeInfoPlan(-1);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 3be5967836..945c591506 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeActiveReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -142,9 +143,10 @@ public class ConfigNodeRPCServiceProcessorTest {
         globalConfig.getSeriesPartitionExecutorClass());
   }
 
-  private void registerDataNodes() throws TException {
+  private void registerAndActivateDataNodes() throws TException {
     for (int i = 0; i < 3; i++) {
       TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+      dataNodeLocation.setDataNodeId(-1);
       dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
       dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
       dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
@@ -162,33 +164,64 @@ public class ConfigNodeRPCServiceProcessorTest {
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
       Assert.assertEquals(i, resp.getDataNodeId());
       checkGlobalConfig(resp.getGlobalConfig());
+      // activate dataNode
+      dataNodeLocation.setDataNodeId(resp.getDataNodeId());
+      TDataNodeActiveReq dataNodeActiveReq = new TDataNodeActiveReq(dataNodeInfo);
+      TSStatus activeDataNodeRsp = processor.activeDataNode(dataNodeActiveReq);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), activeDataNodeRsp.getCode());
     }
   }
 
   @Test
   public void testRegisterAndQueryDataNode() throws TException {
-    registerDataNodes();
+    registerAndActivateDataNodes();
+    TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+    TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
+    dataNodeInfo.setLocation(dataNodeLocation);
+    dataNodeInfo.setCpuCoreNum(8);
+    dataNodeInfo.setMaxMemory(1024 * 1024);
+
+    TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeInfo);
+    TDataNodeRegisterResp resp;
+
+    // test only register not activate
+    dataNodeLocation.setDataNodeId(-1);
+    dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6670));
+    dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9007));
+    dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8781));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40014));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50014));
+    resp = processor.registerDataNode(req);
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
+    Assert.assertEquals(3, resp.getDataNodeId());
 
     // test success re-register
-    TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+    dataNodeLocation.setDataNodeId(1);
     dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6668));
     dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
     dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8778));
     dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
     dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
 
-    TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
-    dataNodeInfo.setLocation(dataNodeLocation);
-    dataNodeInfo.setCpuCoreNum(8);
-    dataNodeInfo.setMaxMemory(1024 * 1024);
-
-    TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeInfo);
-    TDataNodeRegisterResp resp = processor.registerDataNode(req);
+    resp = processor.registerDataNode(req);
     Assert.assertEquals(
         TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode(), resp.getStatus().getCode());
     Assert.assertEquals(1, resp.getDataNodeId());
     checkGlobalConfig(resp.getGlobalConfig());
 
+    // test success re-activated
+    dataNodeLocation.setDataNodeId(1);
+    dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6668));
+    dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
+    dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8778));
+    dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50011));
+
+    TDataNodeActiveReq activateReq = new TDataNodeActiveReq(dataNodeInfo);
+    TSStatus activateRlt = processor.activeDataNode(activateReq);
+    Assert.assertEquals(
+        TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode(), activateRlt.getCode());
+
     // test query DataNodeInfo
     TDataNodeInfoResp infoResp = processor.getDataNodeInfo(-1);
     Assert.assertEquals(
@@ -207,6 +240,12 @@ public class ConfigNodeRPCServiceProcessorTest {
       Assert.assertEquals(dataNodeLocation, infoList.get(i).getValue().getLocation());
     }
 
+    infoResp = processor.getDataNodeInfo(3);
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), infoResp.getStatus().getCode());
+    infoMap = infoResp.getDataNodeInfoMap();
+    Assert.assertEquals(0, infoMap.size());
+
     infoResp = processor.getDataNodeInfo(0);
     Assert.assertEquals(
         TSStatusCode.SUCCESS_STATUS.getStatusCode(), infoResp.getStatus().getCode());
@@ -224,7 +263,7 @@ public class ConfigNodeRPCServiceProcessorTest {
 
   @Test
   public void getAllClusterNodeInfosTest() throws TException {
-    registerDataNodes();
+    registerAndActivateDataNodes();
 
     TClusterNodeInfos clusterNodes = processor.getAllClusterNodeInfos();
 
@@ -249,7 +288,7 @@ public class ConfigNodeRPCServiceProcessorTest {
     final String sg1 = "root.sg1";
 
     // register DataNodes
-    registerDataNodes();
+    registerAndActivateDataNodes();
 
     // set StorageGroup0 by default values
     TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
@@ -383,7 +422,7 @@ public class ConfigNodeRPCServiceProcessorTest {
     Assert.assertNull(schemaPartitionResp.getSchemaRegionMap());
 
     // register DataNodes
-    registerDataNodes();
+    registerAndActivateDataNodes();
 
     // Test getSchemaPartition, the result should be empty
     buffer = generatePatternTreeBuffer(new String[] {d00, d01, allSg1});
@@ -582,7 +621,7 @@ public class ConfigNodeRPCServiceProcessorTest {
     Assert.assertNull(dataPartitionResp.getDataPartitionMap());
 
     // register DataNodes
-    registerDataNodes();
+    registerAndActivateDataNodes();
 
     // Test getDataPartition, the result should be empty
     dataPartitionReq = new TDataPartitionReq(partitionSlotsMap0);
@@ -959,7 +998,7 @@ public class ConfigNodeRPCServiceProcessorTest {
     final String sg0 = "root.sg0";
     final String sg1 = "root.sg1";
     // register DataNodes
-    registerDataNodes();
+    registerAndActivateDataNodes();
     ConfigNodeProcedureEnv.setSkipForTest(true);
     TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
     // set StorageGroup0 by default values
@@ -985,7 +1024,7 @@ public class ConfigNodeRPCServiceProcessorTest {
     final String sg0 = "root.sg0";
     final String sg1 = "root.sg1";
     // register DataNodes
-    registerDataNodes();
+    registerAndActivateDataNodes();
     ConfigNodeProcedureEnv.setSkipForTest(true);
     ConfigNodeProcedureEnv.setInvalidCacheResult(false);
     TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
@@ -1070,7 +1109,7 @@ public class ConfigNodeRPCServiceProcessorTest {
     TSchemaNodeManagementResp nodeManagementResp;
 
     // register DataNodes
-    registerDataNodes();
+    registerAndActivateDataNodes();
 
     // set StorageGroups
     for (int i = 0; i < storageGroupNum; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7036b738a1..82416df4a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -175,27 +175,8 @@ public class DataNode implements DataNodeMBean {
     while (retry > 0) {
       logger.info("start registering to the cluster.");
       try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
-        // Set DataNodeLocation
-        TDataNodeLocation location = new TDataNodeLocation();
-        location.setDataNodeId(config.getDataNodeId());
-        location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
-        location.setInternalEndPoint(
-            new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
-        location.setMPPDataExchangeEndPoint(
-            new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
-        location.setDataRegionConsensusEndPoint(
-            new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
-        location.setSchemaRegionConsensusEndPoint(
-            new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
-
-        // Set DataNodeInfo
-        TDataNodeInfo info = new TDataNodeInfo();
-        info.setLocation(location);
-        info.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
-        info.setMaxMemory(Runtime.getRuntime().totalMemory());
-
         TDataNodeRegisterReq req = new TDataNodeRegisterReq();
-        req.setDataNodeInfo(info);
+        req.setDataNodeInfo(generateDataNodeInfo());
         TDataNodeRegisterResp dataNodeRegisterResp = configNodeClient.registerDataNode(req);
 
         // store config node lists from resp
@@ -349,23 +330,11 @@ public class DataNode implements DataNodeMBean {
     while (retry > 0) {
       logger.info("start joining the cluster.");
       try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
-        // Set DataNodeLocation
-        TDataNodeLocation location = new TDataNodeLocation();
-        location.setDataNodeId(config.getDataNodeId());
-        location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
-        location.setInternalEndPoint(
-            new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
-        location.setMPPDataExchangeEndPoint(
-            new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
-        location.setDataRegionConsensusEndPoint(
-            new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
-        location.setSchemaRegionConsensusEndPoint(
-            new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
         TDataNodeActiveReq req = new TDataNodeActiveReq();
-        req.setLocation(location);
-        req.setDataNodeId(config.getDataNodeId());
+        req.setDataNodeInfo(generateDataNodeInfo());
         TSStatus status = configNodeClient.activeDataNode(req);
-        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            || status.getCode() == TSStatusCode.DATANODE_ALREADY_ACTIVATED.getStatusCode()) {
           logger.info("Joined the cluster successfully");
           return;
         }
@@ -402,6 +371,32 @@ public class DataNode implements DataNodeMBean {
     initProtocols();
   }
 
+  /**
+   * generate dataNodeInfo
+   *
+   * @return TDataNodeInfo
+   */
+  private TDataNodeInfo generateDataNodeInfo() {
+    // Set DataNodeLocation
+    TDataNodeLocation location = new TDataNodeLocation();
+    location.setDataNodeId(config.getDataNodeId());
+    location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
+    location.setInternalEndPoint(
+        new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
+    location.setMPPDataExchangeEndPoint(
+        new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
+    location.setDataRegionConsensusEndPoint(
+        new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
+    location.setSchemaRegionConsensusEndPoint(
+        new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
+    // Set DataNodeInfo
+    TDataNodeInfo info = new TDataNodeInfo();
+    info.setLocation(location);
+    info.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
+    info.setMaxMemory(Runtime.getRuntime().totalMemory());
+    return info;
+  }
+
   private void registerUdfServices() throws StartupException {
     registerManager.register(TemporaryQueryDataFileService.getInstance());
     registerManager.register(
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 47d6625146..0fcc1a8ada 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -136,7 +136,8 @@ public enum TSStatusCode {
   REGISTER_CONFIGNODE_FAILED(907),
   REMOVE_CONFIGNODE_FAILED(908),
   REMOVE_CONFIGNODE_DUPLICATION(909),
-  STOP_CONOFIGNODE_FAILED(910);
+  STOP_CONOFIGNODE_FAILED(910),
+  DATANODE_ALREADY_ACTIVATED(911);
 
   private int statusCode;
 
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 21a3b6dff9..75059ddf08 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -30,8 +30,7 @@ struct TDataNodeRegisterReq {
 }
 
 struct TDataNodeActiveReq {
-  1: required common.TDataNodeLocation location
-  2: required i32 dataNodeId
+  1: required common.TDataNodeInfo dataNodeInfo
 }
 
 struct TGlobalConfig {