You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/12/23 02:44:06 UTC

[iotdb] branch beyyes/cherry_pick_cluster_node created (now 216c54798f)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/cherry_pick_cluster_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 216c54798f Start ConfigNode and DataNode successfully when cluster_name is missed (#8547)

This branch includes the following new commits:

     new 485f5365d8 cherry pick 8da1c8728e260f0455684ac9b9d7677105ebf153
     new 216c54798f Start ConfigNode and DataNode successfully when cluster_name is missed (#8547)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/02: Start ConfigNode and DataNode successfully when cluster_name is missed (#8547)

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/cherry_pick_cluster_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 216c54798f1d506c792c7e0ded174fd8a9fb4250
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Dec 21 08:27:42 2022 +0800

    Start ConfigNode and DataNode successfully when cluster_name is missed (#8547)
---
 .../apache/iotdb/confignode/conf/ConfigNodeConfig.java    |  4 ++--
 .../iotdb/confignode/conf/SystemPropertiesUtils.java      | 15 +++++++++++----
 .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java        |  2 +-
 .../java/org/apache/iotdb/db/client/ConfigNodeClient.java |  2 +-
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java   |  6 +++---
 .../main/java/org/apache/iotdb/db/service/DataNode.java   |  3 ++-
 6 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 2040134b51..8e6b354534 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -31,8 +31,8 @@ import java.io.File;
 
 public class ConfigNodeConfig {
 
-  /** ClusterId, the default value "testCluster" will be changed after join cluster */
-  private volatile String clusterName = "testCluster";
+  /** ClusterId, the default value "defaultCluster" will be changed after join cluster */
+  private volatile String clusterName = "defaultCluster";
 
   /** ConfigNodeId, the default value -1 will be changed after join cluster */
   private volatile int configNodeId = -1;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index 708b06f9a5..c7decd16e6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -260,9 +260,10 @@ public class SystemPropertiesUtils {
     Properties systemProperties = getSystemProperties();
     String clusterName = systemProperties.getProperty("cluster_name", null);
     if (clusterName == null) {
-      throw new IOException(
-          "The parameter cluster_name doesn't exist in data/confignode/system/confignode-system.properties. "
-              + "Please delete data dir data/confignode and restart again");
+      LOGGER.warn(
+          "Lack cluster_name field in data/confignode/system/confignode-system.properties, set it as defaultCluster");
+      systemProperties.setProperty("cluster_name", "defaultCluster");
+      return systemProperties.getProperty("cluster_name", null);
     }
     return clusterName;
   }
@@ -295,7 +296,13 @@ public class SystemPropertiesUtils {
   public static boolean isSeedConfigNode() {
     try {
       Properties systemProperties = getSystemProperties();
-      return Boolean.parseBoolean(systemProperties.getProperty("is_seed_config_node", null));
+      boolean isSeedConfigNode =
+          Boolean.parseBoolean(systemProperties.getProperty("is_seed_config_node", null));
+      if (isSeedConfigNode) {
+        return true;
+      } else {
+        return ConfigNodeDescriptor.getInstance().isSeedConfigNode();
+      }
     } catch (IOException ignore) {
       return false;
     }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
index 2477ce1e68..2c924c9589 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
@@ -67,7 +67,7 @@ public class IoTDBClusterNodeErrorStartUpIT {
   protected static String originalConfigNodeConsensusProtocolClass;
   private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
 
-  private static final String TEST_CLUSTER_NAME = "testCluster";
+  private static final String TEST_CLUSTER_NAME = "defaultCluster";
   private static final String ERROR_CLUSTER_NAME = "errorCluster";
 
   @Before
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 9b2fc8711f..2a4b3b942a 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -136,7 +136,7 @@ public class ConfigNodeClient
   private static final int RETRY_NUM = 5;
 
   public static final String MSG_RECONNECTION_FAIL =
-      "Fail to connect to any config node. Please check server status";
+      "Fail to connect to any config node. Please check status of ConfigNodes";
 
   private static final int retryIntervalMs = 1000;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 768f317c2f..663c6c5fdb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -572,10 +572,10 @@ public class IoTDBConfig {
   private boolean isClusterMode = false;
 
   /**
-   * The cluster name that this DataNode joined in the cluster mode. The default value "testCluster"
-   * will be changed after join cluster
+   * The cluster name that this DataNode joined in the cluster mode. The default value
+   * "defaultCluster" will be changed after join cluster
    */
-  private String clusterName = "testCluster";
+  private String clusterName = "defaultCluster";
 
   /**
    * The DataNodeId of this DataNode for cluster mode. The default value -1 will be changed after
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 59574fc31d..92d8af4f92 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -398,7 +398,8 @@ public class DataNode implements DataNodeMBean {
     /* Send restart request */
     int retry = DEFAULT_RETRY;
     TDataNodeRestartReq req = new TDataNodeRestartReq();
-    req.setClusterName(config.getClusterName());
+    req.setClusterName(
+        config.getClusterName() == null ? "defaultCluster" : config.getClusterName());
     req.setDataNodeConfiguration(generateDataNodeConfiguration());
     TDataNodeRestartResp dataNodeRestartResp = null;
     while (retry > 0) {


[iotdb] 01/02: cherry pick 8da1c8728e260f0455684ac9b9d7677105ebf153

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/cherry_pick_cluster_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 485f5365d8f885be909f1e75680accf166926836
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Dec 20 13:25:46 2022 +0800

    cherry pick 8da1c8728e260f0455684ac9b9d7677105ebf153
---
 .../confignode/client/ConfigNodeRequestType.java   |   1 +
 .../client/sync/SyncConfigNodeClientPool.java      |   3 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  15 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |   2 +
 .../confignode/conf/SystemPropertiesUtils.java     |  42 ++-
 .../consensus/response/ConfigurationResp.java      |   6 +-
 .../consensus/response/DataNodeRegisterResp.java   |  44 +--
 .../iotdb/confignode/manager/ConfigManager.java    | 136 +++++--
 .../iotdb/confignode/manager/ConsensusManager.java |   2 +-
 .../apache/iotdb/confignode/manager/IManager.java  |  19 +-
 .../manager/node/ClusterNodeStartUtils.java        | 380 +++++++++++++++++++
 .../iotdb/confignode/manager/node/NodeManager.java | 156 ++++----
 .../confignode/persistence/node/NodeInfo.java      |  27 --
 .../iotdb/confignode/service/ConfigNode.java       |  74 +++-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  36 +-
 .../java/org/apache/iotdb/it/env/AbstractEnv.java  |  93 +++--
 .../apache/iotdb/it/env/AbstractNodeWrapper.java   |   2 +
 .../org/apache/iotdb/it/env/ConfigNodeWrapper.java |  11 +-
 .../org/apache/iotdb/it/env/DataNodeWrapper.java   |   5 +
 .../org/apache/iotdb/it/env/RemoteServerEnv.java   |  29 +-
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   2 +-
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |  24 +-
 .../confignode/it/IoTDBSnapshotTransferIT.java     |   2 +-
 .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 305 +++++++++++++++
 .../IoTDBClusterNodeGetterIT.java}                 |  15 +-
 .../it/{ => cluster}/IoTDBClusterRestartIT.java    |   2 +-
 .../IoTDBClusterRegionLeaderBalancingIT.java       |   4 +-
 .../{ => load}/IoTDBConfigNodeSwitchLeaderIT.java  |   2 +-
 .../it/partition/IoTDBPartitionDurableIT.java      |   4 +-
 .../confignode/it/utils/ConfigNodeTestUtils.java   |  82 ++++
 .../org/apache/iotdb/db/it/env/StandaloneEnv.java  |  29 +-
 .../iotdb/db/integration/IoTDBCheckConfigIT.java   |   6 +-
 .../org/apache/iotdb/commons/cluster/NodeType.java |  30 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   2 +
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  35 +-
 .../org/apache/iotdb/db/client/ConfigNodeInfo.java |   6 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  17 +-
 .../org/apache/iotdb/db/conf/IoTDBStartCheck.java  |  14 +-
 .../java/org/apache/iotdb/db/service/DataNode.java | 411 ++++++++++++++-------
 .../db/service/DataNodeServerCommandLine.java      |  18 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   5 +-
 .../src/main/thrift/confignode.thrift              |  78 ++--
 42 files changed, 1713 insertions(+), 463 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
index d9ee56ae25..648170cabc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -23,6 +23,7 @@ public enum ConfigNodeRequestType {
   ADD_CONSENSUS_GROUP,
   NOTIFY_REGISTER_SUCCESS,
   REGISTER_CONFIG_NODE,
+  RESTART_CONFIG_NODE,
   REMOVE_CONFIG_NODE,
   DELETE_CONFIG_NODE_PEER,
   STOP_CONFIG_NODE;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index 2e0b2c4379..cb8e62b595 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -79,6 +80,8 @@ public class SyncConfigNodeClientPool {
           case NOTIFY_REGISTER_SUCCESS:
             client.notifyRegisterSuccess();
             return null;
+          case RESTART_CONFIG_NODE:
+            return client.restartConfigNode((TConfigNodeRestartReq) req);
           case REMOVE_CONFIG_NODE:
             return removeConfigNode((TConfigNodeLocation) req, client);
           case DELETE_CONFIG_NODE_PEER:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 9db28f9472..2040134b51 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -31,9 +31,10 @@ import java.io.File;
 
 public class ConfigNodeConfig {
 
-  /**
-   * the config node id for cluster mode, the default value -1 should be changed after join cluster
-   */
+  /** ClusterId, the default value "testCluster" will be changed after join cluster */
+  private volatile String clusterName = "testCluster";
+
+  /** ConfigNodeId, the default value -1 will be changed after join cluster */
   private volatile int configNodeId = -1;
 
   /** could set ip or hostname */
@@ -296,6 +297,14 @@ public class ConfigNodeConfig {
     return dir;
   }
 
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
   public int getConfigNodeId() {
     return configNodeId;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 6a88c0c9f9..c78f69cb39 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -780,6 +780,8 @@ public class ConfigNodeDescriptor {
   /**
    * Check if the current ConfigNode is SeedConfigNode.
    *
+   * <p>Notice: Only invoke this interface when first startup.
+   *
    * @return True if the target_config_node_list points to itself
    */
   public boolean isSeedConfigNode() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index 00cf0a46d4..708b06f9a5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -196,7 +196,12 @@ public class SystemPropertiesUtils {
   public static void storeSystemParameters() throws IOException {
     Properties systemProperties = getSystemProperties();
 
+    // Cluster configuration
+    systemProperties.setProperty("cluster_name", conf.getClusterName());
     systemProperties.setProperty("config_node_id", String.valueOf(conf.getConfigNodeId()));
+    systemProperties.setProperty(
+        "is_seed_config_node",
+        String.valueOf(ConfigNodeDescriptor.getInstance().isSeedConfigNode()));
 
     // Startup configuration
     systemProperties.setProperty("cn_internal_address", String.valueOf(conf.getInternalAddress()));
@@ -244,6 +249,24 @@ public class SystemPropertiesUtils {
     storeSystemProperties(systemProperties);
   }
 
+  /**
+   * Load the cluster_name in confignode-system.properties file. We only invoke this interface when
+   * restarted.
+   *
+   * @return The property of cluster_name in confignode-system.properties file
+   * @throws IOException When load confignode-system.properties file failed
+   */
+  public static String loadClusterNameWhenRestarted() throws IOException {
+    Properties systemProperties = getSystemProperties();
+    String clusterName = systemProperties.getProperty("cluster_name", null);
+    if (clusterName == null) {
+      throw new IOException(
+          "The parameter cluster_name doesn't exist in data/confignode/system/confignode-system.properties. "
+              + "Please delete data dir data/confignode and restart again");
+    }
+    return clusterName;
+  }
+
   /**
    * Load the config_node_id in confignode-system.properties file. We only invoke this interface
    * when restarted.
@@ -257,7 +280,24 @@ public class SystemPropertiesUtils {
       return Integer.parseInt(systemProperties.getProperty("config_node_id", null));
     } catch (NumberFormatException e) {
       throw new IOException(
-          "The parameter config_node_id doesn't exist in confignode-system.properties");
+          "The parameter config_node_id doesn't exist in data/confignode/system/confignode-system.properties. "
+              + "Please delete data dir data/confignode and restart again.");
+    }
+  }
+
+  /**
+   * Check if the current ConfigNode is SeedConfigNode.
+   *
+   * <p>Notice: Only invoke this interface when restarted.
+   *
+   * @return True if the is_seed_config_node is set to True in iotdb-confignode.properties file
+   */
+  public static boolean isSeedConfigNode() {
+    try {
+      Properties systemProperties = getSystemProperties();
+      return Boolean.parseBoolean(systemProperties.getProperty("is_seed_config_node", null));
+    } catch (IOException ignore) {
+      return false;
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ConfigurationResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ConfigurationResp.java
index 70e8af6df6..3353411fe4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ConfigurationResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/ConfigurationResp.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.confignode.consensus.response;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
+import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -53,8 +53,8 @@ public class ConfigurationResp implements DataSet {
     this.cqConfig = cqConfig;
   }
 
-  public TConfigurationResp convertToRpcDataNodeRegisterResp() {
-    TConfigurationResp resp = new TConfigurationResp();
+  public TSystemConfigurationResp convertToRpcSystemConfigurationResp() {
+    TSystemConfigurationResp resp = new TSystemConfigurationResp();
     resp.setStatus(status);
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       resp.setGlobalConfig(globalConfig);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
index 0a90903f35..e3ab67070c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeRegisterResp.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.confignode.consensus.response;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
@@ -35,12 +35,11 @@ public class DataNodeRegisterResp implements DataSet {
 
   private TSStatus status;
   private List<TConfigNodeLocation> configNodeList;
+
+  private String clusterName;
   private Integer dataNodeId;
-  private byte[] templateInfo;
-  private List<ByteBuffer> allTriggerInformation;
-  private List<ByteBuffer> allUDFInformation;
 
-  private byte[] allTTLInformation;
+  private TRuntimeConfiguration runtimeConfiguration;
 
   public DataNodeRegisterResp() {
     this.dataNodeId = null;
@@ -58,27 +57,19 @@ public class DataNodeRegisterResp implements DataSet {
     this.configNodeList = configNodeList;
   }
 
-  public void setDataNodeId(Integer dataNodeId) {
-    this.dataNodeId = dataNodeId;
-  }
-
-  public void setTemplateInfo(byte[] templateInfo) {
-    this.templateInfo = templateInfo;
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
   }
 
-  public List<ByteBuffer> getTriggerInformation() {
-    return allTriggerInformation;
-  }
-
-  public void setTriggerInformation(List<ByteBuffer> triggerInformation) {
-    this.allTriggerInformation = triggerInformation;
+  public void setDataNodeId(Integer dataNodeId) {
+    this.dataNodeId = dataNodeId;
   }
 
-  public void setAllUDFInformation(List<ByteBuffer> allUDFInformation) {
-    this.allUDFInformation = allUDFInformation;
+  public void setRuntimeConfiguration(TRuntimeConfiguration runtimeConfiguration) {
+    this.runtimeConfiguration = runtimeConfiguration;
   }
 
-  public void setAllTTLInformation(Map<String, Long> allTTLInformation) {
+  public static byte[] convertAllTTLInformation(Map<String, Long> allTTLInformation) {
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
     try {
       ReadWriteIOUtils.write(allTTLInformation.size(), outputStream);
@@ -87,8 +78,9 @@ public class DataNodeRegisterResp implements DataSet {
         ReadWriteIOUtils.write(entry.getValue(), outputStream);
       }
     } catch (IOException ignored) {
+      // Normally, this line will never reach
     }
-    this.allTTLInformation = outputStream.toByteArray();
+    return outputStream.toByteArray();
   }
 
   public TDataNodeRegisterResp convertToRpcDataNodeRegisterResp() {
@@ -96,14 +88,10 @@ public class DataNodeRegisterResp implements DataSet {
     resp.setStatus(status);
     resp.setConfigNodeList(configNodeList);
 
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        || status.getCode() == TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()
-        || status.getCode() == TSStatusCode.DATANODE_NOT_EXIST.getStatusCode()) {
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      resp.setClusterName(clusterName);
       resp.setDataNodeId(dataNodeId);
-      resp.setTemplateInfo(templateInfo);
-      resp.setAllTriggerInformation(allTriggerInformation);
-      resp.setAllUDFInformation(allUDFInformation);
-      resp.setAllTTLInformation(allTTLInformation);
+      resp.setRuntimeConfiguration(runtimeConfiguration);
     }
 
     return resp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 439cac9131..4d8d0990c0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -40,6 +41,7 @@ import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
 import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
 import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
@@ -80,6 +82,7 @@ import org.apache.iotdb.confignode.consensus.response.TemplateSetInfoResp;
 import org.apache.iotdb.confignode.consensus.statemachine.ConfigNodeRegionStateMachine;
 import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
@@ -94,11 +97,14 @@ import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
@@ -256,42 +262,65 @@ public class ConfigManager implements IManager {
   }
 
   @Override
-  public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
+  public DataSet getSystemConfiguration() {
     TSStatus status = confirmLeader();
-    DataNodeRegisterResp dataSet;
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      triggerManager.getTriggerInfo().acquireTriggerTableLock();
-      udfManager.getUdfInfo().acquireUDFTableLock();
-      try {
-        dataSet = (DataNodeRegisterResp) nodeManager.registerDataNode(registerDataNodePlan);
-        dataSet.setTemplateInfo(clusterSchemaManager.getAllTemplateSetInfo());
-        dataSet.setTriggerInformation(
-            triggerManager.getTriggerTable(false).getAllTriggerInformation());
-        dataSet.setAllUDFInformation(udfManager.getUDFTable().getAllUDFInformation());
-        dataSet.setAllTTLInformation(clusterSchemaManager.getAllTTLInfo());
-      } finally {
-        triggerManager.getTriggerInfo().releaseTriggerTableLock();
-        udfManager.getUdfInfo().releaseUDFTableLock();
-      }
+    ConfigurationResp dataSet;
+    // Notice: The Seed-ConfigNode must also have the privilege to give system configuration.
+    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
+        || SystemPropertiesUtils.isSeedConfigNode()) {
+      dataSet = (ConfigurationResp) nodeManager.getSystemConfiguration();
     } else {
-      dataSet = new DataNodeRegisterResp();
+      dataSet = new ConfigurationResp();
       dataSet.setStatus(status);
-      dataSet.setConfigNodeList(nodeManager.getRegisteredConfigNodes());
     }
     return dataSet;
   }
 
   @Override
-  public DataSet getConfiguration() {
+  public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
     TSStatus status = confirmLeader();
-    ConfigurationResp dataSet;
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataSet = (ConfigurationResp) nodeManager.getConfiguration();
-    } else {
-      dataSet = new ConfigurationResp();
-      dataSet.setStatus(status);
+      status =
+          ClusterNodeStartUtils.confirmNodeRegistration(
+              NodeType.DataNode,
+              registerDataNodePlan.getDataNodeConfiguration().getLocation(),
+              this);
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return nodeManager.registerDataNode(registerDataNodePlan);
+      }
     }
-    return dataSet;
+
+    DataNodeRegisterResp resp = new DataNodeRegisterResp();
+    resp.setStatus(status);
+    resp.setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
+    return resp;
+  }
+
+  @Override
+  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) {
+    TSStatus status = confirmLeader();
+    // Notice: The Seed-ConfigNode must also have the privilege to do Node restart check.
+    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
+        || SystemPropertiesUtils.isSeedConfigNode()) {
+      status =
+          ClusterNodeStartUtils.confirmNodeRestart(
+              NodeType.DataNode,
+              req.getClusterName(),
+              req.getDataNodeConfiguration().getLocation().getDataNodeId(),
+              req.getDataNodeConfiguration().getLocation(),
+              this);
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return nodeManager.restartDataNode(req.getDataNodeConfiguration().getLocation());
+      }
+    }
+
+    return new TDataNodeRestartResp()
+        .setStatus(status)
+        .setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
   }
 
   @Override
@@ -311,15 +340,7 @@ public class ConfigManager implements IManager {
     TSStatus status = confirmLeader();
     DataNodeRegisterResp dataSet;
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      triggerManager.getTriggerInfo().acquireTriggerTableLock();
-      try {
-        dataSet = (DataNodeRegisterResp) nodeManager.updateDataNode(updateDataNodePlan);
-        dataSet.setTemplateInfo(clusterSchemaManager.getAllTemplateSetInfo());
-        dataSet.setTriggerInformation(
-            triggerManager.getTriggerTable(false).getAllTriggerInformation());
-      } finally {
-        triggerManager.getTriggerInfo().releaseTriggerTableLock();
-      }
+      dataSet = (DataNodeRegisterResp) nodeManager.updateDataNode(updateDataNodePlan);
     } else {
       dataSet = new DataNodeRegisterResp();
       dataSet.setStatus(status);
@@ -663,6 +684,13 @@ public class ConfigManager implements IManager {
   }
 
   private TSStatus confirmLeader() {
+    // Make sure the consensus layer has been initialized
+    if (getConsensusManager() == null) {
+      return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode())
+          .setMessage(
+              "ConsensusManager of target-ConfigNode is not initialized, "
+                  + "please make sure the target-ConfigNode has been started successfully.");
+    }
     return getConsensusManager().confirmLeader();
   }
 
@@ -750,7 +778,45 @@ public class ConfigManager implements IManager {
 
   @Override
   public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
-    return nodeManager.registerConfigNode(req);
+    final int ERROR_STATUS_NODE_ID = -1;
+
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      // Make sure the global configurations are consist
+      status = checkConfigNodeGlobalConfig(req);
+      if (status == null) {
+        status =
+            ClusterNodeStartUtils.confirmNodeRegistration(
+                NodeType.ConfigNode, req.getConfigNodeLocation(), this);
+        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          return nodeManager.registerConfigNode(req);
+        }
+      }
+    }
+
+    return new TConfigNodeRegisterResp().setStatus(status).setConfigNodeId(ERROR_STATUS_NODE_ID);
+  }
+
+  @Override
+  public TSStatus restartConfigNode(TConfigNodeRestartReq req) {
+    TSStatus status = confirmLeader();
+    // Notice: The Seed-ConfigNode must also have the privilege to do Node restart check.
+    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
+        || SystemPropertiesUtils.isSeedConfigNode()) {
+      status =
+          ClusterNodeStartUtils.confirmNodeRestart(
+              NodeType.ConfigNode,
+              req.getClusterName(),
+              req.getConfigNodeLocation().getConfigNodeId(),
+              req.getConfigNodeLocation(),
+              this);
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return nodeManager.restartConfigNode(req.getConfigNodeLocation());
+      }
+    }
+    return status;
   }
 
   public TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 8d34ca5bcc..884db7d765 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -182,7 +182,7 @@ public class ConsensusManager {
     }
     consensusImpl.start();
     if (SystemPropertiesUtils.isRestarted()) {
-      // TODO: Check and notify if current ConfigNode's ip or port has changed
+      // TODO: @Itami-Sho Check and notify if current ConfigNode's ip or port has changed
 
       if (SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
         // Only SIMPLE_CONSENSUS need invoking `createPeerForConsensusGroup` when restarted,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 5079545cf0..4a1534b88b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -51,11 +51,14 @@ import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
@@ -171,6 +174,13 @@ public interface IManager {
    */
   CQManager getCQManager();
 
+  /**
+   * Get system configurations that is not associated with the DataNodeId
+   *
+   * @return SystemConfigurationResp
+   */
+  DataSet getSystemConfiguration();
+
   /**
    * Register DataNode
    *
@@ -179,11 +189,12 @@ public interface IManager {
   DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan);
 
   /**
-   * Get configuration information that is not associated with the DataNodeId
+   * Restart DataNode
    *
-   * @return ConfigurationResp
+   * @param req TDataNodeRestartReq
+   * @return SUCCESS_STATUS if allow DataNode to restart, REJECT_START otherwise
    */
-  DataSet getConfiguration();
+  TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req);
 
   /**
    * Remove DataNode
@@ -318,6 +329,8 @@ public interface IManager {
    */
   TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req);
 
+  TSStatus restartConfigNode(TConfigNodeRestartReq req);
+
   /**
    * Create peer in new node to build consensus group.
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java
new file mode 100644
index 0000000000..fde4479c4b
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.node;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Startup check utils before register/restart a ConfigNode/DataNode */
+public class ClusterNodeStartUtils {
+
+  private static final String CLUSTER_NAME =
+      ConfigNodeDescriptor.getInstance().getConf().getClusterName();
+
+  private static final String POSSIBLE_SOLUTIONS = " Possible solutions are as follows:\r\n";
+
+  public static final TSStatus ACCEPT_NODE_REGISTRATION =
+      new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
+          .setMessage("Accept Node registration.");
+  public static final TSStatus ACCEPT_NODE_RESTART =
+      new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("Accept Node restart.");
+
+  public static TSStatus confirmNodeRegistration(
+      NodeType nodeType, Object nodeLocation, ConfigManager configManager) {
+
+    final String CONF_FILE_NAME =
+        NodeType.ConfigNode.equals(nodeType)
+            ? ConfigNodeConstant.CONF_FILE_NAME
+            : IoTDBConstant.DATA_NODE_CONF_FILE_NAME;
+    TSStatus status = new TSStatus();
+
+    /* Check if there exist conflict TEndPoints */
+    List<TEndPoint> conflictEndPoints;
+    switch (nodeType) {
+      case ConfigNode:
+        conflictEndPoints =
+            checkConflictTEndPointForNewConfigNode(
+                (TConfigNodeLocation) nodeLocation,
+                configManager.getNodeManager().getRegisteredConfigNodes());
+        break;
+      case DataNode:
+      default:
+        conflictEndPoints =
+            checkConflictTEndPointForNewDataNode(
+                (TDataNodeLocation) nodeLocation,
+                configManager.getNodeManager().getRegisteredDataNodes());
+        break;
+    }
+
+    if (!conflictEndPoints.isEmpty()) {
+      /* Reject Node registration because there exist conflict TEndPoints */
+      status.setCode(TSStatusCode.REJECT_NODE_START.getStatusCode());
+      status.setMessage(
+          String.format(
+              "Reject %s registration. Because the following ip:port: %s of the current %s is conflicted with other registered Nodes in the cluster."
+                  + POSSIBLE_SOLUTIONS
+                  + "\t1. Use SQL: \"show cluster details\" to find out the conflict Nodes. Remove them and retry start."
+                  + "\n\t2. Change the conflict ip:port configurations in %s file and retry start.",
+              nodeType.getNodeType(),
+              conflictEndPoints,
+              nodeType.getNodeType(),
+              CONF_FILE_NAME));
+      return status;
+    } else {
+      /* Accept registration if all TEndPoints aren't conflict */
+      return ACCEPT_NODE_REGISTRATION;
+    }
+  }
+
+  public static TSStatus confirmNodeRestart(
+      NodeType nodeType,
+      String clusterName,
+      int nodeId,
+      Object nodeLocation,
+      ConfigManager configManager) {
+
+    final String CONF_FILE_NAME =
+        NodeType.ConfigNode.equals(nodeType)
+            ? ConfigNodeConstant.CONF_FILE_NAME
+            : IoTDBConstant.DATA_NODE_CONF_FILE_NAME;
+    TSStatus status = new TSStatus();
+
+    /* Reject restart if the cluster name is error */
+    if (!CLUSTER_NAME.equals(clusterName)) {
+      status.setCode(TSStatusCode.REJECT_NODE_START.getStatusCode());
+      status.setMessage(
+          String.format(
+              "Reject %s restart. Because the ClusterName of the current %s and the target cluster are inconsistent. "
+                  + "ClusterName of the current Node: %s, ClusterName of the target cluster: %s."
+                  + POSSIBLE_SOLUTIONS
+                  + "\t1. Change the target_config_node_list parameter in %s to join the correct cluster.",
+              nodeType.getNodeType(),
+              nodeType.getNodeType(),
+              clusterName,
+              CLUSTER_NAME,
+              CONF_FILE_NAME));
+      return status;
+    }
+
+    /* Reject restart if the nodeId is error */
+    if (nodeId < 0) {
+      status.setCode(TSStatusCode.REJECT_NODE_START.getStatusCode());
+      status.setMessage(
+          String.format(
+              "Reject %s restart. Because the nodeId of the current %s is %d."
+                  + POSSIBLE_SOLUTIONS
+                  + "\t1. Delete \"data\" dir and retry.",
+              nodeType.getNodeType(),
+              nodeType.getNodeType(),
+              nodeId));
+      return status;
+    }
+
+    Object matchedNodeLocation;
+    switch (nodeType) {
+      case ConfigNode:
+        matchedNodeLocation =
+            matchRegisteredConfigNode(
+                (TConfigNodeLocation) nodeLocation,
+                configManager.getNodeManager().getRegisteredConfigNodes());
+        break;
+      case DataNode:
+      default:
+        matchedNodeLocation =
+            matchRegisteredDataNode(
+                (TDataNodeLocation) nodeLocation,
+                configManager.getNodeManager().getRegisteredDataNodes());
+        break;
+    }
+
+    /* Reject restart because there are no corresponding Node in the cluster */
+    if (matchedNodeLocation == null) {
+      status.setCode(TSStatusCode.REJECT_NODE_START.getStatusCode());
+      status.setMessage(
+          String.format(
+              "Reject %s restart. Because there are no corresponding %s(whose nodeId=%d) in the cluster."
+                  + POSSIBLE_SOLUTIONS
+                  + "\t1. Maybe you've already removed the current %s(whose nodeId=%d). Please delete the useless 'data' dir and retry start.",
+              nodeType.getNodeType(),
+              nodeType.getNodeType(),
+              nodeId,
+              nodeType.getNodeType(),
+              nodeId));
+      return status;
+    }
+
+    boolean isNodeAlive;
+    NodeStatus nodeStatus = configManager.getNodeManager().getNodeStatusByNodeId(nodeId);
+    isNodeAlive = nodeStatus != null && !NodeStatus.Unknown.equals(nodeStatus);
+    if (isNodeAlive) {
+      /* Reject restart because the Node is still alive */
+      status.setCode(TSStatusCode.REJECT_NODE_START.getStatusCode());
+      status.setMessage(
+          String.format(
+              "Reject %s restart. Because there already exists an alive Node with the same nodeId=%d in the target cluster."
+                  + POSSIBLE_SOLUTIONS
+                  + "\t1. Maybe you've just shutdown this Node recently. Please wait about %s for the ConfigNode-leader to mark this Node as Unknown before retry start. You can use SQL \"show cluster details\" to find out this Node's status."
+                  + "\n\t2. Maybe you start this Node by copying the 'data' dir of another alive Node. Please delete 'data' dir and retry start.",
+              nodeType.getNodeType(),
+              nodeId,
+              (BaseNodeCache.HEARTBEAT_TIMEOUT_TIME / 1000) + "s"));
+      return status;
+    }
+
+    boolean isTEndPointUpdated;
+    switch (nodeType) {
+      case ConfigNode:
+        isTEndPointUpdated =
+            isTEndPointsOfTConfigNodeLocationUpdated(
+                (TConfigNodeLocation) nodeLocation, (TConfigNodeLocation) matchedNodeLocation);
+        break;
+      case DataNode:
+      default:
+        isTEndPointUpdated =
+            isTEndPointsOfTDataNodeLocationUpdated(
+                (TDataNodeLocation) nodeLocation, (TDataNodeLocation) matchedNodeLocation);
+        break;
+    }
+
+    if (isTEndPointUpdated) {
+      /* Reject restart because some TEndPoints have changed */
+      // TODO: @Itami-Sho, enable this
+      status.setCode(TSStatusCode.REJECT_NODE_START.getStatusCode());
+      status.setMessage(
+          String.format(
+              "Reject %s restart. Because some TEndPoints of this %s have been changed."
+                  + POSSIBLE_SOLUTIONS
+                  + "\t1. Please delete 'data' dir and retry start. This Node will be registered as a new Node, so don't forget to remove the old one.",
+              nodeType.getNodeType(),
+              nodeType.getNodeType()));
+      return status;
+    } else {
+      /* Accept Node restart */
+      return ACCEPT_NODE_RESTART;
+    }
+  }
+
+  /**
+   * Check if there exist conflict TEndPoints on the ConfigNode to be registered
+   *
+   * @param newConfigNodeLocation The TConfigNode of the ConfigNode to be registered
+   * @param registeredConfigNodes All registered ConfigNodes
+   * @return The conflict TEndPoints if exist
+   */
+  public static List<TEndPoint> checkConflictTEndPointForNewConfigNode(
+      TConfigNodeLocation newConfigNodeLocation, List<TConfigNodeLocation> registeredConfigNodes) {
+    Set<TEndPoint> conflictEndPointSet = new HashSet<>();
+    for (TConfigNodeLocation registeredConfigNode : registeredConfigNodes) {
+      if (registeredConfigNode
+          .getInternalEndPoint()
+          .equals(newConfigNodeLocation.getInternalEndPoint())) {
+        conflictEndPointSet.add(newConfigNodeLocation.getInternalEndPoint());
+      }
+      if (registeredConfigNode
+          .getConsensusEndPoint()
+          .equals(newConfigNodeLocation.getConsensusEndPoint())) {
+        conflictEndPointSet.add(newConfigNodeLocation.getConsensusEndPoint());
+      }
+    }
+
+    return new ArrayList<>(conflictEndPointSet);
+  }
+
+  /**
+   * Check if there exist conflict TEndPoints on the DataNode to be registered
+   *
+   * @param newDataNodeLocation The TDataNodeLocation of the DataNode to be registered
+   * @param registeredDataNodes All registered DataNodes
+   * @return The conflict TEndPoints if exist
+   */
+  public static List<TEndPoint> checkConflictTEndPointForNewDataNode(
+      TDataNodeLocation newDataNodeLocation, List<TDataNodeConfiguration> registeredDataNodes) {
+    Set<TEndPoint> conflictEndPointSet = new HashSet<>();
+    for (TDataNodeConfiguration registeredDataNode : registeredDataNodes) {
+      TDataNodeLocation registeredLocation = registeredDataNode.getLocation();
+
+      if (registeredLocation
+          .getClientRpcEndPoint()
+          .equals(newDataNodeLocation.getClientRpcEndPoint())) {
+        conflictEndPointSet.add(newDataNodeLocation.getClientRpcEndPoint());
+      }
+      if (registeredLocation
+          .getInternalEndPoint()
+          .equals(newDataNodeLocation.getInternalEndPoint())) {
+        conflictEndPointSet.add(newDataNodeLocation.getInternalEndPoint());
+      }
+      if (registeredLocation
+          .getMPPDataExchangeEndPoint()
+          .equals(newDataNodeLocation.getMPPDataExchangeEndPoint())) {
+        conflictEndPointSet.add(newDataNodeLocation.getMPPDataExchangeEndPoint());
+      }
+      if (registeredLocation
+          .getSchemaRegionConsensusEndPoint()
+          .equals(newDataNodeLocation.getSchemaRegionConsensusEndPoint())) {
+        conflictEndPointSet.add(newDataNodeLocation.getSchemaRegionConsensusEndPoint());
+      }
+      if (registeredLocation
+          .getDataRegionConsensusEndPoint()
+          .equals(newDataNodeLocation.getDataRegionConsensusEndPoint())) {
+        conflictEndPointSet.add(newDataNodeLocation.getDataRegionConsensusEndPoint());
+      }
+    }
+
+    return new ArrayList<>(conflictEndPointSet);
+  }
+
+  /**
+   * Check if there exists a registered ConfigNode who has the same index of the given one.
+   *
+   * @param configNodeLocation The given ConfigNode
+   * @param registeredConfigNodes Registered ConfigNodes
+   * @return The ConfigNodeLocation who has the same index of the given one, null otherwise.
+   */
+  public static TConfigNodeLocation matchRegisteredConfigNode(
+      TConfigNodeLocation configNodeLocation, List<TConfigNodeLocation> registeredConfigNodes) {
+    for (TConfigNodeLocation registeredConfigNode : registeredConfigNodes) {
+      if (registeredConfigNode.getConfigNodeId() == configNodeLocation.getConfigNodeId()) {
+        return registeredConfigNode;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Check if there exists a registered DataNode who has the same index of the given one.
+   *
+   * @param dataNodeLocation The given DataNode
+   * @param registeredDataNodes Registered DataNodes
+   * @return The DataNodeLocation who has the same index of the given one, null otherwise.
+   */
+  public static TDataNodeLocation matchRegisteredDataNode(
+      TDataNodeLocation dataNodeLocation, List<TDataNodeConfiguration> registeredDataNodes) {
+    for (TDataNodeConfiguration registeredDataNode : registeredDataNodes) {
+      if (registeredDataNode.getLocation().getDataNodeId() == dataNodeLocation.getDataNodeId()) {
+        return registeredDataNode.getLocation();
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Check if some TEndPoints of the specified ConfigNode have updated.
+   *
+   * @return True if some TEndPoints of the specified ConfigNode have updated, false otherwise.
+   */
+  public static boolean isTEndPointsOfTConfigNodeLocationUpdated(
+      TConfigNodeLocation configNodeLocationA, TConfigNodeLocation configNodeLocationB) {
+    if (!configNodeLocationA
+        .getInternalEndPoint()
+        .equals(configNodeLocationB.getInternalEndPoint())) {
+      return true;
+    }
+    return !configNodeLocationA
+        .getConsensusEndPoint()
+        .equals(configNodeLocationB.getConsensusEndPoint());
+  }
+
+  /**
+   * Check if some TEndPoints of the specified DataNode have updated.
+   *
+   * @return True if some TEndPoints of the specified DataNode have updated, false otherwise.
+   */
+  public static boolean isTEndPointsOfTDataNodeLocationUpdated(
+      TDataNodeLocation dataNodeLocationA, TDataNodeLocation dataNodeLocationB) {
+    if (!dataNodeLocationA
+        .getClientRpcEndPoint()
+        .equals(dataNodeLocationB.getClientRpcEndPoint())) {
+      return true;
+    }
+    if (!dataNodeLocationA.getInternalEndPoint().equals(dataNodeLocationB.getInternalEndPoint())) {
+      return true;
+    }
+    if (!dataNodeLocationA
+        .getMPPDataExchangeEndPoint()
+        .equals(dataNodeLocationB.getMPPDataExchangeEndPoint())) {
+      return true;
+    }
+    if (!dataNodeLocationA
+        .getSchemaRegionConsensusEndPoint()
+        .equals(dataNodeLocationB.getSchemaRegionConsensusEndPoint())) {
+      return true;
+    }
+    return !dataNodeLocationA
+        .getDataRegionConsensusEndPoint()
+        .equals(dataNodeLocationB.getDataRegionConsensusEndPoint());
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 396207114e..c64e374863 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
 import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
@@ -56,6 +55,8 @@ import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.ConsensusManager;
 import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.TriggerManager;
+import org.apache.iotdb.confignode.manager.UDFManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
 import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
@@ -68,8 +69,10 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
+import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
 import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
@@ -110,9 +113,6 @@ public class NodeManager {
   private static final long UNKNOWN_DATANODE_DETECT_INTERVAL =
       CONF.getUnknownDataNodeDetectInterval();
 
-  // when fail to register a new node, set node id to -1
-  private static final int ERROR_STATUS_NODE_ID = -1;
-
   private final IManager configManager;
   private final NodeInfo nodeInfo;
 
@@ -146,6 +146,20 @@ public class NodeManager {
     this.random = new Random(System.currentTimeMillis());
   }
 
+  /**
+   * Get system configurations
+   *
+   * @return ConfigurationResp. The TSStatus will be set to SUCCESS_STATUS.
+   */
+  public DataSet getSystemConfiguration() {
+    ConfigurationResp dataSet = new ConfigurationResp();
+    dataSet.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+    setGlobalConfig(dataSet);
+    setRatisConfig(dataSet);
+    setCQConfig(dataSet);
+    return dataSet;
+  }
+
   private void setGlobalConfig(ConfigurationResp dataSet) {
     // Set TGlobalConfig
     final ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
@@ -225,6 +239,26 @@ public class NodeManager {
     dataSet.setCqConfig(cqConfig);
   }
 
+  private TRuntimeConfiguration getRuntimeConfiguration() {
+    getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
+    getUDFManager().getUdfInfo().acquireUDFTableLock();
+
+    try {
+      TRuntimeConfiguration runtimeConfiguration = new TRuntimeConfiguration();
+      runtimeConfiguration.setTemplateInfo(getClusterSchemaManager().getAllTemplateSetInfo());
+      runtimeConfiguration.setAllTriggerInformation(
+          getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
+      runtimeConfiguration.setAllUDFInformation(
+          getUDFManager().getUDFTable().getAllUDFInformation());
+      runtimeConfiguration.setAllTTLInformation(
+          DataNodeRegisterResp.convertAllTTLInformation(getClusterSchemaManager().getAllTTLInfo()));
+      return runtimeConfiguration;
+    } finally {
+      getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
+      getUDFManager().getUdfInfo().releaseUDFTableLock();
+    }
+  }
+
   /**
    * Register DataNode
    *
@@ -233,51 +267,34 @@ public class NodeManager {
    *     success, and DATANODE_ALREADY_REGISTERED when the DataNode is already exist.
    */
   public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
-    DataNodeRegisterResp dataSet = new DataNodeRegisterResp();
-    TSStatus status = new TSStatus();
-
-    if (nodeInfo.isRegisteredDataNode(
-        registerDataNodePlan.getDataNodeConfiguration().getLocation())) {
-      status.setCode(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
-      status.setMessage("DataNode already registered.");
-    } else if (registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId() < 0) {
-      // Generating a new dataNodeId only when current DataNode doesn't exist yet
-      registerDataNodePlan
-          .getDataNodeConfiguration()
-          .getLocation()
-          .setDataNodeId(nodeInfo.generateNextNodeId());
-      getConsensusManager().write(registerDataNodePlan);
-
-      // Adjust the maximum RegionGroup number of each StorageGroup
-      getClusterSchemaManager().adjustMaxRegionGroupNum();
-
-      status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-      status.setMessage("registerDataNode success.");
-    } else {
-      status.setCode(TSStatusCode.REGISTER_DATANODE_WITH_WRONG_ID.getStatusCode());
-      status.setMessage(
-          "Cannot register datanode with wrong id. Maybe it's already removed, or it has another datanode's run-time properties.");
-    }
-
-    dataSet.setStatus(status);
-    dataSet.setDataNodeId(
+    DataNodeRegisterResp resp = new DataNodeRegisterResp();
+
+    // Register new DataNode
+    registerDataNodePlan
+        .getDataNodeConfiguration()
+        .getLocation()
+        .setDataNodeId(nodeInfo.generateNextNodeId());
+    getConsensusManager().write(registerDataNodePlan);
+
+    // Adjust the maximum RegionGroup number of each StorageGroup
+    getClusterSchemaManager().adjustMaxRegionGroupNum();
+
+    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
+    resp.setConfigNodeList(getRegisteredConfigNodes());
+    resp.setClusterName(CONF.getClusterName());
+    resp.setDataNodeId(
         registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
-    dataSet.setConfigNodeList(getRegisteredConfigNodes());
-    return dataSet;
+    resp.setRuntimeConfiguration(getRuntimeConfiguration());
+    return resp;
   }
 
-  /**
-   * Get configuration
-   *
-   * @return ConfigurationResp. The TSStatus will be set to SUCCESS_STATUS.
-   */
-  public DataSet getConfiguration() {
-    ConfigurationResp dataSet = new ConfigurationResp();
-    dataSet.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
-    setGlobalConfig(dataSet);
-    setRatisConfig(dataSet);
-    setCQConfig(dataSet);
-    return dataSet;
+  public TDataNodeRestartResp restartDataNode(TDataNodeLocation dataNodeLocation) {
+    // TODO: @Itami-Sho update peer if necessary
+    TDataNodeRestartResp resp = new TDataNodeRestartResp();
+    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
+    resp.setConfigNodeList(getRegisteredConfigNodes());
+    resp.setRuntimeConfiguration(getRuntimeConfiguration());
+    return resp;
   }
 
   /**
@@ -374,35 +391,18 @@ public class NodeManager {
   }
 
   public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
-    if (configManager.getConsensusManager() == null) {
-      TSStatus errorStatus = new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
-      errorStatus.setMessage(
-          "ConsensusManager of target-ConfigNode is not initialized, "
-              + "please make sure the target-ConfigNode has been started successfully.");
-      return new TConfigNodeRegisterResp()
-          .setStatus(errorStatus)
-          .setConfigNodeId(ERROR_STATUS_NODE_ID);
-    }
-
-    // Check global configuration
-    TSStatus status = configManager.getConsensusManager().confirmLeader();
-
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      TSStatus errorStatus = configManager.checkConfigNodeGlobalConfig(req);
-      if (errorStatus != null) {
-        return new TConfigNodeRegisterResp()
-            .setStatus(errorStatus)
-            .setConfigNodeId(ERROR_STATUS_NODE_ID);
-      }
-
-      int nodeId = nodeInfo.generateNextNodeId();
-      req.getConfigNodeLocation().setConfigNodeId(nodeId);
-
-      configManager.getProcedureManager().addConfigNode(req);
-      return new TConfigNodeRegisterResp().setStatus(StatusUtils.OK).setConfigNodeId(nodeId);
-    }
+    int nodeId = nodeInfo.generateNextNodeId();
+    req.getConfigNodeLocation().setConfigNodeId(nodeId);
+    configManager.getProcedureManager().addConfigNode(req);
+    return new TConfigNodeRegisterResp()
+        .setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION)
+        .setClusterName(CONF.getClusterName())
+        .setConfigNodeId(nodeId);
+  }
 
-    return new TConfigNodeRegisterResp().setStatus(status).setConfigNodeId(ERROR_STATUS_NODE_ID);
+  public TSStatus restartConfigNode(TConfigNodeLocation configNodeLocation) {
+    // TODO: @Itami-Sho, update peer if necessary
+    return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
   }
 
   /**
@@ -1041,4 +1041,12 @@ public class NodeManager {
   private LoadManager getLoadManager() {
     return configManager.getLoadManager();
   }
+
+  private TriggerManager getTriggerManager() {
+    return configManager.getTriggerManager();
+  }
+
+  private UDFManager getUDFManager() {
+    return configManager.getUDFManager();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
index c228b4c4aa..bcfe13d61a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.confignode.persistence.node;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -94,32 +93,6 @@ public class NodeInfo implements SnapshotProcessor {
     this.registeredDataNodes = new ConcurrentHashMap<>();
   }
 
-  /**
-   * Only leader use this interface
-   *
-   * @return True if the specific DataNode already registered, false otherwise
-   */
-  public boolean isRegisteredDataNode(TDataNodeLocation dataNodeLocation) {
-    boolean result = false;
-    int originalDataNodeId = dataNodeLocation.getDataNodeId();
-
-    dataNodeInfoReadWriteLock.readLock().lock();
-    try {
-      for (Map.Entry<Integer, TDataNodeConfiguration> entry : registeredDataNodes.entrySet()) {
-        dataNodeLocation.setDataNodeId(entry.getKey());
-        if (entry.getValue().getLocation().equals(dataNodeLocation)) {
-          result = true;
-          break;
-        }
-      }
-    } finally {
-      dataNodeInfoReadWriteLock.readLock().unlock();
-    }
-
-    dataNodeLocation.setDataNodeId(originalDataNodeId);
-    return result;
-  }
-
   /**
    * Persist DataNode info
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index a061d6c313..ff464ac622 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
 import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
 import org.apache.iotdb.db.service.metrics.ProcessMetrics;
@@ -59,7 +60,9 @@ public class ConfigNode implements ConfigNodeMBean {
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private static final CommonConfig COMMON_CONF = CommonDescriptor.getInstance().getConfig();
 
+  private static final int STARTUP_RETRY_NUM = 10;
   private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
+  private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(30);
 
   private static final int SEED_CONFIG_NODE_ID = 0;
 
@@ -94,12 +97,21 @@ public class ConfigNode implements ConfigNodeMBean {
       /* Restart */
       if (SystemPropertiesUtils.isRestarted()) {
         LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
-        /* Always set ConfigNodeId before initConsensusManager */
+
+        /* Always restore ClusterId and ConfigNodeId first */
+        CONF.setClusterName(SystemPropertiesUtils.loadClusterNameWhenRestarted());
         CONF.setConfigNodeId(SystemPropertiesUtils.loadConfigNodeIdWhenRestarted());
+
+        if (!SystemPropertiesUtils.isSeedConfigNode()) {
+          // The non-seed-ConfigNodes should send restart request
+          sendRestartConfigNodeRequest();
+        }
+
         configManager.initConsensusManager();
         setUpRPCService();
         LOGGER.info(
-            "{} has successfully started and joined the cluster.", ConfigNodeConstant.GLOBAL_NAME);
+            "{} has successfully restarted and joined the cluster.",
+            ConfigNodeConstant.GLOBAL_NAME);
         return;
       }
 
@@ -109,7 +121,7 @@ public class ConfigNode implements ConfigNodeMBean {
             "The current {} is now starting as the Seed-ConfigNode.",
             ConfigNodeConstant.GLOBAL_NAME);
 
-        /* Always set ConfigNodeId before initConsensusManager */
+        /* Always set ClusterId and ConfigNodeId before initConsensusManager */
         CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
         configManager.initConsensusManager();
 
@@ -139,7 +151,7 @@ public class ConfigNode implements ConfigNodeMBean {
       // We set up Non-Seed ConfigNode's RPC service before sending the register request
       // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
       setUpRPCService();
-      registerConfigNode();
+      sendRegisterConfigNodeRequest();
       // The initial startup of Non-Seed-ConfigNode is not yet finished,
       // we should wait for leader's scheduling
       LOGGER.info(
@@ -156,7 +168,7 @@ public class ConfigNode implements ConfigNodeMBean {
         }
 
         try {
-          TimeUnit.MILLISECONDS.sleep(1000);
+          TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
         } catch (InterruptedException e) {
           LOGGER.warn("Waiting leader's scheduling is interrupted.");
         }
@@ -216,7 +228,7 @@ public class ConfigNode implements ConfigNodeMBean {
   }
 
   /** Register Non-seed ConfigNode when first startup */
-  private void registerConfigNode() throws StartupException, IOException {
+  private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
     TConfigNodeRegisterReq req =
         new TConfigNodeRegisterReq(
             new TConfigNodeLocation(
@@ -239,11 +251,12 @@ public class ConfigNode implements ConfigNodeMBean {
 
     TEndPoint targetConfigNode = CONF.getTargetConfigNode();
     if (targetConfigNode == null) {
-      LOGGER.error("The targetConfigNode setting in conf is empty");
+      LOGGER.error(
+          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
       throw new StartupException("The targetConfigNode setting in conf is empty");
     }
 
-    for (int retry = 0; retry < 3; retry++) {
+    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
       TSStatus status;
       TConfigNodeRegisterResp resp = null;
       Object obj =
@@ -281,17 +294,58 @@ public class ConfigNode implements ConfigNodeMBean {
       }
 
       try {
-        TimeUnit.MILLISECONDS.sleep(1000);
+        TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
       } catch (InterruptedException e) {
         throw new StartupException("Register ConfigNode failed!");
       }
     }
 
     LOGGER.error(
-        "The current ConfigNode can't send register request to the Seed-ConfigNode after all retries!");
+        "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
     stop();
   }
 
+  private void sendRestartConfigNodeRequest() throws IOException, StartupException {
+    TConfigNodeRestartReq req =
+        new TConfigNodeRestartReq(
+            CONF.getClusterName(),
+            new TConfigNodeLocation(
+                CONF.getConfigNodeId(),
+                new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
+                new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
+
+    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
+    if (targetConfigNode == null) {
+      LOGGER.error(
+          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
+      throw new StartupException("The targetConfigNode setting in conf is empty");
+    }
+
+    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
+      TSStatus status =
+          (TSStatus)
+              SyncConfigNodeClientPool.getInstance()
+                  .sendSyncRequestToConfigNodeWithRetry(
+                      targetConfigNode, req, ConfigNodeRequestType.RESTART_CONFIG_NODE);
+
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.info("Registration request of current ConfigNode is accepted.");
+        return;
+      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        targetConfigNode = status.getRedirectNode();
+        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
+      } else {
+        throw new StartupException(status.getMessage());
+      }
+
+      try {
+        TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
+      } catch (InterruptedException e) {
+        throw new StartupException("Register ConfigNode failed!");
+      }
+    }
+  }
+
   private void setUpRPCService() throws StartupException {
     // Setup RPCService
     ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 4354b492b2..c31911089b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -73,7 +73,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
@@ -85,6 +85,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeUpdateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -140,6 +142,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.confignode.service.ConfigNode;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -177,6 +180,17 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.getConsensusManager();
   }
 
+  @Override
+  public TSystemConfigurationResp getSystemConfiguration() {
+    TSystemConfigurationResp resp =
+        ((ConfigurationResp) configManager.getSystemConfiguration())
+            .convertToRpcSystemConfigurationResp();
+
+    // Print log to record the ConfigNode that performs the GetConfigurationRequest
+    LOGGER.info("Execute GetSystemConfiguration with result {}", resp);
+    return resp;
+  }
+
   @Override
   public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) {
     TDataNodeRegisterResp resp =
@@ -192,12 +206,12 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
   }
 
   @Override
-  public TConfigurationResp getConfiguration() {
-    TConfigurationResp resp =
-        ((ConfigurationResp) configManager.getConfiguration()).convertToRpcDataNodeRegisterResp();
+  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) {
+    TDataNodeRestartResp resp = configManager.restartDataNode(req);
+
+    // Print log to record the ConfigNode that performs the RestartDatanodeRequest
+    LOGGER.info("Execute RestartDatanodeRequest {} with result {}", req, resp);
 
-    // Print log to record the ConfigNode that performs the GetConfigurationRequest
-    LOGGER.info("Execute GetConfigurationRequest with result {}", resp);
     return resp;
   }
 
@@ -467,6 +481,16 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return StatusUtils.OK;
   }
 
+  @Override
+  public TSStatus restartConfigNode(TConfigNodeRestartReq req) {
+    TSStatus status = configManager.restartConfigNode(req);
+
+    // Print log to record the ConfigNode that performs the RegisterConfigNodeRequest
+    LOGGER.info("Execute RestartConfigNodeRequest {} with result {}", req, status);
+
+    return status;
+  }
+
   /** For leader to remove ConfigNode configuration in consensus layer */
   @Override
   public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 855ff1b7e3..b80a32802d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -511,14 +511,33 @@ public abstract class AbstractEnv implements BaseEnv {
     configNodeWrapperList.get(index).stop();
   }
 
+  @Override
+  public ConfigNodeWrapper getConfigNodeWrapper(int index) {
+    return configNodeWrapperList.get(index);
+  }
+
   @Override
   public DataNodeWrapper getDataNodeWrapper(int index) {
     return dataNodeWrapperList.get(index);
   }
 
   @Override
-  public void registerNewDataNode() {
-    // Config new DataNode
+  public ConfigNodeWrapper generateRandomConfigNodeWrapper() {
+    ConfigNodeWrapper newConfigNodeWrapper =
+        new ConfigNodeWrapper(
+            false,
+            configNodeWrapperList.get(0).getIpAndPortString(),
+            getTestClassName(),
+            getTestMethodName(),
+            EnvUtils.searchAvailablePorts());
+    configNodeWrapperList.add(newConfigNodeWrapper);
+    newConfigNodeWrapper.createDir();
+    newConfigNodeWrapper.changeConfig(ConfigFactory.getConfig().getConfignodeProperties());
+    return newConfigNodeWrapper;
+  }
+
+  @Override
+  public DataNodeWrapper generateRandomDataNodeWrapper() {
     DataNodeWrapper newDataNodeWrapper =
         new DataNodeWrapper(
             configNodeWrapperList.get(0).getIpAndPortString(),
@@ -528,41 +547,21 @@ public abstract class AbstractEnv implements BaseEnv {
     dataNodeWrapperList.add(newDataNodeWrapper);
     newDataNodeWrapper.createDir();
     newDataNodeWrapper.changeConfig(ConfigFactory.getConfig().getEngineProperties());
+    return newDataNodeWrapper;
+  }
 
-    // Start new DataNode
-    List<String> dataNodeEndpoints =
-        Collections.singletonList(newDataNodeWrapper.getIpAndPortString());
-    RequestDelegate<Void> dataNodesDelegate =
-        new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
-    dataNodesDelegate.addRequest(
-        () -> {
-          newDataNodeWrapper.start();
-          return null;
-        });
-    try {
-      dataNodesDelegate.requestAll();
-    } catch (SQLException e) {
-      logger.error("Start dataNodes failed", e);
-      fail();
-    }
-
-    // Test whether register success
-    testWorking();
+  @Override
+  public void registerNewDataNode(boolean isNeedVerify) {
+    registerNewDataNode(generateRandomDataNodeWrapper(), isNeedVerify);
   }
 
   @Override
-  public void registerNewConfigNode() {
-    final ConfigNodeWrapper newConfigNodeWrapper =
-        new ConfigNodeWrapper(
-            false,
-            configNodeWrapperList.get(0).getIpAndPortString(),
-            getTestClassName(),
-            getTestMethodName(),
-            EnvUtils.searchAvailablePorts());
-    configNodeWrapperList.add(newConfigNodeWrapper);
-    newConfigNodeWrapper.createDir();
-    newConfigNodeWrapper.changeConfig(ConfigFactory.getConfig().getConfignodeProperties());
+  public void registerNewConfigNode(boolean isNeedVerify) {
+    registerNewConfigNode(generateRandomConfigNodeWrapper(), isNeedVerify);
+  }
 
+  @Override
+  public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolean isNeedVerify) {
     // Start new ConfigNode
     RequestDelegate<Void> configNodeDelegate =
         new ParallelRequestDelegate<>(
@@ -580,6 +579,36 @@ public abstract class AbstractEnv implements BaseEnv {
       logger.error("Start configNode failed", e);
       fail();
     }
+
+    if (isNeedVerify) {
+      // Test whether register success
+      testWorking();
+    }
+  }
+
+  @Override
+  public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNeedVerify) {
+    // Start new DataNode
+    List<String> dataNodeEndpoints =
+        Collections.singletonList(newDataNodeWrapper.getIpAndPortString());
+    RequestDelegate<Void> dataNodesDelegate =
+        new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
+    dataNodesDelegate.addRequest(
+        () -> {
+          newDataNodeWrapper.start();
+          return null;
+        });
+    try {
+      dataNodesDelegate.requestAll();
+    } catch (SQLException e) {
+      logger.error("Start dataNodes failed", e);
+      fail();
+    }
+
+    if (isNeedVerify) {
+      // Test whether register success
+      testWorking();
+    }
   }
 
   @Override
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractNodeWrapper.java
index 561cfe20fe..c27f9bdcfe 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractNodeWrapper.java
@@ -274,6 +274,8 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper {
 
   protected abstract String getCommonConfigPath();
 
+  public abstract String getSystemPropertiesPath();
+
   protected abstract void updateConfig(Properties properties);
 
   protected abstract void addStartCmdParams(List<String> params);
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java
index 77636f5418..0347f30fb2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/ConfigNodeWrapper.java
@@ -27,7 +27,7 @@ import java.util.Properties;
 
 public class ConfigNodeWrapper extends AbstractNodeWrapper {
 
-  private final int consensusPort;
+  private int consensusPort;
   private final String targetConfigNodes;
   private final boolean isSeed;
 
@@ -77,6 +77,11 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
     return workDirFilePath("conf", "iotdb-common.properties");
   }
 
+  @Override
+  public String getSystemPropertiesPath() {
+    return workDirFilePath("data/confignode/system", "confignode-system.properties");
+  }
+
   @Override
   public final String getId() {
     if (isSeed) {
@@ -121,6 +126,10 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
     oldNodeDir.renameTo(new File(getNodePath()));
   }
 
+  public void setConsensusPort(int consensusPort) {
+    this.consensusPort = consensusPort;
+  }
+
   public int getConsensusPort() {
     return consensusPort;
   }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
index dd7e67d9b6..22551ab16f 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/DataNodeWrapper.java
@@ -79,6 +79,11 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     return workDirFilePath("conf", "iotdb-common.properties");
   }
 
+  @Override
+  public String getSystemPropertiesPath() {
+    return workDirFilePath("data/datanode/system/schema", "system.properties");
+  }
+
   @Override
   public final String getId() {
     return "DataNode" + getPort();
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index 2bfc6eadab..c4c0d77d20 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -184,18 +184,43 @@ public class RemoteServerEnv implements BaseEnv {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public ConfigNodeWrapper generateRandomConfigNodeWrapper() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public DataNodeWrapper generateRandomDataNodeWrapper() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ConfigNodeWrapper getConfigNodeWrapper(int index) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public DataNodeWrapper getDataNodeWrapper(int index) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void registerNewDataNode() {
+  public void registerNewDataNode(boolean isNeedVerify) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void registerNewConfigNode(boolean isNeedVerify) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNeedVerify) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void registerNewConfigNode() {
+  public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolean isNeedVerify) {
     throw new UnsupportedOperationException();
   }
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index c7a3a7633c..cfe1e2d554 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -292,7 +292,7 @@ public interface BaseConfig {
   }
 
   default long getTimePartitionInterval() {
-    return 86400;
+    return 604800000;
   }
 
   default BaseConfig setRatisSnapshotTriggerThreshold(int ratisSnapshotTriggerThreshold) {
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index bfb8c82aca..346a7cd581 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -195,13 +195,29 @@ public interface BaseEnv {
   /** Shutdown an existed ConfigNode */
   void shutdownConfigNode(int index);
 
-  /** @return The TDataNodeLocation of the specified DataNode */
+  /** @return The ConfigNodeWrapper of the specified index */
+  ConfigNodeWrapper getConfigNodeWrapper(int index);
+
+  /** @return The DataNodeWrapper of the specified indexx */
   DataNodeWrapper getDataNodeWrapper(int index);
 
-  /** Register a new DataNode */
-  void registerNewDataNode();
+  /** @return A random available ConfigNodeWrapper */
+  ConfigNodeWrapper generateRandomConfigNodeWrapper();
+
+  /** @return A random available ConfigNodeWrapper */
+  DataNodeWrapper generateRandomDataNodeWrapper();
+
+  /** Register a new DataNode with random ports */
+  void registerNewDataNode(boolean isNeedVerify);
+
+  /** Register a new ConfigNode with random ports */
+  void registerNewConfigNode(boolean isNeedVerify);
+
+  /** Register a new DataNode with specified DataNodeWrapper */
+  void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNeedVerify);
 
-  void registerNewConfigNode();
+  /** Register a new DataNode with specified ConfigNodeWrapper */
+  void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolean isNeedVerify);
 
   /** Start an existed DataNode */
   void startDataNode(int index);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
index 18aa3fc95e..399be1f238 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
@@ -134,7 +134,7 @@ public class IoTDBSnapshotTransferIT {
       Assert.assertEquals(
           2 * (snapshotMagic + 1), readResult.getInt("count(root.emma.william.ethereal)"));
 
-      EnvFactory.getEnv().registerNewConfigNode();
+      EnvFactory.getEnv().registerNewConfigNode(true);
       final TShowRegionResp registerResult = configClient.showRegion(new TShowRegionReq());
       Assert.assertNotNull(result.getRegionInfoList());
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
new file mode 100644
index 0000000000..2477ce1e68
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.it.cluster;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.ConfigNodeWrapper;
+import org.apache.iotdb.it.env.DataNodeWrapper;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.env.BaseConfig;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBClusterNodeErrorStartUpIT {
+
+  private static final BaseConfig CONF = ConfigFactory.getConfig();
+
+  private static final int testConfigNodeNum = 3;
+  private static final int testDataNodeNum = 1;
+
+  protected static String originalConfigNodeConsensusProtocolClass;
+  private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
+
+  private static final String TEST_CLUSTER_NAME = "testCluster";
+  private static final String ERROR_CLUSTER_NAME = "errorCluster";
+
+  @Before
+  public void setUp() throws Exception {
+    originalConfigNodeConsensusProtocolClass = CONF.getConfigNodeConsesusProtocolClass();
+    CONF.setConfigNodeConsesusProtocolClass(testConsensusProtocolClass);
+
+    // Init 3C1D environment
+    EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
+  }
+
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanAfterClass();
+    CONF.setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+  }
+
+  @Test
+  public void testConflictNodeRegistration() throws IOException, InterruptedException, TException {
+    /* Test ConfigNode conflict register */
+
+    // Construct a ConfigNodeWrapper that conflicts in consensus port with an existed one.
+    ConfigNodeWrapper conflictConfigNodeWrapper =
+        EnvFactory.getEnv().generateRandomConfigNodeWrapper();
+    conflictConfigNodeWrapper.setConsensusPort(
+        EnvFactory.getEnv().getConfigNodeWrapper(1).getConsensusPort());
+    conflictConfigNodeWrapper.changeConfig(ConfigFactory.getConfig().getConfignodeProperties());
+
+    // The registration request should be rejected since there exists conflict port
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      TConfigNodeRegisterReq req =
+          ConfigNodeTestUtils.generateTConfigNodeRegisterReq(conflictConfigNodeWrapper);
+      TConfigNodeRegisterResp resp = client.registerConfigNode(req);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(), resp.getStatus().getCode());
+    }
+
+    // The confignode-system.properties file should be empty before register
+    File systemProperties = new File(conflictConfigNodeWrapper.getSystemPropertiesPath());
+    Assert.assertFalse(systemProperties.exists());
+
+    // The confignode-system.properties file should remain empty since the registration will fail
+    EnvFactory.getEnv().registerNewConfigNode(conflictConfigNodeWrapper, false);
+    Assert.assertFalse(systemProperties.exists());
+
+    /* Construct a DataNodeWrapper that conflicts with an existed one. */
+
+    // Construct a DataNodeWrapper that conflicts in internal port with an existed one.
+    DataNodeWrapper conflictDataNodeWrapper = EnvFactory.getEnv().generateRandomDataNodeWrapper();
+    conflictDataNodeWrapper.setInternalPort(
+        EnvFactory.getEnv().getDataNodeWrapper(0).getInternalPort());
+    conflictDataNodeWrapper.changeConfig(ConfigFactory.getConfig().getEngineProperties());
+
+    // The registration request should be rejected since there exists conflict port
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      TDataNodeRegisterReq req =
+          ConfigNodeTestUtils.generateTDataNodeRegisterReq(conflictDataNodeWrapper);
+      TDataNodeRegisterResp resp = client.registerDataNode(req);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(), resp.getStatus().getCode());
+    }
+
+    // The system.properties file should be empty before register
+    systemProperties = new File(conflictDataNodeWrapper.getSystemPropertiesPath());
+    Assert.assertFalse(systemProperties.exists());
+
+    // The system.properties file should remain empty since the registration will fail
+    EnvFactory.getEnv().registerNewDataNode(conflictDataNodeWrapper, false);
+    Assert.assertFalse(systemProperties.exists());
+  }
+
+  @Test
+  public void testIllegalNodeRestart() throws IOException, InterruptedException, TException {
+    ConfigNodeWrapper registeredConfigNodeWrapper = EnvFactory.getEnv().getConfigNodeWrapper(1);
+    DataNodeWrapper registeredDataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+      /* Restart with error cluster name */
+
+      TConfigNodeRestartReq configNodeRestartReq =
+          ConfigNodeTestUtils.generateTConfigNodeRestartReq(
+              ERROR_CLUSTER_NAME, 1, registeredConfigNodeWrapper);
+      TSStatus configNodeRestartStatus = client.restartConfigNode(configNodeRestartReq);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(), configNodeRestartStatus.getCode());
+      Assert.assertTrue(configNodeRestartStatus.getMessage().contains("cluster are inconsistent"));
+
+      TDataNodeRestartReq dataNodeRestartReq =
+          ConfigNodeTestUtils.generateTDataNodeRestartReq(
+              ERROR_CLUSTER_NAME, 2, registeredDataNodeWrapper);
+      TDataNodeRestartResp dataNodeRestartResp = client.restartDataNode(dataNodeRestartReq);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(),
+          dataNodeRestartResp.getStatus().getCode());
+      Assert.assertTrue(
+          dataNodeRestartResp.getStatus().getMessage().contains("cluster are inconsistent"));
+
+      /* Restart with error NodeId */
+
+      configNodeRestartReq =
+          ConfigNodeTestUtils.generateTConfigNodeRestartReq(
+              TEST_CLUSTER_NAME, 100, registeredConfigNodeWrapper);
+      configNodeRestartStatus = client.restartConfigNode(configNodeRestartReq);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(), configNodeRestartStatus.getCode());
+      Assert.assertTrue(configNodeRestartStatus.getMessage().contains("whose nodeId="));
+
+      dataNodeRestartReq =
+          ConfigNodeTestUtils.generateTDataNodeRestartReq(
+              TEST_CLUSTER_NAME, 200, registeredDataNodeWrapper);
+      dataNodeRestartResp = client.restartDataNode(dataNodeRestartReq);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(),
+          dataNodeRestartResp.getStatus().getCode());
+      Assert.assertTrue(dataNodeRestartResp.getStatus().getMessage().contains("whose nodeId="));
+
+      /* Restart an alive Node */
+
+      int registeredConfigNodeId = -1;
+      TShowClusterResp showClusterResp = client.showCluster();
+      for (TConfigNodeLocation configNodeLocation : showClusterResp.getConfigNodeList()) {
+        if (configNodeLocation.getConsensusEndPoint().getPort()
+            == registeredConfigNodeWrapper.getConsensusPort()) {
+          registeredConfigNodeId = configNodeLocation.getConfigNodeId();
+          break;
+        }
+      }
+      Assert.assertNotEquals(-1, registeredConfigNodeId);
+      configNodeRestartReq =
+          ConfigNodeTestUtils.generateTConfigNodeRestartReq(
+              TEST_CLUSTER_NAME, registeredConfigNodeId, registeredConfigNodeWrapper);
+      configNodeRestartStatus = client.restartConfigNode(configNodeRestartReq);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(), configNodeRestartStatus.getCode());
+      Assert.assertTrue(
+          configNodeRestartStatus
+              .getMessage()
+              .contains("exists an alive Node with the same nodeId"));
+
+      int registeredDataNodeId = -1;
+      showClusterResp = client.showCluster();
+      for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+        if (dataNodeLocation.getInternalEndPoint().getPort()
+            == registeredDataNodeWrapper.getInternalPort()) {
+          registeredDataNodeId = dataNodeLocation.getDataNodeId();
+          break;
+        }
+      }
+      Assert.assertNotEquals(-1, registeredDataNodeId);
+      dataNodeRestartReq =
+          ConfigNodeTestUtils.generateTDataNodeRestartReq(
+              TEST_CLUSTER_NAME, registeredDataNodeId, registeredDataNodeWrapper);
+      dataNodeRestartResp = client.restartDataNode(dataNodeRestartReq);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(),
+          dataNodeRestartResp.getStatus().getCode());
+      Assert.assertTrue(
+          dataNodeRestartResp
+              .getStatus()
+              .getMessage()
+              .contains("exists an alive Node with the same nodeId"));
+
+      // Shutdown and check
+      EnvFactory.getEnv().shutdownConfigNode(1);
+      EnvFactory.getEnv().shutdownDataNode(0);
+      while (true) {
+        AtomicInteger unknownCnt = new AtomicInteger(0);
+        showClusterResp = client.showCluster();
+        showClusterResp
+            .getNodeStatus()
+            .forEach(
+                (nodeId, status) -> {
+                  if (NodeStatus.Unknown.equals(NodeStatus.parse(status))) {
+                    unknownCnt.getAndIncrement();
+                  }
+                });
+
+        if (unknownCnt.get() == 2) {
+          break;
+        }
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      /* Restart and updatePeer */
+      // TODO: @Itami-sho, enable this test and delete it
+      int originPort = registeredConfigNodeWrapper.getConsensusPort();
+      registeredConfigNodeWrapper.setConsensusPort(-12345);
+      configNodeRestartReq =
+          ConfigNodeTestUtils.generateTConfigNodeRestartReq(
+              TEST_CLUSTER_NAME, registeredConfigNodeId, registeredConfigNodeWrapper);
+      configNodeRestartStatus = client.restartConfigNode(configNodeRestartReq);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(), configNodeRestartStatus.getCode());
+      Assert.assertTrue(configNodeRestartStatus.getMessage().contains("have been changed"));
+      registeredConfigNodeWrapper.setConsensusPort(originPort);
+
+      originPort = registeredDataNodeWrapper.getInternalPort();
+      registeredDataNodeWrapper.setInternalPort(-12345);
+      dataNodeRestartReq =
+          ConfigNodeTestUtils.generateTDataNodeRestartReq(
+              TEST_CLUSTER_NAME, registeredDataNodeId, registeredDataNodeWrapper);
+      dataNodeRestartResp = client.restartDataNode(dataNodeRestartReq);
+      Assert.assertEquals(
+          TSStatusCode.REJECT_NODE_START.getStatusCode(),
+          dataNodeRestartResp.getStatus().getCode());
+      Assert.assertTrue(dataNodeRestartResp.getStatus().getMessage().contains("have been changed"));
+      registeredDataNodeWrapper.setInternalPort(originPort);
+
+      // Restart and check
+      EnvFactory.getEnv().startConfigNode(1);
+      EnvFactory.getEnv().startDataNode(0);
+      while (true) {
+        AtomicInteger runningCnt = new AtomicInteger(0);
+        showClusterResp = client.showCluster();
+        showClusterResp
+            .getNodeStatus()
+            .forEach(
+                (nodeId, status) -> {
+                  if (NodeStatus.Running.equals(NodeStatus.parse(status))) {
+                    runningCnt.getAndIncrement();
+                  }
+                });
+
+        if (runningCnt.get() == 3) {
+          break;
+        }
+        TimeUnit.SECONDS.sleep(1);
+      }
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java
similarity index 95%
rename from integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterNodeIT.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java
index 68b80dd2f1..dcaf66018b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterNodeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.it;
+package org.apache.iotdb.confignode.it.cluster;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TNodeResource;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
@@ -64,7 +62,7 @@ import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
-public class IoTDBClusterNodeIT {
+public class IoTDBClusterNodeGetterIT {
 
   private static final int testConfigNodeNum = 2;
   private static final int testDataNodeNum = 2;
@@ -245,15 +243,6 @@ public class IoTDBClusterNodeIT {
       dataNodeLocation.setSchemaRegionConsensusEndPoint(
           new TEndPoint(dataNodeInfo.getRpcAddresss(), dataNodeInfo.getRpcPort() + 4));
 
-      // Re-register DataNode
-      dataNodeConfiguration.setLocation(dataNodeLocation);
-      dataNodeConfiguration.setResource(new TNodeResource(8, 1024 * 1024));
-      dataNodeRegisterReq.setDataNodeConfiguration(dataNodeConfiguration);
-      TDataNodeRegisterResp dataNodeRegisterResp = client.registerDataNode(dataNodeRegisterReq);
-      assertEquals(
-          TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode(),
-          dataNodeRegisterResp.getStatus().getCode());
-
       /* Test query one DataNodeInfo */
       TDataNodeConfigurationResp dataNodeConfigurationResp =
           client.getDataNodeConfiguration(dataNodeLocation.getDataNodeId());
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
similarity index 99%
rename from integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
index e34e142815..28f2df6bb6 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.it;
+package org.apache.iotdb.confignode.it.cluster;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
similarity index 99%
rename from integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
index 40501fb8b5..18f1d2f868 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.it;
+package org.apache.iotdb.confignode.it.load;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -168,7 +168,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
   public void testMCFLeaderDistribution() throws IOException, InterruptedException, TException {
     final int testConfigNodeNum = 1;
     final int testDataNodeNum = 3;
-    final int retryNum = 100;
+    final int retryNum = 50;
     EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
 
     TSStatus status;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java
similarity index 99%
rename from integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java
index 47ef166dc3..8205a3e5a6 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSwitchLeaderIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.it;
+package org.apache.iotdb.confignode.it.load;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
index 53baa4e271..c9de411582 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
@@ -229,7 +229,7 @@ public class IoTDBPartitionDurableIT {
           schemaPartitionTableResp.getStatus().getCode());
 
       /* Register a new DataNode */
-      EnvFactory.getEnv().registerNewDataNode();
+      EnvFactory.getEnv().registerNewDataNode(true);
 
       /* Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartition and return */
       schemaPartitionReq =
@@ -370,7 +370,7 @@ public class IoTDBPartitionDurableIT {
           dataPartitionTableResp.getStatus().getCode());
 
       /* Register a new DataNode */
-      EnvFactory.getEnv().registerNewDataNode();
+      EnvFactory.getEnv().registerNewDataNode(true);
 
       /* Test getOrCreateDataPartition, ConfigNode should create DataPartition and return */
       partitionSlotsMap =
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
index 43a1889901..6914fe5f1f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/utils/ConfigNodeTestUtils.java
@@ -20,17 +20,26 @@ package org.apache.iotdb.confignode.it.utils;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeResource;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.it.env.ConfigFactory;
 import org.apache.iotdb.it.env.ConfigNodeWrapper;
 import org.apache.iotdb.it.env.DataNodeWrapper;
+import org.apache.iotdb.itbase.env.BaseConfig;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.apache.thrift.TException;
@@ -47,6 +56,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class ConfigNodeTestUtils {
+
+  private static final BaseConfig CONF = ConfigFactory.getConfig();
+
   private static final int retryNum = 30;
 
   public static TShowClusterResp getClusterNodeInfos(
@@ -184,4 +196,74 @@ public class ConfigNodeTestUtils {
       }
     }
   }
+
+  public static TConfigNodeLocation generateTConfigNodeLocation(
+      int nodeId, ConfigNodeWrapper configNodeWrapper) {
+    return new TConfigNodeLocation(
+        nodeId,
+        new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()),
+        new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getConsensusPort()));
+  }
+
+  public static TConfigNodeRegisterReq generateTConfigNodeRegisterReq(
+      ConfigNodeWrapper configNodeWrapper) {
+    return new TConfigNodeRegisterReq()
+        .setConfigNodeLocation(generateTConfigNodeLocation(-1, configNodeWrapper))
+        .setDataRegionConsensusProtocolClass(CONF.getDataRegionConsensusProtocolClass())
+        .setSchemaRegionConsensusProtocolClass(CONF.getSchemaRegionConsensusProtocolClass())
+        .setSeriesPartitionSlotNum(CONF.getSeriesPartitionSlotNum())
+        .setSeriesPartitionExecutorClass(
+            "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor")
+        .setDefaultTTL(Long.MAX_VALUE)
+        .setTimePartitionInterval(CONF.getTimePartitionInterval())
+        .setSchemaReplicationFactor(CONF.getSchemaReplicationFactor())
+        .setDataReplicationFactor(CONF.getDataReplicationFactor())
+        .setSchemaRegionPerDataNode(1.0)
+        .setDataRegionPerProcessor(1.0)
+        .setReadConsistencyLevel("strong")
+        .setDiskSpaceWarningThreshold(0.05)
+        .setLeastDataRegionGroupNum(CONF.getLeastDataRegionGroupNum());
+  }
+
+  public static TConfigNodeRestartReq generateTConfigNodeRestartReq(
+      String clusterName, int nodeId, ConfigNodeWrapper configNodeWrapper) {
+    return new TConfigNodeRestartReq(
+        clusterName, generateTConfigNodeLocation(nodeId, configNodeWrapper));
+  }
+
+  public static TDataNodeLocation generateTDataNodeLocation(
+      int nodeId, DataNodeWrapper dataNodeWrapper) {
+    TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+    dataNodeLocation.setDataNodeId(nodeId);
+    dataNodeLocation.setClientRpcEndPoint(
+        new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getPort()));
+    dataNodeLocation.setInternalEndPoint(
+        new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getInternalPort()));
+    dataNodeLocation.setMPPDataExchangeEndPoint(
+        new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getMppDataExchangePort()));
+    dataNodeLocation.setDataRegionConsensusEndPoint(
+        new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getDataRegionConsensusPort()));
+    dataNodeLocation.setSchemaRegionConsensusEndPoint(
+        new TEndPoint(dataNodeWrapper.getIp(), dataNodeWrapper.getSchemaRegionConsensusPort()));
+    return dataNodeLocation;
+  }
+
+  public static TDataNodeConfiguration generateTDataNodeConfiguration(
+      int nodeId, DataNodeWrapper dataNodeWrapper) {
+    TNodeResource dataNodeResource = new TNodeResource();
+    dataNodeResource.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
+    dataNodeResource.setMaxMemory(Runtime.getRuntime().totalMemory());
+    return new TDataNodeConfiguration(
+        generateTDataNodeLocation(nodeId, dataNodeWrapper), dataNodeResource);
+  }
+
+  public static TDataNodeRegisterReq generateTDataNodeRegisterReq(DataNodeWrapper dataNodeWrapper) {
+    return new TDataNodeRegisterReq(generateTDataNodeConfiguration(-1, dataNodeWrapper));
+  }
+
+  public static TDataNodeRestartReq generateTDataNodeRestartReq(
+      String clusterName, int nodeId, DataNodeWrapper dataNodeWrapper) {
+    return new TDataNodeRestartReq(
+        clusterName, generateTDataNodeConfiguration(nodeId, dataNodeWrapper));
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index 74f1eb5b68..a3e4872e2d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -193,18 +193,43 @@ public class StandaloneEnv implements BaseEnv {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public ConfigNodeWrapper generateRandomConfigNodeWrapper() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public DataNodeWrapper generateRandomDataNodeWrapper() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ConfigNodeWrapper getConfigNodeWrapper(int index) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public DataNodeWrapper getDataNodeWrapper(int index) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void registerNewDataNode() {
+  public void registerNewDataNode(boolean isNeedVerify) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void registerNewConfigNode(boolean isNeedVerify) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean isNeedVerify) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public void registerNewConfigNode() {
+  public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, boolean isNeedVerify) {
     throw new UnsupportedOperationException();
   }
 
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCheckConfigIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCheckConfigIT.java
index 296693f718..3910da70db 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCheckConfigIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCheckConfigIT.java
@@ -90,7 +90,7 @@ public class IoTDBCheckConfigIT {
 
   @Test
   public void testSaveTimeEncoderToSystemProperties() throws Exception {
-    IoTDBStartCheck.getInstance().checkConfig();
+    IoTDBStartCheck.getInstance().checkSystemConfig();
     // read properties from system.properties
     try (FileInputStream inputStream = new FileInputStream(propertiesFile);
         InputStreamReader inputStreamReader =
@@ -110,7 +110,7 @@ public class IoTDBCheckConfigIT {
     writeSystemFile();
     EnvironmentUtils.reactiveDaemon();
     try {
-      IoTDBStartCheck.getInstance().checkConfig();
+      IoTDBStartCheck.getInstance().checkSystemConfig();
     } catch (ConfigurationException t) {
       t.printStackTrace();
       assertEquals("time_encoder", t.getParameter());
@@ -129,7 +129,7 @@ public class IoTDBCheckConfigIT {
     writeSystemFile();
     EnvironmentUtils.reactiveDaemon();
     try {
-      IoTDBStartCheck.getInstance().checkConfig();
+      IoTDBStartCheck.getInstance().checkSystemConfig();
     } catch (Throwable t) {
       fail(t.getMessage());
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeType.java
similarity index 60%
copy from confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeType.java
index d9ee56ae25..38f521f837 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeType.java
@@ -16,14 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.commons.cluster;
 
-package org.apache.iotdb.confignode.client;
+public enum NodeType {
+  ConfigNode("ConfigNode"),
+  DataNode("DataNode");
 
-public enum ConfigNodeRequestType {
-  ADD_CONSENSUS_GROUP,
-  NOTIFY_REGISTER_SUCCESS,
-  REGISTER_CONFIG_NODE,
-  REMOVE_CONFIG_NODE,
-  DELETE_CONFIG_NODE_PEER,
-  STOP_CONFIG_NODE;
+  private final String nodeType;
+
+  NodeType(String nodeType) {
+    this.nodeType = nodeType;
+  }
+
+  public String getNodeType() {
+    return nodeType;
+  }
+
+  public static NodeType parse(String type) {
+    for (NodeType nodeType : NodeType.values()) {
+      if (nodeType.nodeType.equals(type)) {
+        return nodeType;
+      }
+    }
+    throw new RuntimeException(String.format("NodeType %s doesn't exist.", type));
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 2e58d01d60..2070527674 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -54,6 +54,8 @@ public class IoTDBConstant {
   public static final String CN_ROLE = "confignode";
   public static final String DN_ROLE = "datanode";
 
+  public static final String DATA_NODE_CONF_FILE_NAME = "iotdb-datanode.properties";
+
   public static final String DN_RPC_ADDRESS = "dn_rpc_address";
   public static final String DN_RPC_PORT = "dn_rpc_port";
 
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 3b51b56de9..9b2fc8711f 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
@@ -51,6 +51,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeUpdateReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -105,6 +107,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -304,6 +307,27 @@ public class ConfigNodeClient
     return false;
   }
 
+  @Override
+  public TSystemConfigurationResp getSystemConfiguration() throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSystemConfigurationResp resp = client.getSystemConfiguration();
+        if (!updateConfigNodeLeader(resp.status)) {
+          return resp;
+        }
+      } catch (TException e) {
+        logger.warn(
+            "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+            configNode,
+            config.getAddressAndPort(),
+            Thread.currentThread().getStackTrace()[1].getMethodName());
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
@@ -334,10 +358,10 @@ public class ConfigNodeClient
   }
 
   @Override
-  public TConfigurationResp getConfiguration() throws TException {
+  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
       try {
-        TConfigurationResp resp = client.getConfiguration();
+        TDataNodeRestartResp resp = client.restartDataNode(req);
         if (!updateConfigNodeLeader(resp.status)) {
           return resp;
         }
@@ -844,6 +868,11 @@ public class ConfigNodeClient
     throw new TException("DataNode to ConfigNode client doesn't support registerConfigNode.");
   }
 
+  @Override
+  public TSStatus restartConfigNode(TConfigNodeRestartReq req) throws TException {
+    throw new TException("DataNode to ConfigNode client doesn't support restartConfigNode.");
+  }
+
   @Override
   public TSStatus addConsensusGroup(TAddConsensusGroupReq registerResp) throws TException {
     throw new TException("DataNode to ConfigNode client doesn't support addConsensusGroup.");
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeInfo.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeInfo.java
index fe7c36731d..4222afc753 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeInfo.java
@@ -67,11 +67,7 @@ public class ConfigNodeInfo {
                 + PROPERTIES_FILE_NAME);
   }
 
-  /**
-   * Update ConfigNodeList both in memory and confignode-system.properties file
-   *
-   * @param latestConfigNodes
-   */
+  /** Update ConfigNodeList both in memory and confignode-system.properties file */
   public void updateConfigNodeList(List<TEndPoint> latestConfigNodes) {
     // check whether the config nodes are latest or not
     configNodeInfoReadWriteLock.readLock().lock();
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 344ca71070..768f317c2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -572,7 +572,14 @@ public class IoTDBConfig {
   private boolean isClusterMode = false;
 
   /**
-   * the data node id for cluster mode, the default value -1 should be changed after join cluster
+   * The cluster name that this DataNode joined in the cluster mode. The default value "testCluster"
+   * will be changed after join cluster
+   */
+  private String clusterName = "testCluster";
+
+  /**
+   * The DataNodeId of this DataNode for cluster mode. The default value -1 will be changed after
+   * join cluster
    */
   private int dataNodeId = -1;
 
@@ -3071,6 +3078,14 @@ public class IoTDBConfig {
     checkMultiDirStrategyClassName();
   }
 
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
   public int getDataNodeId() {
     return dataNodeId;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index 44c82254ff..3cd0a36c8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -60,7 +60,7 @@ public class IoTDBStartCheck {
 
   // this file is located in data/system/schema/system.properties
   // If user delete folder "data", system.properties can reset.
-  private static final String PROPERTIES_FILE_NAME = "system.properties";
+  public static final String PROPERTIES_FILE_NAME = "system.properties";
   private static final String SCHEMA_DIR = config.getSchemaDir();
   private static final String[] WAL_DIRS = commonConfig.getWalDirs();
 
@@ -111,6 +111,7 @@ public class IoTDBStartCheck {
   private static String timeEncoderValue =
       String.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
 
+  private static final String CLUSTER_NAME = "CLUSTER_NAME";
   private static final String DATA_NODE_ID = "data_node_id";
 
   private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL = "schema_region_consensus_protocol";
@@ -243,7 +244,7 @@ public class IoTDBStartCheck {
    * <p>When upgrading the system.properties: (1) create system.properties.tmp (2) delete
    * system.properties (3) rename system.properties.tmp to system.properties
    */
-  public void checkConfig() throws ConfigurationException, IOException {
+  public void checkSystemConfig() throws ConfigurationException, IOException {
     propertiesFile =
         SystemFileFactory.INSTANCE.getFile(
             IoTDBStartCheck.SCHEMA_DIR + File.separator + PROPERTIES_FILE_NAME);
@@ -435,6 +436,9 @@ public class IoTDBStartCheck {
     }
 
     // load configuration from system properties only when start as Data node
+    if (properties.containsKey(CLUSTER_NAME)) {
+      config.setClusterName(properties.getProperty(CLUSTER_NAME));
+    }
     if (properties.containsKey(DATA_NODE_ID)) {
       config.setDataNodeId(Integer.parseInt(properties.getProperty(DATA_NODE_ID)));
     }
@@ -464,8 +468,9 @@ public class IoTDBStartCheck {
     }
   }
 
-  /** call this method to serialize DataNodeId */
-  public void serializeDataNodeId(int dataNodeId) throws IOException {
+  /** call this method to serialize ClusterName and DataNodeId */
+  public void serializeClusterNameAndDataNodeId(String clusterName, int dataNodeId)
+      throws IOException {
     // create an empty tmpPropertiesFile
     if (tmpPropertiesFile.createNewFile()) {
       logger.info("Create system.properties.tmp {}.", tmpPropertiesFile);
@@ -477,6 +482,7 @@ public class IoTDBStartCheck {
     reloadProperties();
 
     try (FileOutputStream tmpFOS = new FileOutputStream(tmpPropertiesFile.toString())) {
+      properties.setProperty(CLUSTER_NAME, clusterName);
       properties.setProperty(DATA_NODE_ID, String.valueOf(dataNodeId));
       properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
       // serialize finished, delete old system.properties file
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 181be061e3..59574fc31d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.StartupChecks;
@@ -40,11 +41,14 @@ import org.apache.iotdb.commons.udf.UDFInformation;
 import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFManagementService;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
+import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
@@ -101,12 +105,18 @@ public class DataNode implements DataNodeMBean {
       String.format(
           "%s:%s=%s", "org.apache.iotdb.datanode.service", IoTDBConstant.JMX_TYPE, "DataNode");
 
+  private static final File SYSTEM_PROPERTIES =
+      SystemFileFactory.INSTANCE.getFile(
+          config.getSchemaDir() + File.separator + IoTDBStartCheck.PROPERTIES_FILE_NAME);
+
   /**
    * when joining a cluster or getting configuration this node will retry at most "DEFAULT_RETRY"
    * times before returning a failure to the client
    */
   private static final int DEFAULT_RETRY = 10;
 
+  private static final long DEFAULT_RETRY_INTERVAL_IN_MS = config.getJoinClusterRetryIntervalMs();
+
   private final TEndPoint thisNode = new TEndPoint();
 
   /** Hold the information of trigger, udf...... */
@@ -132,104 +142,215 @@ public class DataNode implements DataNodeMBean {
     new DataNodeServerCommandLine().doMain(args);
   }
 
-  protected void serverCheckAndInit() throws ConfigurationException, IOException {
-    config.setClusterMode(true);
-    IoTDBStartCheck.getInstance().checkConfig();
-    // TODO: check configuration for data node
+  protected void doAddNode() {
+    boolean isFirstStart = false;
+    try {
+      // Check if this DataNode is start for the first time and do other pre-checks
+      isFirstStart = prepareDataNode();
 
-    for (TEndPoint endPoint : config.getTargetConfigNodeList()) {
-      if (endPoint.getIp().equals("0.0.0.0")) {
-        throw new ConfigurationException(
-            "The ip address of any target_config_node_list couldn't be 0.0.0.0");
-      }
-    }
+      // Set target ConfigNodeList from iotdb-datanode.properties file
+      ConfigNodeInfo.getInstance().updateConfigNodeList(config.getTargetConfigNodeList());
 
-    thisNode.setIp(config.getInternalAddress());
-    thisNode.setPort(config.getInternalPort());
-  }
+      // Pull and check system configurations from ConfigNode-leader
+      pullAndCheckSystemConfigurations();
 
-  protected void doAddNode() {
-    try {
-      // prepare cluster IoTDB-DataNode
-      prepareDataNode();
-      // pull and check configuration from ConfigNode
-      pullAndCheckConfiguration();
-      // register current DataNode to ConfigNode
-      registerInConfigNode();
-      // active DataNode
+      if (isFirstStart) {
+        // Register this DataNode to the cluster when first start
+        sendRegisterRequestToConfigNode();
+      } else {
+        // Send restart request of this DataNode
+        sendRestartRequestToConfigNode();
+      }
+
+      // Active DataNode
       active();
-      // setup rpc service
+
+      // Setup rpc service
       setUpRPCService();
-      registerManager.register(MetricService.getInstance());
-
-      // init metric service
-      if (MetricConfigDescriptor.getInstance()
-          .getMetricConfig()
-          .getInternalReportType()
-          .equals(InternalReporterType.IOTDB)) {
-        MetricService.getInstance().updateInternalReporter(new IoTDBInternalReporter());
-      }
-      MetricService.getInstance().startInternalReporter();
-      // bind predefined metrics
-      DataNodeMetricsHelper.bind();
+
+      // Setup metric service
+      setUpMetricService();
 
       logger.info("IoTDB configuration: " + config.getConfigMessage());
       logger.info("Congratulation, IoTDB DataNode is set up successfully. Now, enjoy yourself!");
-    } catch (StartupException e) {
+
+    } catch (StartupException | ConfigurationException | IOException e) {
       logger.error("Fail to start server", e);
+      if (isFirstStart) {
+        // Delete the system.properties file when first start failed.
+        // Therefore, the next time this DataNode is start will still be seen as the first time.
+        SYSTEM_PROPERTIES.deleteOnExit();
+      }
       stop();
     }
   }
 
-  /** initialize the current node and its services */
-  public boolean initLocalEngines() {
-    return true;
-  }
-
   /** Prepare cluster IoTDB-DataNode */
-  private void prepareDataNode() throws StartupException {
-    // check iotdb server first
+  private boolean prepareDataNode() throws StartupException, ConfigurationException, IOException {
+    // Set cluster mode
+    config.setClusterMode(true);
+
+    // Notice: Consider this DataNode as first start if the system.properties file doesn't exist
+    boolean isFirstStart = !SYSTEM_PROPERTIES.exists();
+
+    // Check target ConfigNodes
+    for (TEndPoint endPoint : config.getTargetConfigNodeList()) {
+      if (endPoint.getIp().equals("0.0.0.0")) {
+        throw new StartupException(
+            "The ip address of any target_config_node_list couldn't be 0.0.0.0");
+      }
+    }
+
+    // Set this node
+    thisNode.setIp(config.getInternalAddress());
+    thisNode.setPort(config.getInternalPort());
+
+    // Startup checks
     StartupChecks checks = new StartupChecks(IoTDBConstant.DN_ROLE).withDefaultTest();
     checks.verify();
 
-    // Register services
-    JMXService.registerMBean(getInstance(), mbeanName);
+    // Check system configurations
+    IoTDBStartCheck.getInstance().checkSystemConfig();
+
+    return isFirstStart;
   }
 
-  private void pullAndCheckConfiguration() throws StartupException {
-    int retry = DEFAULT_RETRY;
+  /**
+   * Pull and check the following system configurations:
+   *
+   * <p>1. GlobalConfig
+   *
+   * <p>2. RatisConfig
+   *
+   * <p>3. CQConfig
+   *
+   * @throws StartupException When failed connect to ConfigNode-leader
+   */
+  private void pullAndCheckSystemConfigurations() throws StartupException {
+    logger.info("Pulling system configurations from the ConfigNode-leader...");
 
-    ConfigNodeInfo.getInstance().updateConfigNodeList(config.getTargetConfigNodeList());
-    // get and check configuration
+    /* Pull system configurations */
+    int retry = DEFAULT_RETRY;
+    TSystemConfigurationResp configurationResp = null;
     while (retry > 0) {
       try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
-        TConfigurationResp configuration = configNodeClient.getConfiguration();
-        IoTDBDescriptor.getInstance().loadGlobalConfig(configuration.globalConfig);
-        IoTDBDescriptor.getInstance().loadRatisConfig(configuration.ratisConfig);
-        IoTDBDescriptor.getInstance().loadCQConfig(configuration.cqConfig);
-
-        CommonDescriptor.getInstance().loadGlobalConfig(configuration.globalConfig);
-        if (!IoTDBStartCheck.getInstance()
-            .checkConsensusProtocolExists(TConsensusGroupType.DataRegion)) {
-          config.setDataRegionConsensusProtocolClass(
-              configuration.globalConfig.getDataRegionConsensusProtocolClass());
-        }
+        configurationResp = configNodeClient.getSystemConfiguration();
+        break;
+      } catch (TException e) {
+        // Read ConfigNodes from system.properties and retry
+        logger.warn(
+            "Cannot pull system configurations from ConfigNode-leader, because: {}",
+            e.getMessage());
+        ConfigNodeInfo.getInstance().loadConfigNodeList();
+        retry--;
+      }
 
-        if (!IoTDBStartCheck.getInstance()
-            .checkConsensusProtocolExists(TConsensusGroupType.SchemaRegion)) {
-          config.setSchemaRegionConsensusProtocolClass(
-              configuration.globalConfig.getSchemaRegionConsensusProtocolClass());
-        }
+      try {
+        // wait to start the next try
+        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn("Unexpected interruption when waiting to register to the cluster", e);
+        break;
+      }
+    }
+    if (configurationResp == null) {
+      // All tries failed
+      logger.error(
+          "Cannot pull system configurations from ConfigNode-leader after {} retries",
+          DEFAULT_RETRY);
+      throw new StartupException("Cannot pull system configurations from ConfigNode-leader");
+    }
 
-        IoTDBStartCheck.getInstance().checkDirectory();
-        IoTDBStartCheck.getInstance().serializeGlobalConfig(configuration.globalConfig);
-        return;
+    /* Load system configurations */
+    IoTDBDescriptor.getInstance().loadGlobalConfig(configurationResp.globalConfig);
+    IoTDBDescriptor.getInstance().loadRatisConfig(configurationResp.ratisConfig);
+    IoTDBDescriptor.getInstance().loadCQConfig(configurationResp.cqConfig);
+    CommonDescriptor.getInstance().loadGlobalConfig(configurationResp.globalConfig);
+
+    /* Set cluster consensus protocol class */
+    if (!IoTDBStartCheck.getInstance()
+        .checkConsensusProtocolExists(TConsensusGroupType.DataRegion)) {
+      config.setDataRegionConsensusProtocolClass(
+          configurationResp.globalConfig.getDataRegionConsensusProtocolClass());
+    }
+
+    if (!IoTDBStartCheck.getInstance()
+        .checkConsensusProtocolExists(TConsensusGroupType.SchemaRegion)) {
+      config.setSchemaRegionConsensusProtocolClass(
+          configurationResp.globalConfig.getSchemaRegionConsensusProtocolClass());
+    }
+
+    /* Check system configurations */
+    try {
+      IoTDBStartCheck.getInstance().checkDirectory();
+      IoTDBStartCheck.getInstance().serializeGlobalConfig(configurationResp.globalConfig);
+      IoTDBDescriptor.getInstance().initClusterSchemaMemoryAllocate();
+      if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
+        // In current implementation, only IoTConsensus need separated memory from Consensus
+        IoTDBDescriptor.getInstance().reclaimConsensusMemory();
+      }
+    } catch (Exception e) {
+      throw new StartupException(e.getMessage());
+    }
+
+    logger.info("Successfully pull system configurations from ConfigNode-leader.");
+  }
+
+  /**
+   * Store runtime configurations, which includes:
+   *
+   * <p>1. All ConfigNodes in cluster
+   *
+   * <p>2. All template information
+   *
+   * <p>3. All UDF information
+   *
+   * <p>4. All trigger information
+   *
+   * <p>5. All TTL information
+   */
+  private void storeRuntimeConfigurations(
+      List<TConfigNodeLocation> configNodeLocations, TRuntimeConfiguration runtimeConfiguration) {
+    /* Store ConfigNodeList */
+    List<TEndPoint> configNodeList = new ArrayList<>();
+    for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
+      configNodeList.add(configNodeLocation.getInternalEndPoint());
+    }
+    ConfigNodeInfo.getInstance().updateConfigNodeList(configNodeList);
+
+    /* Store templateSetInfo */
+    ClusterTemplateManager.getInstance()
+        .updateTemplateSetInfo(runtimeConfiguration.getTemplateInfo());
+
+    /* Store udfInformationList */
+    getUDFInformationList(runtimeConfiguration.getAllUDFInformation());
+
+    /* Store triggerInformationList */
+    getTriggerInformationList(runtimeConfiguration.getAllTriggerInformation());
+
+    /* Store ttl information */
+    StorageEngineV2.getInstance().updateTTLInfo(runtimeConfiguration.getAllTTLInformation());
+  }
+
+  /** Register this DataNode into cluster */
+  private void sendRegisterRequestToConfigNode() throws StartupException, IOException {
+    logger.info("Sending register request to ConfigNode-leader...");
+
+    /* Send register request */
+    int retry = DEFAULT_RETRY;
+    TDataNodeRegisterReq req = new TDataNodeRegisterReq();
+    req.setDataNodeConfiguration(generateDataNodeConfiguration());
+    TDataNodeRegisterResp dataNodeRegisterResp = null;
+    while (retry > 0) {
+      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+        dataNodeRegisterResp = configNodeClient.registerDataNode(req);
+        break;
       } catch (TException e) {
-        // read config nodes from system.properties
+        // Read ConfigNodes from system.properties and retry
         logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
         ConfigNodeInfo.getInstance().loadConfigNodeList();
-      } catch (Exception e) {
-        throw new StartupException(e.getMessage());
+        retry--;
       }
 
       try {
@@ -237,95 +358,87 @@ public class DataNode implements DataNodeMBean {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.warn("Unexpected interruption when getConfiguration from the ConfigNode.", e);
+        break;
       }
 
       // start the next try
       retry--;
     }
-    // all tries failed
-    logger.error("Cannot get configuration from ConfigNode after {} retries", DEFAULT_RETRY);
-    throw new StartupException("Cannot get configuration from ConfigNode");
+    if (dataNodeRegisterResp == null) {
+      // All tries failed
+      logger.error(
+          "Cannot register into cluster after {} retries. Please check dn_target_config_node_list in iotdb-datanode.properties.",
+          DEFAULT_RETRY);
+      throw new StartupException("Cannot register into the cluster.");
+    }
+
+    if (dataNodeRegisterResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+
+      /* Store runtime configurations when register success */
+      String clusterName = dataNodeRegisterResp.getClusterName();
+      config.setClusterName(dataNodeRegisterResp.getClusterName());
+      int dataNodeID = dataNodeRegisterResp.getDataNodeId();
+      config.setDataNodeId(dataNodeID);
+      IoTDBStartCheck.getInstance().serializeClusterNameAndDataNodeId(clusterName, dataNodeID);
+
+      storeRuntimeConfigurations(
+          dataNodeRegisterResp.getConfigNodeList(), dataNodeRegisterResp.getRuntimeConfiguration());
+
+      logger.info("Successfully register to the cluster");
+    } else {
+      /* Throw exception when register failed */
+      logger.error(dataNodeRegisterResp.getStatus().getMessage());
+      throw new StartupException("Cannot register to the cluster.");
+    }
   }
 
-  /** register DataNode with ConfigNode */
-  private void registerInConfigNode() throws StartupException {
-    int retry = DEFAULT_RETRY;
+  private void sendRestartRequestToConfigNode() throws StartupException {
+    logger.info("Sending restart request to ConfigNode-leader...");
 
-    ConfigNodeInfo.getInstance().updateConfigNodeList(config.getTargetConfigNodeList());
+    /* Send restart request */
+    int retry = DEFAULT_RETRY;
+    TDataNodeRestartReq req = new TDataNodeRestartReq();
+    req.setClusterName(config.getClusterName());
+    req.setDataNodeConfiguration(generateDataNodeConfiguration());
+    TDataNodeRestartResp dataNodeRestartResp = null;
     while (retry > 0) {
       try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
-        logger.info("Start registering to the cluster.");
-        TDataNodeRegisterReq req = new TDataNodeRegisterReq();
-        req.setDataNodeConfiguration(generateDataNodeConfiguration());
-        TDataNodeRegisterResp dataNodeRegisterResp = configNodeClient.registerDataNode(req);
-
-        logger.info(dataNodeRegisterResp.getStatus().getMessage());
-        // store config node lists from resp
-        List<TEndPoint> configNodeList = new ArrayList<>();
-        for (TConfigNodeLocation configNodeLocation : dataNodeRegisterResp.getConfigNodeList()) {
-          configNodeList.add(configNodeLocation.getInternalEndPoint());
-        }
-        ConfigNodeInfo.getInstance().updateConfigNodeList(configNodeList);
-        ClusterTemplateManager.getInstance()
-            .updateTemplateSetInfo(dataNodeRegisterResp.getTemplateInfo());
-
-        // store udfInformationList
-        getUDFInformationList(dataNodeRegisterResp.getAllUDFInformation());
-
-        // store triggerInformationList
-        getTriggerInformationList(dataNodeRegisterResp.getAllTriggerInformation());
-
-        // store ttl information
-        StorageEngineV2.getInstance().updateTTLInfo(dataNodeRegisterResp.getAllTTLInformation());
-
-        if (dataNodeRegisterResp.getStatus().getCode()
-                == TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            || dataNodeRegisterResp.getStatus().getCode()
-                == TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()) {
-          int dataNodeID = dataNodeRegisterResp.getDataNodeId();
-          if (dataNodeID != config.getDataNodeId()) {
-            IoTDBStartCheck.getInstance().serializeDataNodeId(dataNodeID);
-            config.setDataNodeId(dataNodeID);
-          }
-          IoTDBDescriptor.getInstance().initClusterSchemaMemoryAllocate();
-
-          // In current implementation, only IoTConsensus need separated memory from Consensus
-          if (!config
-              .getDataRegionConsensusProtocolClass()
-              .equals(ConsensusFactory.IOT_CONSENSUS)) {
-            IoTDBDescriptor.getInstance().reclaimConsensusMemory();
-          }
-
-          logger.info("Register to the cluster successfully");
-          return;
-        } else if (dataNodeRegisterResp.getStatus().getCode()
-            == TSStatusCode.REGISTER_DATANODE_WITH_WRONG_ID.getStatusCode()) {
-          logger.error(dataNodeRegisterResp.getStatus().getMessage());
-          throw new StartupException("Cannot register to the cluster.");
-        }
-      } catch (IOException e) {
-        logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
+        dataNodeRestartResp = configNodeClient.restartDataNode(req);
+        break;
       } catch (TException e) {
-        // read config nodes from system.properties
-        logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
+        // Read ConfigNodes from system.properties and retry
+        logger.warn(
+            "Cannot send restart request to the ConfigNode-leader, because: {}", e.getMessage());
         ConfigNodeInfo.getInstance().loadConfigNodeList();
+        retry--;
       }
 
       try {
         // wait to start the next try
-        Thread.sleep(config.getJoinClusterRetryIntervalMs());
+        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         logger.warn("Unexpected interruption when waiting to register to the cluster", e);
         break;
       }
+    }
+    if (dataNodeRestartResp == null) {
+      // All tries failed
+      logger.error(
+          "Cannot send restart DataNode request to ConfigNode-leader after {} retries. Please check dn_target_config_node_list in iotdb-datanode.properties.",
+          DEFAULT_RETRY);
+      throw new StartupException("Cannot send restart DataNode request to ConfigNode-leader.");
+    }
 
-      // start the next try
-      retry--;
+    if (dataNodeRestartResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      /* Store runtime configurations when restart request is accepted */
+      storeRuntimeConfigurations(
+          dataNodeRestartResp.getConfigNodeList(), dataNodeRestartResp.getRuntimeConfiguration());
+      logger.info("Restart request is accepted.");
+    } else {
+      /* Throw exception when restart is rejected */
+      throw new StartupException(dataNodeRestartResp.getStatus().getMessage());
     }
-    // all tries failed
-    logger.error("Cannot register to the cluster after {} retries", DEFAULT_RETRY);
-    throw new StartupException("Cannot register to the cluster.");
   }
 
   private void prepareResources() throws StartupException {
@@ -345,7 +458,6 @@ public class DataNode implements DataNodeMBean {
     logger.info("IoTDB DataNode has started.");
 
     try {
-      // TODO: Start consensus layer in some where else
       SchemaRegionConsensusImpl.setupAndGetInstance().start();
       DataRegionConsensusImpl.setupAndGetInstance().start();
     } catch (IOException e) {
@@ -362,6 +474,8 @@ public class DataNode implements DataNodeMBean {
 
   private void setUp() throws StartupException, QueryProcessException {
     logger.info("Setting up IoTDB DataNode...");
+    registerManager.register(new JMXService());
+    JMXService.registerMBean(getInstance(), mbeanName);
 
     // get resources for trigger,udf...
     prepareResources();
@@ -372,10 +486,8 @@ public class DataNode implements DataNodeMBean {
 
     logger.info("Recover the schema...");
     initSchemaEngine();
-    registerManager.register(new JMXService());
     registerManager.register(FlushManager.getInstance());
     registerManager.register(CacheHitRatioMonitor.getInstance());
-    JMXService.registerMBean(getInstance(), mbeanName);
 
     // close wal when using ratis consensus
     if (config.isClusterMode()
@@ -396,7 +508,7 @@ public class DataNode implements DataNodeMBean {
 
     while (!StorageEngineV2.getInstance().isAllSgReady()) {
       try {
-        Thread.sleep(1000);
+        TimeUnit.MILLISECONDS.sleep(1000);
       } catch (InterruptedException e) {
         logger.warn("IoTDB DataNode failed to set up.", e);
         Thread.currentThread().interrupt();
@@ -439,6 +551,21 @@ public class DataNode implements DataNodeMBean {
     initProtocols();
   }
 
+  private void setUpMetricService() throws StartupException {
+    registerManager.register(MetricService.getInstance());
+
+    // init metric service
+    if (MetricConfigDescriptor.getInstance()
+        .getMetricConfig()
+        .getInternalReportType()
+        .equals(InternalReporterType.IOTDB)) {
+      MetricService.getInstance().updateInternalReporter(new IoTDBInternalReporter());
+    }
+    MetricService.getInstance().startInternalReporter();
+    // bind predefined metrics
+    DataNodeMetricsHelper.bind();
+  }
+
   private TDataNodeLocation generateDataNodeLocation() {
     TDataNodeLocation location = new TDataNodeLocation();
     location.setDataNodeId(config.getDataNodeId());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
index 0ded2a8bcd..1b540b06ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.ServerCommandLine;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
-import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
@@ -36,7 +35,6 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -76,20 +74,6 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
     String mode = args[0];
     LOGGER.info("Running mode {}", mode);
 
-    // Check config of IoTDB, and set some configs in cluster mode
-    try {
-      dataNode.serverCheckAndInit();
-    } catch (ConfigurationException | IOException e) {
-      LOGGER.error("Meet error when doing start checking", e);
-      return -1;
-    }
-
-    // Initialize the current node and its services
-    if (!dataNode.initLocalEngines()) {
-      LOGGER.error("Init local engines error, stop process!");
-      return -1;
-    }
-
     // Start IoTDB kernel first, then start the cluster module
     if (MODE_START.equals(mode)) {
       dataNode.doAddNode();
@@ -116,7 +100,9 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
 
     LOGGER.info("Starting to remove DataNode from cluster, parameter: {}, {}", args[0], args[1]);
 
+    // Load ConfigNodeList from system.properties file
     ConfigNodeInfo.getInstance().loadConfigNodeList();
+
     List<TDataNodeLocation> dataNodeLocations = buildDataNodeLocations(args[1]);
     if (dataNodeLocations.isEmpty()) {
       throw new BadNodeUrlException("No DataNode to remove");
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 7009054844..9f8e0a06e0 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -128,13 +128,12 @@ public enum TSStatusCode {
   // Cluster Manager
   ADD_CONFIGNODE_ERROR(1000),
   REMOVE_CONFIGNODE_ERROR(1001),
-  DATANODE_ALREADY_REGISTERED(1002),
+  REJECT_NODE_START(1002),
   NO_ENOUGH_DATANODE(1003),
   DATANODE_NOT_EXIST(1004),
   DATANODE_STOP_ERROR(1005),
   REMOVE_DATANODE_ERROR(1006),
-  REGISTER_DATANODE_WITH_WRONG_ID(1007),
-  CAN_NOT_CONNECT_DATANODE(1008),
+  CAN_NOT_CONNECT_DATANODE(1007),
 
   // Sync, Load TsFile
   LOAD_FILE_ERROR(1100),
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index d13ae7bad6..c2d4cc09c3 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -22,24 +22,7 @@ namespace java org.apache.iotdb.confignode.rpc.thrift
 namespace py iotdb.thrift.confignode
 
 // DataNode
-struct TDataNodeRegisterReq {
-  1: required common.TDataNodeConfiguration dataNodeConfiguration
-  // Map<StorageGroupName, TStorageGroupSchema>
-  // DataNode can use statusMap to report its status to the ConfigNode when restart
-  2: optional map<string, TStorageGroupSchema> statusMap
-}
-
-struct TDataNodeRegisterResp {
-  1: required common.TSStatus status
-  2: required list<common.TConfigNodeLocation> configNodeList
-  3: optional i32 dataNodeId
-  4: optional binary templateInfo
-  5: optional list<binary> allTriggerInformation
-  6: optional list<binary> allUDFInformation
-  7: optional binary allTTLInformation
-}
-
-struct TConfigurationResp{
+struct TSystemConfigurationResp {
   1: required common.TSStatus status
   2: optional TGlobalConfig globalConfig
   3: optional TRatisConfig ratisConfig
@@ -102,7 +85,37 @@ struct TCQConfig {
   1: required i64 cqMinEveryIntervalInMs
 }
 
-struct TDataNodeUpdateReq{
+struct TRuntimeConfiguration {
+  1: required binary templateInfo
+  2: required list<binary> allTriggerInformation
+  3: required list<binary> allUDFInformation
+  4: required binary allTTLInformation
+}
+
+struct TDataNodeRegisterReq {
+  1: required common.TDataNodeConfiguration dataNodeConfiguration
+}
+
+struct TDataNodeRegisterResp {
+  1: required common.TSStatus status
+  2: required list<common.TConfigNodeLocation> configNodeList
+  3: optional string clusterName
+  4: optional i32 dataNodeId
+  5: optional TRuntimeConfiguration runtimeConfiguration
+}
+
+struct TDataNodeRestartReq {
+  1: required string clusterName
+  2: required common.TDataNodeConfiguration dataNodeConfiguration
+}
+
+struct TDataNodeRestartResp {
+  1: required common.TSStatus status
+  2: required list<common.TConfigNodeLocation> configNodeList
+  3: optional TRuntimeConfiguration runtimeConfiguration
+}
+
+struct TDataNodeUpdateReq {
   1: required common.TDataNodeLocation dataNodeLocation
 }
 
@@ -325,7 +338,13 @@ struct TConfigNodeRegisterReq {
 
 struct TConfigNodeRegisterResp {
   1: required common.TSStatus status
-  2: required i32 configNodeId
+  2: optional string clusterName
+  3: optional i32 configNodeId
+}
+
+struct TConfigNodeRestartReq {
+  1: required string clusterName
+  2: required common.TConfigNodeLocation configNodeLocation
 }
 
 struct TAddConsensusGroupReq {
@@ -637,9 +656,16 @@ service IConfigNodeRPCService {
   TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req)
 
   /**
-  * Get configuration information that is not associated with the DataNodeId
-  */
-  TConfigurationResp getConfiguration()
+   * Restart a existed DataNode
+   *
+   * @return SUCCESS_STATUS if restart DataNode success
+   */
+  TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req)
+
+  /**
+   * Get system configurations. i.e. configurations that is not associated with the DataNodeId
+   */
+  TSystemConfigurationResp getSystemConfiguration()
 
   /**
    * Generate a set of DataNodeRemoveProcedure to remove some specific DataNodes from the cluster
@@ -746,8 +772,8 @@ service IConfigNodeRPCService {
   TSchemaPartitionTableResp getOrCreateSchemaPartitionTable(TSchemaPartitionReq req)
 
   // ======================================================
-    // Node Management
-    // ======================================================
+  // Node Management
+  // ======================================================
 
   /**
    * Get the partition info used for schema node query and get the node info in CluterSchemaInfo.
@@ -836,6 +862,8 @@ service IConfigNodeRPCService {
   /** The ConfigNode-leader will notify the Non-Seed-ConfigNode that the registration success */
   common.TSStatus notifyRegisterSuccess()
 
+  common.TSStatus restartConfigNode(TConfigNodeRestartReq req)
+
   /**
    * Remove the specific ConfigNode from the cluster
    *