Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/24 08:48:20 UTC

[iotdb] 01/01: merge with master

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4699d830f54af64adcff42d51c3acfe937c90f55
Merge: 9e2a556 efef85f
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Tue Aug 24 16:47:46 2021 +0800

    merge with master

 .gitattributes                                     |   2 +-
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |   1 +
 .../iotdb/cluster/coordinator/Coordinator.java     |  33 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |   6 +-
 .../cluster/server/member/DataGroupMember.java     |  54 ++
 docs/UserGuide/API/Programming-JDBC.md             |   4 +
 docs/UserGuide/Cluster/Cluster-Setup-Example.md    | 191 +++++++-
 docs/UserGuide/Cluster/Cluster-Setup.md            | 545 ++++++++++++---------
 docs/zh/UserGuide/API/Programming-JDBC.md          |   3 +
 docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md | 225 +++++++--
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         | 509 +++++++++++--------
 .../apache/iotdb/db/engine/cache/ChunkCache.java   |  62 +--
 .../db/engine/cache/TimeSeriesMetadataCache.java   | 100 +---
 .../engine/storagegroup/StorageGroupProcessor.java |   8 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  19 +-
 .../iotdb/db/query/control/QueryFileManager.java   |   2 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   9 +
 .../java/org/apache/iotdb/session/Session.java     |   2 +-
 site/src/main/.vuepress/config.js                  |  12 +-
 .../read/query/timegenerator/node/LeafNode.java    |   2 +-
 .../iotdb/tsfile/read/ReadOnlyTsFileTest.java      |  69 +++
 21 files changed, 1200 insertions(+), 658 deletions(-)

diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 78a2981,0000000..07770fc
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@@ -1,615 -1,0 +1,616 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.cluster;
 +
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import org.apache.iotdb.cluster.client.DataClientProvider;
 +import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
 +import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
 +import org.apache.iotdb.cluster.config.ClusterConfig;
 +import org.apache.iotdb.cluster.config.ClusterConstant;
 +import org.apache.iotdb.cluster.config.ClusterDescriptor;
 +import org.apache.iotdb.cluster.coordinator.Coordinator;
 +import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
 +import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
 +import org.apache.iotdb.cluster.log.LogDispatcher;
 +import org.apache.iotdb.cluster.metadata.CMManager;
 +import org.apache.iotdb.cluster.metadata.MetaPuller;
 +import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 +import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
 +import org.apache.iotdb.cluster.rpc.thrift.Node;
 +import org.apache.iotdb.cluster.server.ClusterRPCService;
 +import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
 +import org.apache.iotdb.cluster.server.HardLinkCleaner;
 +import org.apache.iotdb.cluster.server.Response;
 +import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
 +import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 +import org.apache.iotdb.cluster.server.monitor.NodeReport;
 +import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
 +import org.apache.iotdb.cluster.server.raft.DataRaftService;
 +import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
 +import org.apache.iotdb.cluster.server.raft.MetaRaftService;
 +import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
 +import org.apache.iotdb.cluster.server.service.MetaAsyncService;
 +import org.apache.iotdb.cluster.server.service.MetaSyncService;
 +import org.apache.iotdb.cluster.utils.ClusterUtils;
 +import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
 +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 +import org.apache.iotdb.db.conf.IoTDBConfigCheck;
 +import org.apache.iotdb.db.conf.IoTDBConstant;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.db.exception.StartupException;
 +import org.apache.iotdb.db.exception.query.QueryProcessException;
 +import org.apache.iotdb.db.service.IoTDB;
 +import org.apache.iotdb.db.service.JMXService;
 +import org.apache.iotdb.db.service.RegisterManager;
 +import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
 +import org.apache.iotdb.db.utils.TestOnly;
 +
 +import org.apache.thrift.TException;
 +import org.apache.thrift.async.TAsyncClientManager;
 +import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 +import org.apache.thrift.protocol.TCompactProtocol;
 +import org.apache.thrift.protocol.TProtocolFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.util.HashSet;
 +import java.util.Set;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.TimeUnit;
 +
 +import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
 +import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
 +
 +// we do not inherit from IoTDB, as it may break the singleton pattern of IoTDB.
 +public class ClusterIoTDB implements ClusterIoTDBMBean {
 +
 +  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
 +  private final String mbeanName =
 +      String.format(
 +          "%s:%s=%s", "org.apache.iotdb.cluster.service", IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
 +
 +  // TODO fix me: better to throw an exception if the client cannot be obtained. Then we can
 +  // remove this field.
 +  public static boolean printClientConnectionErrorStack = false;
 +
 +  // establish the cluster as a seed
 +  private static final String MODE_START = "-s";
 +  // join an established cluster
 +  private static final String MODE_ADD = "-a";
 +  // send a request to remove a node; extra arguments: ip-of-removed-node metaport-of-removed-node
 +  private static final String MODE_REMOVE = "-r";
 +
 +  private MetaGroupMember metaGroupEngine;
 +
 +  // TODO we can split dataGroupServiceImpls into two parts: the rpc impl and the engine.
 +  private DataGroupServiceImpls dataGroupEngine;
 +
 +  private Node thisNode;
 +  private Coordinator coordinator;
 +
 +  private IoTDB iotdb = IoTDB.getInstance();
 +
 +  // Cluster IoTDB uses a registerManager separate from its parent's.
 +  private RegisterManager registerManager = new RegisterManager();
 +
 +  /**
 +   * A single-thread pool: every "REPORT_INTERVAL_SEC" seconds, "reportThread" prints the status
 +   * of all raft members on this node
 +   */
 +  private ScheduledExecutorService reportThread;
 +
 +  private boolean allowReport = true;
 +
 +  /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
 +  private ScheduledExecutorService hardLinkCleanerThread;
 +
 +  // currently, dataClientProvider is only used by instances that do not belong to any
 +  // DataGroup.
 +  // TODO: however, why not let all dataGroupMembers get clients from dataClientProvider?
 +  private DataClientProvider dataClientProvider;
 +
 +  private ClusterIoTDB() {
 +    // we do not init anything here, so that we can re-initialize the instance in integration tests (IT).
 +  }
 +
 +  public void initLocalEngines() {
 +    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
 +    thisNode = new Node();
 +    // set internal rpc ip and ports
 +    thisNode.setInternalIp(config.getInternalIp());
 +    thisNode.setMetaPort(config.getInternalMetaPort());
 +    thisNode.setDataPort(config.getInternalDataPort());
 +    // set client rpc ip and ports
 +    thisNode.setClientPort(config.getClusterRpcPort());
 +    thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
 +    coordinator = new Coordinator();
 +    // local engine
 +    TProtocolFactory protocolFactory =
 +        ThriftServiceThread.getProtocolFactory(
 +            IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
 +    metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode, coordinator);
++    IoTDB.setClusterMode();
 +    IoTDB.setMetaManager(CMManager.getInstance());
 +    ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
 +    ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
 +    MetaPuller.getInstance().init(metaGroupEngine);
 +
 +    dataGroupEngine = new DataGroupServiceImpls(protocolFactory, metaGroupEngine);
 +    dataClientProvider = new DataClientProvider(protocolFactory);
 +    initTasks();
 +    JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName());
 +  }
 +
 +  private void initTasks() {
 +    reportThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
 +    reportThread.scheduleAtFixedRate(
 +        this::generateNodeReport,
 +        ClusterConstant.REPORT_INTERVAL_SEC,
 +        ClusterConstant.REPORT_INTERVAL_SEC,
 +        TimeUnit.SECONDS);
 +    hardLinkCleanerThread =
 +        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
 +    hardLinkCleanerThread.scheduleAtFixedRate(
 +        new HardLinkCleaner(),
 +        ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
 +        ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
 +        TimeUnit.SECONDS);
 +  }
 +
 +  /**
 +   * Generate a report containing the status of both MetaGroupMember and DataGroupMembers of this
 +   * node. This helps to check whether the node is in a consistent and correct state during debugging.
 +   */
 +  private void generateNodeReport() {
 +    if (logger.isDebugEnabled() && allowReport) {
 +      try {
 +        NodeReport report = new NodeReport(thisNode);
 +        report.setMetaMemberReport(metaGroupEngine.genMemberReport());
 +        report.setDataMemberReportList(dataGroupEngine.genMemberReports());
 +        logger.debug(report.toString());
 +      } catch (Exception e) {
 +        logger.error("exception occurred when generating node report", e);
 +      }
 +    }
 +  }
 +
 +  public static void main(String[] args) {
 +    if (args.length < 1) {
 +      logger.error(
 +          "Usage: <-s|-a|-r> "
 +              + "[-D{} <configure folder>] \n"
 +              + "-s: start the node as a seed\n"
 +              + "-a: start the node as a new node\n"
 +              + "-r: remove the node out of the cluster\n",
 +          IoTDBConstant.IOTDB_CONF);
 +      return;
 +    }
 +
 +    try {
 +      IoTDBConfigCheck.getInstance().checkConfig();
 +    } catch (IOException e) {
 +      logger.error("meet error when doing start checking", e);
 +    }
 +
 +    // init server's configuration first, because the cluster configuration may read settings from
 +    // the server's configuration.
 +    IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
 +    // auto-create-schema is taken over by the cluster module, so we disable it in the server module.
 +    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
 +
 +    // params check
 +    try {
 +      ClusterDescriptor.getInstance().replaceHostnameWithIp();
 +    } catch (Exception e) {
 +      logger.error("replace hostname with ip failed, {}", e.getMessage());
 +      return;
 +    }
 +
 +    String mode = args[0];
 +    logger.info("Running mode {}", mode);
 +
 +    ClusterIoTDB cluster = ClusterIoTDBHolder.INSTANCE;
 +    cluster.initLocalEngines();
 +    // we start IoTDB kernel first.
 +    // cluster.iotdb.active();
 +
 +    // then we start the cluster module.
 +    if (MODE_START.equals(mode)) {
 +      cluster.activeStartNodeMode();
 +    } else if (MODE_ADD.equals(mode)) {
 +      cluster.activeAddNodeMode();
 +    } else if (MODE_REMOVE.equals(mode)) {
 +      try {
 +        cluster.doRemoveNode(args);
 +      } catch (IOException e) {
 +        logger.error("Fail to remove node in cluster", e);
 +      }
 +    } else {
 +      logger.error("Unrecognized mode {}", mode);
 +    }
 +  }
 +
 +  public void activeStartNodeMode() {
 +    try {
 +      preInitCluster();
 +      metaGroupEngine.buildCluster();
 +      postInitCluster();
 +      startClientRPC();
 +    } catch (StartupException
 +        | StartUpCheckFailureException
 +        | ConfigInconsistentException
 +        | QueryProcessException e) {
 +      logger.error("Fail to start  server", e);
 +      stop();
 +    }
 +  }
 +
 +  private void preInitCluster() throws StartupException {
 +    stopRaftInfoReport();
 +    startServerCheck();
 +    preStartCustomize();
 +    iotdb.active();
 +    JMXService.registerMBean(this, mbeanName);
 +    // register MetaGroupMember. MetaGroupMember has the same position in the cluster module as
 +    // "StorageEngine" has in the server module.
 +    // TODO fixme: it is better to move coordinator out of metaGroupEngine
 +
 +    registerManager.register(metaGroupEngine);
 +    registerManager.register(dataGroupEngine);
 +
 +    // initialize the rpc services
 +    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
 +      MetaAsyncService metaAsyncService = new MetaAsyncService(metaGroupEngine);
 +      MetaRaftHeartBeatService.getInstance().initAsyncedServiceImpl(metaAsyncService);
 +      MetaRaftService.getInstance().initAsyncedServiceImpl(metaAsyncService);
 +      DataRaftService.getInstance().initAsyncedServiceImpl(dataGroupEngine);
 +      DataRaftHeartBeatService.getInstance().initAsyncedServiceImpl(dataGroupEngine);
 +    } else {
 +      MetaSyncService syncService = new MetaSyncService(metaGroupEngine);
 +      MetaRaftHeartBeatService.getInstance().initSyncedServiceImpl(syncService);
 +      MetaRaftService.getInstance().initSyncedServiceImpl(syncService);
 +      DataRaftService.getInstance().initSyncedServiceImpl(dataGroupEngine);
 +      DataRaftHeartBeatService.getInstance().initSyncedServiceImpl(dataGroupEngine);
 +    }
 +    // start RPC service
 +    logger.info("start Meta Heartbeat RPC service... ");
 +    registerManager.register(MetaRaftHeartBeatService.getInstance());
 +    // TODO: better to start the Meta RPC service only after the heartbeat service has elected
 +    // the leader and a quorum of followers has caught up.
 +    logger.info("start Meta RPC service... ");
 +    registerManager.register(MetaRaftService.getInstance());
 +  }
 +
 +  private void postInitCluster() throws StartupException, QueryProcessException {
 +    logger.info("start Data Heartbeat RPC service... ");
 +    registerManager.register(DataRaftHeartBeatService.getInstance());
 +    logger.info("start Data RPC service... ");
 +    registerManager.register(DataRaftService.getInstance());
 +    // RPC-based DBA API
 +    registerManager.register(ClusterInfoServer.getInstance());
 +    // JMX-based DBA API
 +    registerManager.register(ClusterMonitor.INSTANCE);
 +  }
 +
 +  private void startClientRPC() throws QueryProcessException, StartupException {
 +    // we must wait until the metaGroup is established so that the ClusterRPCService can work.
 +    ClusterTSServiceImpl clusterRPCServiceImpl = new ClusterTSServiceImpl();
 +    clusterRPCServiceImpl.setCoordinator(coordinator);
 +    clusterRPCServiceImpl.setExecutor(metaGroupEngine);
 +    ClusterRPCService.getInstance().initSyncedServiceImpl(clusterRPCServiceImpl);
 +    registerManager.register(ClusterRPCService.getInstance());
 +  }
 +
 +  public void activeAddNodeMode() {
 +    try {
 +      long startTime = System.currentTimeMillis();
 +
 +      preInitCluster();
 +      metaGroupEngine.joinCluster();
 +      postInitCluster();
 +      dataGroupEngine.pullSnapshots();
 +      startClientRPC();
 +      logger.info(
 +          "Adding this node {} to cluster costs {} ms",
 +          thisNode,
 +          (System.currentTimeMillis() - startTime));
 +    } catch (StartupException
 +        | QueryProcessException
 +        | StartUpCheckFailureException
 +        | ConfigInconsistentException e) {
 +      stop();
 +      logger.error("Fail to join cluster", e);
 +    }
 +  }
 +
 +  private void startServerCheck() throws StartupException {
 +    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
 +    // check the initial replicationNum and refuse to start when replicationNum <= 0
 +    if (config.getReplicationNum() <= 0) {
 +      String message =
 +          String.format(
 +              "ReplicateNum should be greater than 0 instead of %d.", config.getReplicationNum());
 +      throw new StartupException(metaGroupEngine.getName(), message);
 +    }
 +    // check the initial cluster size and refuse to start when the size < quorum
 +    int quorum = config.getReplicationNum() / 2 + 1;
 +    if (config.getSeedNodeUrls().size() < quorum) {
 +      String message =
 +          String.format(
 +              "Seed number less than quorum, seed number: %s, quorum: " + "%s.",
 +              config.getSeedNodeUrls().size(), quorum);
 +      throw new StartupException(metaGroupEngine.getName(), message);
 +    }
 +
 +    // assert there are no duplicated seed nodes
 +    Set<Node> seedNodes = new HashSet<>();
 +    for (String url : config.getSeedNodeUrls()) {
 +      Node node = ClusterUtils.parseNode(url);
 +      if (seedNodes.contains(node)) {
 +        String message =
 +            String.format(
 +                "SeedNodes must not repeat each other. SeedNodes: %s", config.getSeedNodeUrls());
 +        throw new StartupException(metaGroupEngine.getName(), message);
 +      }
 +      seedNodes.add(node);
 +    }
 +
 +    // assert this node is among all known nodes when restarting
 +    if (!metaGroupEngine.getAllNodes().isEmpty()) {
 +      if (!metaGroupEngine.getAllNodes().contains(metaGroupEngine.getThisNode())) {
 +        String message =
 +            String.format(
 +                "All nodes in partitionTables must contains local node in start-server mode. "
 +                    + "LocalNode: %s, AllNodes: %s",
 +                metaGroupEngine.getThisNode(), metaGroupEngine.getAllNodes());
 +        throw new StartupException(metaGroupEngine.getName(), message);
 +      } else {
 +        return;
 +      }
 +    }
 +
 +    // assert this node is in the seed node list
 +    if (!seedNodes.contains(thisNode)) {
 +      String message =
 +          String.format(
 +              "SeedNodes must contains local node in start-server mode. LocalNode: %s ,SeedNodes: %s",
 +              thisNode.toString(), config.getSeedNodeUrls());
 +      throw new StartupException(metaGroupEngine.getName(), message);
 +    }
 +  }
 +
 +  private void doRemoveNode(String[] args) throws IOException {
 +    if (args.length != 3) {
 +      logger.error("Usage: -r <ip> <metaPort>");
 +      return;
 +    }
 +    String ip = args[1];
 +    int metaPort = Integer.parseInt(args[2]);
 +    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
 +    TProtocolFactory factory =
 +        config.isRpcThriftCompressionEnabled() ? new TCompactProtocol.Factory() : new Factory();
 +    Node nodeToRemove = new Node();
 +    nodeToRemove.setInternalIp(ip).setMetaPort(metaPort).setClientIp(UNKNOWN_CLIENT_IP);
 +    // try sending the request to each seed node
 +    for (String url : config.getSeedNodeUrls()) {
 +      Node node = ClusterUtils.parseNode(url);
 +      if (node == null) {
 +        continue;
 +      }
 +      AsyncMetaClient client = new AsyncMetaClient(factory, new TAsyncClientManager(), node, null);
 +      Long response = null;
 +      long startTime = System.currentTimeMillis();
 +      try {
 +        logger.info("Start removing node {} with the help of node {}", nodeToRemove, node);
 +        response = SyncClientAdaptor.removeNode(client, nodeToRemove);
 +      } catch (TException e) {
 +        logger.warn("Cannot send remove node request through {}, try next node", node);
 +      } catch (InterruptedException e) {
 +        Thread.currentThread().interrupt();
 +        logger.warn("Cannot send remove node request through {}, try next node", node);
 +      }
 +      if (response != null) {
 +        handleNodeRemovalResp(response, nodeToRemove, startTime);
 +        return;
 +      }
 +    }
 +  }
 +
 +  private void handleNodeRemovalResp(Long response, Node nodeToRemove, long startTime) {
 +    if (response == Response.RESPONSE_AGREE) {
 +      logger.info(
 +          "Node {} is successfully removed, cost {}ms",
 +          nodeToRemove,
 +          (System.currentTimeMillis() - startTime));
 +    } else if (response == Response.RESPONSE_CLUSTER_TOO_SMALL) {
 +      logger.error("Cluster size is too small, cannot remove any node");
 +    } else if (response == Response.RESPONSE_REJECT) {
 +      logger.error("Node {} is not found in the cluster, please check", nodeToRemove);
 +    } else if (response == Response.RESPONSE_DATA_MIGRATION_NOT_FINISH) {
 +      logger.warn(
 +          "The data migration of the previous membership change operation is not finished. Please try again later");
 +    } else {
 +      logger.error("Unexpected response {}", response);
 +    }
 +  }
 +
 +  /** Developers may perform pre-start customizations here for debugging or experiments. */
 +  @SuppressWarnings("java:S125") // leaving examples
 +  private void preStartCustomize() {
 +    // customize data distribution
 +    // The given example divides storage groups like "root.sg_1", "root.sg_2"... across k
 +    // nodes evenly, and uses the default strategy for other groups.
 +    SlotPartitionTable.setSlotStrategy(
 +        new SlotStrategy() {
 +          SlotStrategy defaultStrategy = new SlotStrategy.DefaultStrategy();
 +          int k = 3;
 +
 +          @Override
 +          public int calculateSlotByTime(String storageGroupName, long timestamp, int maxSlotNum) {
 +            int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k;
 +            if (sgSerialNum >= 0) {
 +              return maxSlotNum / k * sgSerialNum;
 +            } else {
 +              return defaultStrategy.calculateSlotByTime(storageGroupName, timestamp, maxSlotNum);
 +            }
 +          }
 +
 +          @Override
 +          public int calculateSlotByPartitionNum(
 +              String storageGroupName, long partitionId, int maxSlotNum) {
 +            int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k;
 +            if (sgSerialNum >= 0) {
 +              return maxSlotNum / k * sgSerialNum;
 +            } else {
 +              return defaultStrategy.calculateSlotByPartitionNum(
 +                  storageGroupName, partitionId, maxSlotNum);
 +            }
 +          }
 +
 +          private int extractSerialNumInSGName(String storageGroupName) {
 +            String[] s = storageGroupName.split("_");
 +            if (s.length != 2) {
 +              return -1;
 +            }
 +            try {
 +              return Integer.parseInt(s[1]);
 +            } catch (NumberFormatException e) {
 +              return -1;
 +            }
 +          }
 +        });
 +  }
 +
 +  //  @TestOnly
 +  //  public void setMetaClusterServer(MetaGroupMember RaftTSMetaServiceImpl) {
 +  //    metaServer = RaftTSMetaServiceImpl;
 +  //  }
 +
 +  public void stop() {
 +    deactivate();
 +  }
 +
 +  private void deactivate() {
 +    logger.info("Deactivating Cluster IoTDB...");
 +    stopThreadPools();
 +    registerManager.deregisterAll();
 +    JMXService.deregisterMBean(mbeanName);
 +    logger.info("ClusterIoTDB is deactivated.");
 +    // stop the iotdb kernel
 +    iotdb.stop();
 +  }
 +
 +  private void stopThreadPools() {
 +    stopThreadPool(reportThread, "reportThread");
 +    stopThreadPool(hardLinkCleanerThread, "hardLinkCleanerThread");
 +  }
 +
 +  private void stopThreadPool(ExecutorService pool, String name) {
 +    if (pool != null) {
 +      pool.shutdownNow();
 +      try {
 +        pool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME_S, TimeUnit.SECONDS);
 +      } catch (InterruptedException e) {
 +        Thread.currentThread().interrupt();
 +        logger.error("Unexpected interruption when waiting for {} to end", name, e);
 +      }
 +    }
 +  }
 +
 +  public DataClientProvider getClientProvider() {
 +    return dataClientProvider;
 +  }
 +
 +  @TestOnly
 +  public void setClientProvider(DataClientProvider dataClientProvider) {
 +    this.dataClientProvider = dataClientProvider;
 +  }
 +
 +  public MetaGroupMember getMetaGroupEngine() {
 +    return metaGroupEngine;
 +  }
 +
 +  public Node getThisNode() {
 +    return thisNode;
 +  }
 +
 +  public Coordinator getCoordinator() {
 +    return coordinator;
 +  }
 +
 +  public IoTDB getIotdb() {
 +    return iotdb;
 +  }
 +
 +  public RegisterManager getRegisterManager() {
 +    return registerManager;
 +  }
 +
 +  public DataGroupServiceImpls getDataGroupEngine() {
 +    return dataGroupEngine;
 +  }
 +
 +  public void setMetaGroupEngine(MetaGroupMember metaGroupEngine) {
 +    this.metaGroupEngine = metaGroupEngine;
 +  }
 +
 +  public static ClusterIoTDB getInstance() {
 +    return ClusterIoTDBHolder.INSTANCE;
 +  }
 +
 +  @Override
 +  public boolean startRaftInfoReport() {
 +    logger.info("Raft status report is enabled.");
 +    allowReport = true;
 +    return logger.isDebugEnabled();
 +  }
 +
 +  @Override
 +  public void stopRaftInfoReport() {
 +    logger.info("Raft status report is disabled.");
 +    allowReport = false;
 +  }
 +
 +  @Override
 +  public void enablePrintClientConnectionErrorStack() {
 +    printClientConnectionErrorStack = true;
 +  }
 +
 +  @Override
 +  public void disablePrintClientConnectionErrorStack() {
 +    printClientConnectionErrorStack = false;
 +  }
 +
 +  private static class ClusterIoTDBHolder {
 +
 +    private static final ClusterIoTDB INSTANCE = new ClusterIoTDB();
 +
 +    private ClusterIoTDBHolder() {}
 +  }
 +}
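
A note on the preStartCustomize() example above: the custom SlotStrategy maps every storage
group named like "root.sg_<n>" to one of k evenly spaced slots, and falls back to the default
strategy for any other name. The following standalone sketch reproduces that arithmetic outside
the cluster module; the class name SlotStrategySketch and the value MAX_SLOT = 10000 are
hypothetical and chosen only for illustration (the real slot count comes from the cluster
configuration).

    public class SlotStrategySketch {

      // K = 3 mirrors the example above; MAX_SLOT = 10000 is a made-up value.
      private static final int K = 3;
      private static final int MAX_SLOT = 10000;

      // Mirrors extractSerialNumInSGName(): "root.sg_5" -> 5, anything else -> -1.
      private static int serialNum(String storageGroupName) {
        String[] parts = storageGroupName.split("_");
        if (parts.length != 2) {
          return -1;
        }
        try {
          return Integer.parseInt(parts[1]);
        } catch (NumberFormatException e) {
          return -1;
        }
      }

      // Mirrors calculateSlotByTime()/calculateSlotByPartitionNum(); a negative
      // result means the default strategy would be used instead.
      private static int slot(String storageGroupName) {
        int sgSerialNum = serialNum(storageGroupName) % K;
        return sgSerialNum >= 0 ? MAX_SLOT / K * sgSerialNum : -1;
      }

      public static void main(String[] args) {
        // Prints: root.sg_1 -> 3333, root.sg_2 -> 6666, root.sg_3 -> 0, root.sg_4 -> 3333
        for (String sg : new String[] {"root.sg_1", "root.sg_2", "root.sg_3", "root.sg_4"}) {
          System.out.println(sg + " -> " + slot(sg));
        }
      }
    }

Storage groups that do not match the "root.sg_<n>" pattern would be placed by the default
strategy, exactly as in the commit.
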
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 90d7459,0cb09cf..a927bb9
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@@ -90,8 -93,8 +95,9 @@@ import org.apache.iotdb.db.qp.physical.
  import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
  import org.apache.iotdb.db.qp.physical.sys.LogPlan;
  import org.apache.iotdb.db.service.IoTDB;
 +import org.apache.iotdb.db.service.JMXService;
  import org.apache.iotdb.db.utils.TestOnly;
+ import org.apache.iotdb.rpc.TSStatusCode;
  import org.apache.iotdb.service.rpc.thrift.EndPoint;
  import org.apache.iotdb.service.rpc.thrift.TSStatus;
  import org.apache.iotdb.tsfile.utils.Pair;
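
For context on startServerCheck() in the ClusterIoTDB diff above: a node refuses to start when
the seed-node list is smaller than a majority quorum of the replica set, computed as
replicationNum / 2 + 1. A minimal sketch of that check under assumed values (replicationNum = 3
and the seed URLs below are hypothetical, not taken from the commit):

    import java.util.Arrays;
    import java.util.List;

    public class QuorumCheckSketch {
      public static void main(String[] args) {
        // Hypothetical values; the real ones come from ClusterConfig.
        int replicationNum = 3;
        List<String> seedNodeUrls = Arrays.asList("192.168.0.1:9003", "192.168.0.2:9003");

        // Majority quorum of the replica set: for replicationNum = 3 this is 2.
        int quorum = replicationNum / 2 + 1;
        if (seedNodeUrls.size() < quorum) {
          throw new IllegalStateException(
              String.format(
                  "Seed number less than quorum, seed number: %s, quorum: %s.",
                  seedNodeUrls.size(), quorum));
        }
        System.out.println("Startup check passed: " + seedNodeUrls.size() + " seeds >= quorum " + quorum);
      }
    }

With two seed nodes and replicationNum = 3, the check passes (2 >= 2); a single seed node would
fail it, matching the StartupException thrown in the commit.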