You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/10/16 12:15:25 UTC

[iotdb] branch master updated: [IOTDB-4658] Fix ConfigNode restart bug and add Cluster restart IT (#7623)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 26d6c9e265 [IOTDB-4658] Fix ConfigNode restart bug and add Cluster restart IT (#7623)
26d6c9e265 is described below

commit 26d6c9e265e13ce23a93b591f1efae7a1ddcb75c
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun Oct 16 20:15:18 2022 +0800

    [IOTDB-4658] Fix ConfigNode restart bug and add Cluster restart IT (#7623)
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   2 +-
 .../confignode/conf/SystemPropertiesUtils.java     |  24 +-
 .../statemachine/PartitionRegionStateMachine.java  |  14 +-
 .../iotdb/confignode/manager/node/NodeManager.java |   6 +-
 .../iotdb/confignode/persistence/NodeInfo.java     |   2 -
 .../procedure/env/ConfigNodeProcedureEnv.java      |  16 -
 .../impl/node/AddConfigNodeProcedure.java          |   7 +-
 .../iotdb/confignode/service/ConfigNode.java       |  14 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   2 +-
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  | 338 ---------------------
 .../java/org/apache/iotdb/it/env/AbstractEnv.java  |  76 ++++-
 .../org/apache/iotdb/it/env/RemoteServerEnv.java   |  14 +-
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   8 +-
 .../iotdb/confignode/IoTDBClusterPartitionIT.java  |  52 +++-
 .../iotdb/confignode/IoTDBClusterRestartIT.java    |  88 ++++++
 .../apache/iotdb/confignode/IoTDBConfigNodeIT.java |   8 +-
 .../confignode/IoTDBConfigNodeSnapshotIT.java      |   2 +-
 .../iotdb/confignode/IoTDBStorageGroupIT.java      | 183 +++++++++++
 .../org/apache/iotdb/db/it/env/StandaloneEnv.java  |  14 +-
 19 files changed, 451 insertions(+), 419 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 627ba6ccb5..7f2fb656f5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -32,7 +32,7 @@ public class ConfigNodeConfig {
   /**
    * the config node id for cluster mode, the default value -1 should be changed after join cluster
    */
-  private int configNodeId = 0;
+  private volatile int configNodeId = -1;
 
   /** could set ip or hostname */
   private String internalAddress = "0.0.0.0";
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index 9c166589d1..71acfbfb51 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -193,6 +193,9 @@ public class SystemPropertiesUtils {
    */
   public static void storeSystemParameters() throws IOException {
     Properties systemProperties = getSystemProperties();
+
+    systemProperties.setProperty("config_node_id", String.valueOf(conf.getConfigNodeId()));
+
     // Startup configuration
     systemProperties.setProperty("internal_address", String.valueOf(conf.getInternalAddress()));
     systemProperties.setProperty("internal_port", String.valueOf(conf.getInternalPort()));
@@ -239,14 +242,21 @@ public class SystemPropertiesUtils {
     storeSystemProperties(systemProperties);
   }
 
-  public static void storeConfigNodeId(int nodeId) throws IOException {
-    if (!systemPropertiesFile.exists()) {
-      return;
-    }
-
+  /**
+   * Load the config_node_id from the confignode-system.properties file. This method is only
+   * invoked when the ConfigNode restarts.
+   *
+   * @return The value of the config_node_id property in the confignode-system.properties file
+   * @throws IOException When the file cannot be loaded or config_node_id is missing or invalid
+   */
+  public static int loadConfigNodeId() throws IOException {
     Properties systemProperties = getSystemProperties();
-    systemProperties.setProperty("config_node_id", String.valueOf(nodeId));
-    storeSystemProperties(systemProperties);
+    try {
+      return Integer.parseInt(systemProperties.getProperty("config_node_id", null));
+    } catch (NumberFormatException e) {
+      throw new IOException(
+          "The parameter config_node_id doesn't exist in confignode-system.properties");
+    }
   }
 
   private static synchronized Properties getSystemProperties() throws IOException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index a901e5d187..dae61189d3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.confignode.consensus.statemachine;
 
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
@@ -48,12 +47,10 @@ public class PartitionRegionStateMachine
   private final ConfigPlanExecutor executor;
   private ConfigManager configManager;
   private final TEndPoint currentNodeTEndPoint;
-  private final int currentNodeId;
 
   public PartitionRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) {
     this.executor = executor;
     this.configManager = configManager;
-    this.currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
     this.currentNodeTEndPoint =
         new TEndPoint()
             .setIp(ConfigNodeDescriptor.getInstance().getConf().getInternalAddress())
@@ -149,8 +146,10 @@ public class PartitionRegionStateMachine
 
   @Override
   public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) {
-    TConfigNodeLocation newLeaderConfigNodeLocation =
-        configManager.getConfigNodeLocation(currentNodeId);
+    // Read currentNodeId here rather than in the constructor, because the ConfigNodeId
+    // may not be initialized yet when the PartitionRegionStateMachine is constructed
+    int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId();
+
     if (currentNodeId == newLeaderId) {
       LOGGER.info(
           "Current node [nodeId: {}, ip:port: {}] becomes Leader",
@@ -162,11 +161,10 @@ public class PartitionRegionStateMachine
       configManager.getPartitionManager().startRegionCleaner();
     } else {
       LOGGER.info(
-          "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}, ip:port: {}]",
+          "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the new leader is [nodeId:{}]",
           currentNodeId,
           currentNodeTEndPoint,
-          newLeaderId,
-          newLeaderConfigNodeLocation.getInternalEndPoint());
+          newLeaderId);
       configManager.getProcedureManager().shiftExecutor(false);
       configManager.getLoadManager().stopLoadBalancingService();
       configManager.getNodeManager().stopHeartbeatService();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 7cb7159060..9e2558838d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -265,7 +265,7 @@ public class NodeManager {
             .setConfigNodeId(ERROR_STATUS_NODE_ID);
       }
 
-      int nodeId = generateNodeId();
+      int nodeId = nodeInfo.generateNextNodeId();
       req.getConfigNodeLocation().setConfigNodeId(nodeId);
 
       configManager.getProcedureManager().addConfigNode(req);
@@ -778,8 +778,4 @@ public class NodeManager {
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
-
-  public int generateNodeId() {
-    return nodeInfo.generateNextNodeId();
-  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 2d3876a3f3..7adcf7e389 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -305,8 +305,6 @@ public class NodeInfo implements SnapshotProcessor {
 
       registeredConfigNodes.add(applyConfigNodePlan.getConfigNodeLocation());
       SystemPropertiesUtils.storeConfigNodeList(new ArrayList<>(registeredConfigNodes));
-      SystemPropertiesUtils.storeConfigNodeId(
-          applyConfigNodePlan.getConfigNodeLocation().getConfigNodeId());
       LOGGER.info(
           "Successfully apply ConfigNode: {}. Current ConfigNodeGroup: {}",
           applyConfigNodePlan.getConfigNodeLocation(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 2bbab37673..b1f73ff5de 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -94,20 +94,8 @@ public class ConfigNodeProcedureEnv {
 
   private final DataNodeRemoveHandler dataNodeRemoveHandler;
 
-  private static boolean skipForTest = false;
-
-  private static boolean invalidCacheResult = true;
-
   private final ReentrantLock removeConfigNodeLock;
 
-  public static void setSkipForTest(boolean skipForTest) {
-    ConfigNodeProcedureEnv.skipForTest = skipForTest;
-  }
-
-  public static void setInvalidCacheResult(boolean result) {
-    ConfigNodeProcedureEnv.invalidCacheResult = result;
-  }
-
   public ConfigNodeProcedureEnv(ConfigManager configManager, ProcedureScheduler scheduler) {
     this.configManager = configManager;
     this.scheduler = scheduler;
@@ -148,10 +136,6 @@ public class ConfigNodeProcedureEnv {
    * @throws TException Thrift IOE
    */
   public boolean invalidateCache(String storageGroupName) throws IOException, TException {
-    // TODO: Remove it after IT is supported
-    if (skipForTest) {
-      return invalidCacheResult;
-    }
     List<TDataNodeConfiguration> allDataNodes =
         configManager.getNodeManager().getRegisteredDataNodes();
     TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 7c3e0cd582..9d7d0ab760 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -61,19 +61,22 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
           setNextState(AddConfigNodeState.CREATE_PEER);
           break;
         case CREATE_PEER:
+          LOG.info("Executing createPeerForConsensusGroup on {}...", tConfigNodeLocation);
           env.addConsensusGroup(tConfigNodeLocation);
           setNextState(AddConfigNodeState.ADD_PEER);
-          LOG.info("Add consensus group {}", tConfigNodeLocation);
+          LOG.info("Successfully createPeerForConsensusGroup on {}", tConfigNodeLocation);
           break;
         case ADD_PEER:
+          LOG.info("Executing addPeer {}...", tConfigNodeLocation);
           env.addConfigNodePeer(tConfigNodeLocation);
           setNextState(AddConfigNodeState.REGISTER_SUCCESS);
-          LOG.info("Add Peer of {}", tConfigNodeLocation);
+          LOG.info("Successfully addPeer {}", tConfigNodeLocation);
           break;
         case REGISTER_SUCCESS:
           env.notifyRegisterSuccess(tConfigNodeLocation);
           env.applyConfigNode(tConfigNodeLocation);
           env.broadCastTheLatestConfigNodeGroup();
+          LOG.info("The ConfigNode: {} is successfully added to the cluster", tConfigNodeLocation);
           return Flow.NO_MORE_STATE;
       }
     } catch (Exception e) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index fb26ccc4f9..81aa8bcc82 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -87,6 +87,8 @@ public class ConfigNode implements ConfigNodeMBean {
 
       /* Restart */
       if (SystemPropertiesUtils.isRestarted()) {
+        LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
+        CONF.setConfigNodeId(SystemPropertiesUtils.loadConfigNodeId());
         configManager.initConsensusManager();
         setUpRPCService();
         LOGGER.info(
@@ -96,10 +98,18 @@ public class ConfigNode implements ConfigNodeMBean {
 
       /* Initial startup of Seed-ConfigNode */
       if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
+        LOGGER.info(
+            "The current {} is now starting as the Seed-ConfigNode.",
+            ConfigNodeConstant.GLOBAL_NAME);
+
+        // Init consensusGroup
+        CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
         configManager.initConsensusManager();
 
+        // Persist the system parameters only after the consensusGroup is built;
+        // otherwise the consensusGroup will not be initialized successfully.
         SystemPropertiesUtils.storeSystemParameters();
-        SystemPropertiesUtils.storeConfigNodeId(SEED_CONFIG_NODE_ID);
+
         // Seed-ConfigNode should apply itself when first start
         configManager
             .getNodeManager()
@@ -232,9 +242,7 @@ public class ConfigNode implements ConfigNodeMBean {
       }
 
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        SystemPropertiesUtils.storeConfigNodeId(resp.getConfigNodeId());
         CONF.setConfigNodeId(resp.getConfigNodeId());
-
         configManager.initConsensusManager();
         return;
       } else if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 67e45635fa..ee060dc306 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -408,7 +408,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     TConfigNodeRegisterResp resp = configManager.registerConfigNode(req);
 
     // Print log to record the ConfigNode that performs the RegisterConfigNodeRequest
-    LOGGER.info("Execute RegisterConfigNodeRequest {} with result {}", req, resp.getStatus());
+    LOGGER.info("Execute RegisterConfigNodeRequest {} with result {}", req, resp);
 
     return resp;
   }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
deleted file mode 100644
index 50a1e800d2..0000000000
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.service.thrift;
-
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TNodeResource;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.exception.ConfigurationException;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
-import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
-import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
-import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
-import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
-import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
-import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
-
-import org.apache.ratis.util.FileUtils;
-import org.apache.thrift.TException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class ConfigNodeRPCServiceProcessorTest {
-
-  ConfigNodeRPCServiceProcessor processor;
-
-  @BeforeClass
-  public static void beforeClass() throws StartupException, ConfigurationException, IOException {
-    final ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
-    UDFExecutableManager.setupAndGetInstance(
-        configNodeConfig.getTemporaryLibDir(), configNodeConfig.getUdfLibDir());
-    UDFClassLoaderManager.setupAndGetInstance(configNodeConfig.getUdfLibDir());
-    UDFRegistrationService.setupAndGetInstance(configNodeConfig.getSystemUdfDir());
-    ConfigNodeStartupCheck.getInstance().startUpCheck();
-  }
-
-  @Before
-  public void before() throws IOException {
-    ConfigManager configManager = new ConfigManager();
-    configManager.initConsensusManager();
-    processor = new ConfigNodeRPCServiceProcessor(configManager);
-    processor.getConsensusManager().singleCopyMayWaitUntilLeaderReady();
-  }
-
-  @After
-  public void after() throws IOException {
-    processor.close();
-    FileUtils.deleteFully(new File(ConfigNodeDescriptor.getInstance().getConf().getConsensusDir()));
-    FileUtils.deleteFully(
-        new File(CommonDescriptor.getInstance().getConfig().getProcedureWalFolder()));
-  }
-
-  @AfterClass
-  public static void afterClass() throws IOException {
-    UDFExecutableManager.getInstance().stop();
-    UDFClassLoaderManager.getInstance().stop();
-    UDFRegistrationService.getInstance().stop();
-    FileUtils.deleteFully(new File(ConfigNodeConstant.DATA_DIR));
-  }
-
-  private void checkGlobalConfig(TGlobalConfig globalConfig) {
-    Assert.assertEquals(
-        ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass(),
-        globalConfig.getDataRegionConsensusProtocolClass());
-    Assert.assertEquals(
-        ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionConsensusProtocolClass(),
-        globalConfig.getSchemaRegionConsensusProtocolClass());
-    Assert.assertEquals(
-        ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum(),
-        globalConfig.getSeriesPartitionSlotNum());
-    Assert.assertEquals(
-        ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
-        globalConfig.getSeriesPartitionExecutorClass());
-  }
-
-  private void registerDataNodes() throws TException {
-    for (int i = 0; i < 3; i++) {
-      TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
-      dataNodeLocation.setDataNodeId(-1);
-      dataNodeLocation.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
-      dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
-      dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
-      dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
-      dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
-
-      TDataNodeConfiguration dataNodeConfiguration = new TDataNodeConfiguration();
-      dataNodeConfiguration.setLocation(dataNodeLocation);
-      dataNodeConfiguration.setResource(new TNodeResource(8, 1024 * 1024));
-
-      TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeConfiguration);
-      TDataNodeRegisterResp resp = processor.registerDataNode(req);
-
-      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
-      Assert.assertEquals(i, resp.getDataNodeId());
-      checkGlobalConfig(resp.getGlobalConfig());
-    }
-  }
-
-  @Test
-  public void testSetAndQueryStorageGroup() throws IllegalPathException, TException {
-    TSStatus status;
-    final String sg0 = "root.sg0";
-    final String sg1 = "root.sg1";
-
-    // register DataNodes
-    registerDataNodes();
-
-    // set StorageGroup0 by default values
-    TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
-    status = processor.setStorageGroup(setReq0);
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
-    // set StorageGroup1 by specific values
-    TSetStorageGroupReq setReq1 =
-        new TSetStorageGroupReq(
-            new TStorageGroupSchema(sg1)
-                .setTTL(1024L)
-                .setSchemaReplicationFactor(5)
-                .setDataReplicationFactor(5)
-                .setTimePartitionInterval(2048L));
-    status = processor.setStorageGroup(setReq1);
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
-    // test count all StorageGroups
-    TCountStorageGroupResp countResp =
-        processor.countMatchedStorageGroups(Arrays.asList("root", "**"));
-    Assert.assertEquals(
-        TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
-    Assert.assertEquals(2, countResp.getCount());
-
-    // test count one StorageGroup
-    countResp = processor.countMatchedStorageGroups(Arrays.asList("root", "sg0"));
-    Assert.assertEquals(
-        TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
-    Assert.assertEquals(1, countResp.getCount());
-
-    // test query all StorageGroupSchemas
-    TStorageGroupSchemaResp getResp =
-        processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "**"));
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
-    Map<String, TStorageGroupSchema> schemaMap = getResp.getStorageGroupSchemaMap();
-    Assert.assertEquals(2, schemaMap.size());
-    TStorageGroupSchema storageGroupSchema = schemaMap.get(sg0);
-    Assert.assertNotNull(storageGroupSchema);
-    Assert.assertEquals(sg0, storageGroupSchema.getName());
-    Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
-    Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
-    Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
-    Assert.assertEquals(604800000, storageGroupSchema.getTimePartitionInterval());
-    storageGroupSchema = schemaMap.get(sg1);
-    Assert.assertNotNull(storageGroupSchema);
-    Assert.assertEquals(sg1, storageGroupSchema.getName());
-    Assert.assertEquals(1024L, storageGroupSchema.getTTL());
-    Assert.assertEquals(5, storageGroupSchema.getSchemaReplicationFactor());
-    Assert.assertEquals(5, storageGroupSchema.getDataReplicationFactor());
-    Assert.assertEquals(2048L, storageGroupSchema.getTimePartitionInterval());
-
-    // test fail by re-register
-    status = processor.setStorageGroup(setReq0);
-    Assert.assertEquals(
-        TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode(), status.getCode());
-
-    // test StorageGroup setter interfaces
-    PartialPath patternPath = new PartialPath(sg1);
-    status =
-        processor.setTTL(new TSetTTLReq(Arrays.asList(patternPath.getNodes()), Long.MAX_VALUE));
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    status = processor.setSchemaReplicationFactor(new TSetSchemaReplicationFactorReq(sg1, 1));
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    status = processor.setDataReplicationFactor(new TSetDataReplicationFactorReq(sg1, 1));
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    status = processor.setTimePartitionInterval(new TSetTimePartitionIntervalReq(sg1, 604800L));
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
-    // test setter results
-    getResp = processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "sg1"));
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
-    schemaMap = getResp.getStorageGroupSchemaMap();
-    Assert.assertEquals(1, schemaMap.size());
-    storageGroupSchema = schemaMap.get(sg1);
-    Assert.assertNotNull(storageGroupSchema);
-    Assert.assertEquals(sg1, storageGroupSchema.getName());
-    Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
-    Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
-    Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
-    Assert.assertEquals(604800, storageGroupSchema.getTimePartitionInterval());
-  }
-
-  /** Generate a PatternTree and serialize it into a ByteBuffer */
-  private ByteBuffer generatePatternTreeBuffer(String[] paths)
-      throws IllegalPathException, IOException {
-    PathPatternTree patternTree = new PathPatternTree();
-    for (String path : paths) {
-      patternTree.appendPathPattern(new PartialPath(path));
-    }
-    patternTree.constructTree();
-
-    PublicBAOS baos = new PublicBAOS();
-    patternTree.serialize(baos);
-    return ByteBuffer.wrap(baos.toByteArray());
-  }
-
-  @Test
-  public void testDeleteStorageGroup() throws TException {
-    TSStatus status;
-    final String sg0 = "root.sg0";
-    final String sg1 = "root.sg1";
-    // register DataNodes
-    registerDataNodes();
-    ConfigNodeProcedureEnv.setSkipForTest(true);
-    TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
-    // set StorageGroup0 by default values
-    status = processor.setStorageGroup(setReq0);
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    // set StorageGroup1 by specific values
-    TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
-    status = processor.setStorageGroup(setReq1);
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
-    List<String> sgs = Arrays.asList(sg0, sg1);
-    deleteStorageGroupsReq.setPrefixPathList(sgs);
-    TSStatus deleteSgStatus = processor.deleteStorageGroups(deleteStorageGroupsReq);
-    TStorageGroupSchemaResp root =
-        processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
-    Assert.assertTrue(root.getStorageGroupSchemaMap().isEmpty());
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), deleteSgStatus.getCode());
-  }
-
-  @Test
-  public void testDeleteStorageGroupInvalidateCacheFailed() throws TException {
-    TSStatus status;
-    final String sg0 = "root.sg0";
-    final String sg1 = "root.sg1";
-    // register DataNodes
-    registerDataNodes();
-    ConfigNodeProcedureEnv.setSkipForTest(true);
-    ConfigNodeProcedureEnv.setInvalidCacheResult(false);
-    TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
-    // set StorageGroup0 by default values
-    status = processor.setStorageGroup(setReq0);
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    // set StorageGroup1 by specific values
-    TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
-    status = processor.setStorageGroup(setReq1);
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
-    List<String> sgs = Arrays.asList(sg0, sg1);
-    deleteStorageGroupsReq.setPrefixPathList(sgs);
-    TSStatus deleteSgStatus = processor.deleteStorageGroups(deleteStorageGroupsReq);
-    TStorageGroupSchemaResp root =
-        processor.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
-    // rollback success
-    Assert.assertEquals(root.getStorageGroupSchemaMap().size(), 2);
-    Assert.assertEquals(TSStatusCode.MULTIPLE_ERROR.getStatusCode(), deleteSgStatus.getCode());
-  }
-
-  @Test
-  public void testGetSchemaNodeManagementPartition()
-      throws TException, IllegalPathException, IOException {
-    final String sg = "root.sg";
-    final int storageGroupNum = 2;
-
-    TSStatus status;
-    TSchemaNodeManagementReq nodeManagementReq;
-    TSchemaNodeManagementResp nodeManagementResp;
-
-    // register DataNodes
-    registerDataNodes();
-
-    // set StorageGroups
-    for (int i = 0; i < storageGroupNum; i++) {
-      TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
-      status = processor.setStorageGroup(setReq);
-      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-    }
-
-    ByteBuffer byteBuffer = generatePatternTreeBuffer(new String[] {"root"});
-    nodeManagementReq = new TSchemaNodeManagementReq(byteBuffer);
-    nodeManagementReq.setLevel(-1);
-    nodeManagementResp = processor.getSchemaNodeManagementPartition(nodeManagementReq);
-    Assert.assertEquals(
-        TSStatusCode.SUCCESS_STATUS.getStatusCode(), nodeManagementResp.getStatus().getCode());
-    Assert.assertEquals(2, nodeManagementResp.getMatchedNodeSize());
-    Assert.assertNotNull(nodeManagementResp.getSchemaRegionMap());
-    Assert.assertEquals(0, nodeManagementResp.getSchemaRegionMapSize());
-  }
-}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index b1dc1bdb30..f2474a761b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -35,9 +35,11 @@ import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.jdbc.Constant;
 import org.apache.iotdb.jdbc.IoTDBConnection;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.ISession;
 import org.apache.iotdb.session.Session;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -165,6 +167,7 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   public void testWorking() {
+    logger.info("Testing DataNode connection...");
     List<String> endpoints =
         dataNodeWrapperList.stream()
             .map(DataNodeWrapper::getIpAndPortString)
@@ -178,6 +181,7 @@ public abstract class AbstractEnv implements BaseEnv {
             Exception lastException = null;
             for (int i = 0; i < 30; i++) {
               try (Connection ignored = getConnection(dataNodeEndpoint, PROBE_TIMEOUT_MS)) {
+                logger.info("Successfully connecting to DataNode: {}.", dataNodeEndpoint);
                 return null;
               } catch (Exception e) {
                 lastException = e;
@@ -200,23 +204,40 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   private void checkNodeHeartbeat() throws Exception {
+    logger.info("Testing cluster environment...");
     TShowClusterResp showClusterResp;
     Exception lastException = null;
     boolean flag;
     for (int i = 0; i < 30; i++) {
       try (SyncConfigNodeIServiceClient client =
-          (SyncConfigNodeIServiceClient) getConfigNodeConnection()) {
+          (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
         flag = true;
         showClusterResp = client.showCluster();
-        Map<Integer, String> nodeStatus = showClusterResp.getNodeStatus();
-        for (String status : nodeStatus.values()) {
-          if (!status.equals("Running")) {
-            flag = false;
-            break;
+
+        // Check resp status
+        if (showClusterResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          flag = false;
+        }
+
+        // Check the number of nodes
+        if (showClusterResp.getNodeStatus().size()
+            != configNodeWrapperList.size() + dataNodeWrapperList.size()) {
+          flag = false;
+        }
+
+        // Check the status of nodes
+        if (flag) {
+          Map<Integer, String> nodeStatus = showClusterResp.getNodeStatus();
+          for (String status : nodeStatus.values()) {
+            if (!status.equals("Running")) {
+              flag = false;
+              break;
+            }
           }
         }
-        int nodeNum = configNodeWrapperList.size() + dataNodeWrapperList.size();
-        if (flag && nodeStatus.size() == nodeNum) {
+
+        if (flag) {
+          logger.info("The cluster is now ready for testing!");
           return;
         }
       } catch (Exception e) {
@@ -373,25 +394,71 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
+  /**
+   * Returns a client connected to the leader ConfigNode.
+   *
+   * <p>Every ConfigNode in configNodeWrapperList is probed with showCluster, for at most 30
+   * rounds; a ConfigNode that is unreachable or answers with another status is skipped. The
+   * returned client is handed over without being closed, so the caller must close it.
+   *
+   * @return the first client whose showCluster request is answered with SUCCESS_STATUS
+   * @throws IOException if no ConfigNode answered with SUCCESS_STATUS in any round
+   */
   @Override
-  public IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException {
+  public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException {
     IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager =
         new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
             .createClientManager(
                 new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
     for (int i = 0; i < 30; i++) {
-      try {
-        // Return ConfigNode connection of the Seed-ConfigNode
-        return clientManager.borrowClient(
-            new TEndPoint(
-                configNodeWrapperList.get(0).getIp(), configNodeWrapperList.get(0).getPort()));
-      } catch (IOException ignored) {
+      for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
+        try {
+          SyncConfigNodeIServiceClient client =
+              clientManager.borrowClient(
+                  new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()));
+          boolean isLeader = false;
+          try {
+            TShowClusterResp resp = client.showCluster();
+            // Only the client that is connected to the ConfigNode-leader
+            // receives SUCCESS_STATUS
+            if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              logger.info(
+                  "Successfully get connection to the leader ConfigNode: {}",
+                  configNodeWrapper.getIp());
+              isLeader = true;
+              return client;
+            }
+          } finally {
+            // A try-with-resources here would close the client before the caller receives it,
+            // so only the clients that are not returned get closed
+            if (!isLeader) {
+              client.close();
+            }
+          }
+        } catch (TException | IOException e) {
+          // Keep probing: an unreachable ConfigNode must not hide the leader on another node
+          logger.error(
+              "Borrow ConfigNodeClient from ConfigNode: {} failed because: {}, retrying...",
+              configNodeWrapper.getIp(),
+              e);
+        }
       }
     }
     throw new IOException("Failed to get config node connection");
   }
 
   @Override
-  public void restartDataNode(int index) {
+  public void startConfigNode(int index) {
+    configNodeWrapperList.get(index).start();
+  }
+
+  @Override
+  public void shutdownConfigNode(int index) {
+    configNodeWrapperList.get(index).stop();
+  }
+
+  @Override
+  public void startDataNode(int index) {
     dataNodeWrapperList.get(index).start();
   }
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index 7d81391010..95cf12bf35 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -147,7 +147,7 @@ public class RemoteServerEnv implements BaseEnv {
   }
 
   @Override
-  public IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException {
+  public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException {
     return null;
   }
 
@@ -159,7 +159,17 @@ public class RemoteServerEnv implements BaseEnv {
   }
 
   @Override
-  public void restartDataNode(int index) {
+  public void startConfigNode(int index) {
+    getConfigNodeWrapperList().get(index).start();
+  }
+
+  @Override
+  public void shutdownConfigNode(int index) {
+    getConfigNodeWrapperList().get(index).stop();
+  }
+
+  @Override
+  public void startDataNode(int index) {
     getDataNodeWrapperList().get(index).start();
   }
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 0f022ca5dd..e42a578008 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -71,7 +71,7 @@ public interface BaseEnv {
 
   void setDataNodeWrapperList(List<DataNodeWrapper> dataNodeWrapperList);
 
-  IConfigNodeRPCService.Iface getConfigNodeConnection() throws IOException;
+  IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException;
 
   default ISession getSessionConnection() throws IoTDBConnectionException {
     return getSessionConnection(
@@ -132,7 +132,11 @@ public interface BaseEnv {
     return session;
   }
 
-  void restartDataNode(int index);
+  void startConfigNode(int index);
+
+  void shutdownConfigNode(int index);
+
+  void startDataNode(int index);
 
   void shutdownDataNode(int index);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
index cb73ebeeb0..d247992a0c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
@@ -39,6 +39,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
@@ -161,7 +163,7 @@ public class IoTDBClusterPartitionIT {
     final String allSg1 = "root.sg1.**";
 
     try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       TSStatus status;
       ByteBuffer buffer;
       TSchemaPartitionReq schemaPartitionReq;
@@ -296,7 +298,7 @@ public class IoTDBClusterPartitionIT {
     final int timePartitionBatchSize = 10;
 
     try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       TSStatus status;
       TDataPartitionReq dataPartitionReq;
       TDataPartitionTableResp dataPartitionTableResp;
@@ -334,7 +336,8 @@ public class IoTDBClusterPartitionIT {
             for (int retry = 0; retry < 5; retry++) {
               // Build new Client since it's unstable
               try (SyncConfigNodeIServiceClient configNodeClient =
-                  (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+                  (SyncConfigNodeIServiceClient)
+                      EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
                 dataPartitionTableResp =
                     configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
                 if (dataPartitionTableResp != null) {
@@ -401,7 +404,7 @@ public class IoTDBClusterPartitionIT {
     EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
 
     try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       final String sg0 = sg + 0;
       final String sg1 = sg + 1;
 
@@ -422,7 +425,7 @@ public class IoTDBClusterPartitionIT {
       for (int retry = 0; retry < 5; retry++) {
         // Build new Client since it's unstable in Win8 environment
         try (SyncConfigNodeIServiceClient configNodeClient =
-            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
           dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
           if (dataPartitionTableResp != null) {
             break;
@@ -501,7 +504,7 @@ public class IoTDBClusterPartitionIT {
       for (int retry = 0; retry < 5; retry++) {
         // Build new Client since it's unstable in Win8 environment
         try (SyncConfigNodeIServiceClient configNodeClient =
-            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
           dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
           if (dataPartitionTableResp != null) {
             break;
@@ -542,7 +545,7 @@ public class IoTDBClusterPartitionIT {
       // since there exists one DataNode is shutdown
       Assert.assertEquals(unknownCnt * 2, runningCnt);
 
-      EnvFactory.getEnv().restartDataNode(testDataNodeId);
+      EnvFactory.getEnv().startDataNode(testDataNodeId);
       // Wait for heartbeat check
       while (true) {
         boolean containUnknown = false;
@@ -596,7 +599,7 @@ public class IoTDBClusterPartitionIT {
     final int timePartitionBatchSize = 10;
 
     try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       ByteBuffer buffer;
       TSchemaPartitionReq schemaPartitionReq;
 
@@ -632,7 +635,7 @@ public class IoTDBClusterPartitionIT {
         for (int retry = 0; retry < 5; retry++) {
           // Build new Client since it's unstable
           try (SyncConfigNodeIServiceClient configNodeClient =
-              (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+              (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
             dataPartitionTableResp =
                 configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
             if (dataPartitionTableResp != null) {
@@ -752,4 +755,35 @@ public class IoTDBClusterPartitionIT {
       Assert.assertEquals(seriesPartitionBatchSize, getSeriesSlotListResp.getSeriesSlotListSize());
     }
   }
+
+  @Test
+  public void testGetSchemaNodeManagementPartition()
+      throws IOException, TException, IllegalPathException {
+    final String sg = "root.sg";
+    final int storageGroupNum = 2;
+
+    TSStatus status;
+    TSchemaNodeManagementReq nodeManagementReq;
+    TSchemaNodeManagementResp nodeManagementResp;
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      // set StorageGroups
+      for (int i = 0; i < storageGroupNum; i++) {
+        TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
+        status = client.setStorageGroup(setReq);
+        Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      }
+
+      ByteBuffer byteBuffer = generatePatternTreeBuffer(new String[] {"root"});
+      nodeManagementReq = new TSchemaNodeManagementReq(byteBuffer);
+      nodeManagementReq.setLevel(-1);
+      nodeManagementResp = client.getSchemaNodeManagementPartition(nodeManagementReq);
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), nodeManagementResp.getStatus().getCode());
+      Assert.assertEquals(2, nodeManagementResp.getMatchedNodeSize());
+      Assert.assertNotNull(nodeManagementResp.getSchemaRegionMap());
+      Assert.assertEquals(0, nodeManagementResp.getSchemaRegionMapSize());
+    }
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterRestartIT.java
new file mode 100644
index 0000000000..c9f3057f5d
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterRestartIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode;
+
+import org.apache.iotdb.it.env.AbstractEnv;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.concurrent.TimeUnit;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBClusterRestartIT {
+
+  protected static String originalConfigNodeConsensusProtocolClass;
+  private static final String ratisConsensusProtocolClass =
+      "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+  private static final int testConfigNodeNum = 3;
+  private static final int testDataNodeNum = 3;
+
+  @Before
+  public void setUp() throws Exception {
+    originalConfigNodeConsensusProtocolClass =
+        ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
+    ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(ratisConsensusProtocolClass);
+
+    // Init 3C3D cluster environment
+    EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
+  }
+
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig()
+        .setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+  }
+
+  @Test
+  public void clusterRestartTest() throws InterruptedException {
+    // Shutdown all cluster nodes
+    for (int i = 0; i < testConfigNodeNum; i++) {
+      EnvFactory.getEnv().shutdownConfigNode(i);
+    }
+    for (int i = 0; i < testDataNodeNum; i++) {
+      EnvFactory.getEnv().shutdownDataNode(i);
+    }
+
+    // Sleep 1s before restart
+    TimeUnit.SECONDS.sleep(1);
+
+    // Restart all cluster nodes
+    for (int i = 0; i < testConfigNodeNum; i++) {
+      EnvFactory.getEnv().startConfigNode(i);
+    }
+    for (int i = 0; i < testDataNodeNum; i++) {
+      EnvFactory.getEnv().startDataNode(i);
+    }
+
+    ((AbstractEnv) EnvFactory.getEnv()).testWorking();
+  }
+
+  // TODO: Add persistence tests in the future
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
index aba7da3fcd..ee1279da99 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
@@ -179,7 +179,7 @@ public class IoTDBConfigNodeIT {
     List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList();
 
     try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       // add ConfigNode
       for (int i = 0; i < 2; i++) {
         ConfigNodeWrapper configNodeWrapper =
@@ -251,7 +251,7 @@ public class IoTDBConfigNodeIT {
     List<TDataNodeInfo> dataNodeInfos;
 
     try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       // add DataNode
       DataNodeWrapper dataNodeWrapper =
           new DataNodeWrapper(
@@ -375,7 +375,7 @@ public class IoTDBConfigNodeIT {
   @Test
   public void showClusterAndNodesTest() {
     try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       TShowClusterResp clusterNodes;
       TShowConfigNodesResp showConfigNodesResp;
@@ -526,7 +526,7 @@ public class IoTDBConfigNodeIT {
     paths.add("root.ln.**");
 
     try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       cleanUserAndRole(client);
 
       // create user
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
index 07e999db7d..3f67c4b1c7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
@@ -126,7 +126,7 @@ public class IoTDBConfigNodeSnapshotIT {
     final int timePartitionSlotsNum = 10;
 
     try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       List<TCreateTriggerReq> createTriggerReqs = createTrigger(client);
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBStorageGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBStorageGroupIT.java
new file mode 100644
index 0000000000..19df2f2a74
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBStorageGroupIT.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBStorageGroupIT {
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeClass();
+  }
+
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  @Test
+  public void testSetAndQueryStorageGroup() throws IOException, TException, IllegalPathException {
+    TSStatus status;
+    final String sg0 = "root.sg0";
+    final String sg1 = "root.sg1";
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      // set StorageGroup0 by default values
+      TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
+      status = client.setStorageGroup(setReq0);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+      // set StorageGroup1 by specific values
+      TSetStorageGroupReq setReq1 =
+          new TSetStorageGroupReq(
+              new TStorageGroupSchema(sg1)
+                  .setTTL(1024L)
+                  .setSchemaReplicationFactor(5)
+                  .setDataReplicationFactor(5)
+                  .setTimePartitionInterval(2048L));
+      status = client.setStorageGroup(setReq1);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+      // test count all StorageGroups
+      TCountStorageGroupResp countResp =
+          client.countMatchedStorageGroups(Arrays.asList("root", "**"));
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
+      Assert.assertEquals(2, countResp.getCount());
+
+      // test count one StorageGroup
+      countResp = client.countMatchedStorageGroups(Arrays.asList("root", "sg0"));
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), countResp.getStatus().getCode());
+      Assert.assertEquals(1, countResp.getCount());
+
+      // test query all StorageGroupSchemas
+      TStorageGroupSchemaResp getResp =
+          client.getMatchedStorageGroupSchemas(Arrays.asList("root", "**"));
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
+      Map<String, TStorageGroupSchema> schemaMap = getResp.getStorageGroupSchemaMap();
+      Assert.assertEquals(2, schemaMap.size());
+      TStorageGroupSchema storageGroupSchema = schemaMap.get(sg0);
+      Assert.assertNotNull(storageGroupSchema);
+      Assert.assertEquals(sg0, storageGroupSchema.getName());
+      Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
+      Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
+      Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
+      Assert.assertEquals(604800000, storageGroupSchema.getTimePartitionInterval());
+      storageGroupSchema = schemaMap.get(sg1);
+      Assert.assertNotNull(storageGroupSchema);
+      Assert.assertEquals(sg1, storageGroupSchema.getName());
+      Assert.assertEquals(1024L, storageGroupSchema.getTTL());
+      Assert.assertEquals(5, storageGroupSchema.getSchemaReplicationFactor());
+      Assert.assertEquals(5, storageGroupSchema.getDataReplicationFactor());
+      Assert.assertEquals(2048L, storageGroupSchema.getTimePartitionInterval());
+
+      // test fail by re-register
+      status = client.setStorageGroup(setReq0);
+      Assert.assertEquals(
+          TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode(), status.getCode());
+
+      // test StorageGroup setter interfaces
+      PartialPath patternPath = new PartialPath(sg1);
+      status = client.setTTL(new TSetTTLReq(Arrays.asList(patternPath.getNodes()), Long.MAX_VALUE));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      status = client.setSchemaReplicationFactor(new TSetSchemaReplicationFactorReq(sg1, 1));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      status = client.setDataReplicationFactor(new TSetDataReplicationFactorReq(sg1, 1));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      status = client.setTimePartitionInterval(new TSetTimePartitionIntervalReq(sg1, 604800L));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+      // test setter results
+      getResp = client.getMatchedStorageGroupSchemas(Arrays.asList("root", "sg1"));
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), getResp.getStatus().getCode());
+      schemaMap = getResp.getStorageGroupSchemaMap();
+      Assert.assertEquals(1, schemaMap.size());
+      storageGroupSchema = schemaMap.get(sg1);
+      Assert.assertNotNull(storageGroupSchema);
+      Assert.assertEquals(sg1, storageGroupSchema.getName());
+      Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
+      Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
+      Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
+      Assert.assertEquals(604800, storageGroupSchema.getTimePartitionInterval());
+    }
+  }
+
+  @Test
+  public void testDeleteStorageGroup() throws TException, IOException {
+    TSStatus status;
+    final String sg0 = "root.sg0";
+    final String sg1 = "root.sg1";
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
+      // set StorageGroup0 by default values
+      status = client.setStorageGroup(setReq0);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      // set StorageGroup1 by default values as well
+      TSetStorageGroupReq setReq1 = new TSetStorageGroupReq(new TStorageGroupSchema(sg1));
+      status = client.setStorageGroup(setReq1);
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      TDeleteStorageGroupsReq deleteStorageGroupsReq = new TDeleteStorageGroupsReq();
+      List<String> sgs = Arrays.asList(sg0, sg1);
+      deleteStorageGroupsReq.setPrefixPathList(sgs);
+      TSStatus deleteSgStatus = client.deleteStorageGroups(deleteStorageGroupsReq);
+      TStorageGroupSchemaResp root =
+          client.getMatchedStorageGroupSchemas(Arrays.asList("root", "*"));
+      Assert.assertTrue(root.getStorageGroupSchemaMap().isEmpty());
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), deleteSgStatus.getCode());
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index daadfe77b0..1f756954db 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -131,7 +131,7 @@ public class StandaloneEnv implements BaseEnv {
   }
 
   @Override
-  public IConfigNodeRPCService.Iface getConfigNodeConnection() {
+  public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() {
     return null;
   }
 
@@ -178,7 +178,17 @@ public class StandaloneEnv implements BaseEnv {
   }
 
   @Override
-  public void restartDataNode(int index) {
+  public void startConfigNode(int index) {
+    // Do nothing
+  }
+
+  @Override
+  public void shutdownConfigNode(int index) {
+    // Do nothing
+  }
+
+  @Override
+  public void startDataNode(int index) {
     // Do nothing
   }