You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/02 07:19:16 UTC

[iotdb] branch expr updated: check partition table before creating data members remove close partition

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new cab918b  check partition table before creating data members remove close partition
cab918b is described below

commit cab918b30c96d96a93b06dde0b5c1ff57bbaea11
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 2 15:18:25 2021 +0800

    check partition table before creating data members
    remove close partition
---
 .../iotdb/cluster/ClusterFileFlushPolicy.java      |  78 --------------
 .../PartitionTableUnavailableException.java        |   4 +-
 .../cluster/server/member/DataGroupMember.java     |  29 +-----
 .../cluster/server/member/MetaGroupMember.java     |  29 ++----
 .../iotdb/cluster/server/member/RaftMember.java    |  24 +++--
 .../cluster/server/service/DataGroupEngine.java    |  12 ++-
 .../server/service/DataGroupServiceImpls.java      | 115 +++++++++++++--------
 .../cluster/server/member/MetaGroupMemberTest.java |  69 -------------
 8 files changed, 105 insertions(+), 255 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
deleted file mode 100644
index c4e29cc..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterFileFlushPolicy.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster;
-
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class ClusterFileFlushPolicy implements TsFileFlushPolicy {
-
-  private static final Logger logger = LoggerFactory.getLogger(ClusterFileFlushPolicy.class);
-
-  private ExecutorService closePartitionExecutor;
-  private MetaGroupMember metaGroupMember;
-
-  public ClusterFileFlushPolicy(MetaGroupMember metaGroupMember) {
-    this.metaGroupMember = metaGroupMember;
-    this.closePartitionExecutor =
-        new ThreadPoolExecutor(
-            16,
-            1024,
-            0,
-            TimeUnit.SECONDS,
-            new LinkedBlockingDeque<>(),
-            r -> {
-              Thread thread = new Thread(r);
-              thread.setName("ClusterFileFlushPolicy-" + thread.getId());
-              return thread;
-            });
-  }
-
-  @Override
-  public void apply(
-      StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor, boolean isSeq) {
-    logger.info(
-        "The memtable size reaches the threshold, async flush it to tsfile: {}",
-        processor.getTsFileResource().getTsFile().getAbsolutePath());
-
-    if (processor.shouldClose()) {
-      // find the related DataGroupMember and close the processor through it
-      // we execute it in another thread to avoid deadlocks
-      closePartitionExecutor.submit(
-          () ->
-              metaGroupMember.closePartition(
-                  storageGroupProcessor.getVirtualStorageGroupId(),
-                  processor.getTimeRangeId(),
-                  isSeq));
-    }
-    // flush the memtable anyway to avoid the insertion trigger the policy again
-    processor.asyncFlush();
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/PartitionTableUnavailableException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/PartitionTableUnavailableException.java
index 5a9679e..f8b9466 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/PartitionTableUnavailableException.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/PartitionTableUnavailableException.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.cluster.exception;
 
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 
+import org.apache.thrift.TException;
+
 /** Raised when a node receives requests before its partition table is set up. */
-public class PartitionTableUnavailableException extends Exception {
+public class PartitionTableUnavailableException extends TException {
 
   public PartitionTableUnavailableException(Node node) {
     super(String.format("Partition table of %s is not ready, cannot serve", node));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 975bba2..104adfa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -183,7 +183,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
             + "-raftId-"
             + nodes.getRaftId()
             + "";
-    allNodes = nodes;
+    setAllNodes(nodes);
     mbeanName =
         String.format(
             "%s:%s=%s%d",
@@ -639,33 +639,6 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
     return metaGroupMember;
   }
 
-  /**
-   * If the member is the leader, let all members in the group close the specified partition of a
-   * storage group, else just return false.
-   */
-  boolean closePartition(String storageGroupName, long partitionId, boolean isSeq) {
-    if (character != NodeCharacter.LEADER) {
-      return false;
-    }
-    CloseFileLog log = new CloseFileLog(storageGroupName, partitionId, isSeq);
-    VotingLog votingLog;
-    synchronized (logManager) {
-      log.setCurrLogTerm(getTerm().get());
-      log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-
-      logManager.append(log);
-      votingLog = buildVotingLog(log);
-      votingLogList.insert(votingLog);
-      logger.info("Send the close file request of {} to other nodes", log);
-    }
-    try {
-      return appendLogInGroup(votingLog);
-    } catch (LogExecutionException e) {
-      logger.error("Cannot close partition {}#{} seq:{}", storageGroupName, partitionId, isSeq, e);
-    }
-    return false;
-  }
-
   public boolean flushFileWhenDoSnapshot(
       Map<String, List<Pair<Long, Boolean>>> storageGroupPartitions,
       List<Integer> requiredSlots,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index aba662c..4764895 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -215,7 +215,7 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
         new ClientManager(
             ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
             ClientManager.Type.MetaGroupClient));
-    allNodes = new PartitionGroup();
+    setAllNodes(new PartitionGroup());
     initPeerMap();
 
     // committed logs are applied to the state machine (the IoTDB instance) through the applier
@@ -239,22 +239,6 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
     loadPartitionTable();
   }
 
-  /**
-   * Find the DataGroupMember that manages the partition of "storageGroupName"@"partitionId", and
-   * close the partition through that member. Notice: only partitions owned by this node can be
-   * closed by the method.
-   */
-  public boolean closePartition(String storageGroupName, long partitionId, boolean isSeq) {
-    RaftNode raftNode =
-        partitionTable.routeToHeaderByTime(
-            storageGroupName, partitionId * StorageEngine.getTimePartitionInterval());
-    DataGroupMember localDataMember = getLocalDataMember(raftNode);
-    if (localDataMember == null || localDataMember.getCharacter() != NodeCharacter.LEADER) {
-      return false;
-    }
-    return localDataMember.closePartition(storageGroupName, partitionId, isSeq);
-  }
-
   DataGroupEngine getDataGroupEngine() {
     return ClusterIoTDB.getInstance().getDataGroupEngine();
   }
@@ -650,16 +634,19 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
       // register the follower, the response.getFollower() contains the node information of the
       // receiver.
       Node localNode = null;
+      Node follower = response.getFollower();
       for (Node node : allNodes) {
-        if (node.getInternalIp().equals(response.getFollower().internalIp)
-            && node.getMetaPort() == response.getFollower().getMetaPort()) {
+        if (node.getInternalIp().equals(follower.internalIp)
+            && node.getMetaPort() == follower.getMetaPort()) {
           localNode = node;
+          localNode.setDataPort(follower.dataPort);
+          localNode.setClientIp(follower.clientIp);
+          localNode.setClientPort(follower.clientPort);
         }
       }
       if (localNode == null) {
         logger.warn(
-            "Received a heartbeat response from a node that is not in the node list: {}",
-            response.getFollower());
+            "Received a heartbeat response from a node that is not in the node list: {}", follower);
         return;
       }
       registerNodeIdentifier(localNode, response.getFollowerIdentifier());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index ce8579a..0dcc4ce 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1139,13 +1139,10 @@ public abstract class RaftMember implements RaftMemberMBean {
     Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime);
 
     try {
-      if (appendLogInGroup(votingLog)) {
-        return StatusUtils.OK;
-      }
+      return appendLogInGroup(votingLog);
     } catch (LogExecutionException e) {
       return handleLogExecutionException(log, IOUtils.getRootCause(e));
     }
-    return StatusUtils.TIME_OUT;
   }
 
   protected TSStatus processPlanLocallyV2(PhysicalPlan plan) {
@@ -1893,7 +1890,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    *
    * @return true if the log is accepted by the quorum of the group, false otherwise
    */
-  boolean appendLogInGroup(VotingLog log) throws LogExecutionException {
+  TSStatus appendLogInGroup(VotingLog log) throws LogExecutionException {
     long totalStartTime = Statistic.LOG_DISPATCHER_TOTAL.getOperationStartTime();
     if (allNodes.size() == 1) {
       // single node group, no followers
@@ -1901,7 +1898,9 @@ public abstract class RaftMember implements RaftMemberMBean {
       logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
       commitLog(log.getLog());
       Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
-      return true;
+      return StatusUtils.OK
+          .deepCopy()
+          .setMessage(log.getLog().getCurrLogIndex() + "-" + log.getLog().getCurrLogTerm());
     }
 
     int retryTime = 0;
@@ -1910,7 +1909,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       logger.debug("{}: Send log {} to other nodes, retry times: {}", name, log, retryTime);
       if (character != NodeCharacter.LEADER) {
         logger.debug("{}: Has lose leadership, so need not to send log", name);
-        return false;
+        return StatusUtils.NO_LEADER;
       }
       AppendLogResult result = sendLogToFollowers(log);
       Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.calOperationCostTimeFromStart(startTime);
@@ -1921,14 +1920,17 @@ public abstract class RaftMember implements RaftMemberMBean {
           Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
               log.getLog().getCreateTime());
           Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
-          return true;
+          return StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED)
+              .setMessage(log.getLog().getCurrLogIndex() + "-" + log.getLog().getCurrLogTerm());
         case OK:
           startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
           logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
           commitLog(log.getLog());
           Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
           Statistic.LOG_DISPATCHER_TOTAL.calOperationCostTimeFromStart(totalStartTime);
-          return true;
+          return StatusUtils.OK
+              .deepCopy()
+              .setMessage(log.getLog().getCurrLogIndex() + "-" + log.getLog().getCurrLogTerm());
         case TIME_OUT:
           logger.debug("{}: log {} timed out, retrying...", name, log);
           try {
@@ -1938,13 +1940,13 @@ public abstract class RaftMember implements RaftMemberMBean {
           }
           retryTime++;
           if (retryTime > 5) {
-            return false;
+            return StatusUtils.TIME_OUT;
           }
           break;
         case LEADERSHIP_STALE:
           // abort the appending, the new leader will fix the local logs by catch-up
         default:
-          return false;
+          return StatusUtils.NO_LEADER;
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
index 58593af..74bda7e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
@@ -122,7 +122,11 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
   }
 
   public <T> DataAsyncService getDataAsyncService(
-      RaftNode header, AsyncMethodCallback<T> resultHandler, Object request) {
+      RaftNode header, AsyncMethodCallback<T> resultHandler, Object request)
+      throws PartitionTableUnavailableException {
+    if (!metaGroupMember.isReady()) {
+      throw new PartitionTableUnavailableException(thisNode);
+    }
     return asyncServiceMap.computeIfAbsent(
         header,
         h -> {
@@ -131,7 +135,11 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
         });
   }
 
-  public DataSyncService getDataSyncService(RaftNode header) {
+  public DataSyncService getDataSyncService(RaftNode header)
+      throws PartitionTableUnavailableException {
+    if (!metaGroupMember.isReady()) {
+      throw new PartitionTableUnavailableException(thisNode);
+    }
     return syncServiceMap.computeIfAbsent(
         header,
         h -> {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index ddec446..6c9dc83 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -60,7 +60,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void sendHeartbeat(
-      HeartBeatRequest request, AsyncMethodCallback<HeartBeatResponse> resultHandler) {
+      HeartBeatRequest request, AsyncMethodCallback<HeartBeatResponse> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -70,7 +71,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler) {
+  public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -81,7 +83,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void appendEntries(
-      AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
+      AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -92,7 +95,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void appendEntry(
-      AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler) {
+      AppendEntryRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -102,7 +106,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler) {
+  public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -113,7 +118,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void pullSnapshot(
-      PullSnapshotRequest request, AsyncMethodCallback<PullSnapshotResp> resultHandler) {
+      PullSnapshotRequest request, AsyncMethodCallback<PullSnapshotResp> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -124,7 +130,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void executeNonQueryPlan(
-      ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) {
+      ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -135,7 +141,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void requestCommitIndex(
-      RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
+      RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(header, resultHandler, "Request commit index");
@@ -146,7 +153,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void readFile(
-      String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
+      String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler)
+      throws TException {
     try {
       resultHandler.onComplete(IOUtils.readFile(filePath, offset, length));
     } catch (IOException e) {
@@ -156,7 +164,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void querySingleSeries(
-      SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) {
+      SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(
@@ -180,7 +188,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void fetchSingleSeries(
-      RaftNode header, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler) {
+      RaftNode header, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(header, resultHandler, "Fetch reader:" + readerId);
@@ -209,7 +218,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
       RaftNode header,
       List<String> paths,
       boolean withAlias,
-      AsyncMethodCallback<GetAllPathsResult> resultHandler) {
+      AsyncMethodCallback<GetAllPathsResult> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(header, resultHandler, "Find path:" + paths);
@@ -220,7 +230,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void endQuery(
-      RaftNode header, Node thisNode, long queryId, AsyncMethodCallback<Void> resultHandler) {
+      RaftNode header, Node thisNode, long queryId, AsyncMethodCallback<Void> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "End query");
     if (service != null) {
@@ -230,7 +241,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void querySingleSeriesByTimestamp(
-      SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) {
+      SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(
@@ -252,7 +263,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
       RaftNode header,
       long readerId,
       List<Long> timestamps,
-      AsyncMethodCallback<ByteBuffer> resultHandler) {
+      AsyncMethodCallback<ByteBuffer> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(header, resultHandler, "Fetch by timestamp:" + readerId);
@@ -263,7 +275,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void pullTimeSeriesSchema(
-      PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) {
+      PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -274,7 +287,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void pullMeasurementSchema(
-      PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) {
+      PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, "Pull measurement schema");
@@ -285,7 +299,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void getAllDevices(
-      RaftNode header, List<String> paths, AsyncMethodCallback<Set<String>> resultHandler) {
+      RaftNode header, List<String> paths, AsyncMethodCallback<Set<String>> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Get all devices");
     if (service != null) {
@@ -295,7 +310,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void getDevices(
-      RaftNode header, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer> resultHandler) {
+      RaftNode header, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "get devices");
     if (service != null) {
@@ -305,10 +321,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void getNodeList(
-      RaftNode header,
-      String path,
-      int nodeLevel,
-      AsyncMethodCallback<List<String>> resultHandler) {
+      RaftNode header, String path, int nodeLevel, AsyncMethodCallback<List<String>> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Get node list");
     if (service != null) {
@@ -318,7 +332,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void getChildNodeInNextLevel(
-      RaftNode header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+      RaftNode header, String path, AsyncMethodCallback<Set<String>> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(header, resultHandler, "Get child node in next level");
@@ -329,7 +344,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void getChildNodePathInNextLevel(
-      RaftNode header, String path, AsyncMethodCallback<Set<String>> resultHandler) {
+      RaftNode header, String path, AsyncMethodCallback<Set<String>> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(header, resultHandler, "Get child node path in next level");
@@ -340,7 +356,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void getAllMeasurementSchema(
-      RaftNode header, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer> resultHandler) {
+      RaftNode header, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(header, resultHandler, "Get all measurement schema");
@@ -351,7 +368,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void getAggrResult(
-      GetAggrResultRequest request, AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
+      GetAggrResultRequest request, AsyncMethodCallback<List<ByteBuffer>> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -362,9 +380,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void getUnregisteredTimeseries(
-      RaftNode header,
-      List<String> timeseriesList,
-      AsyncMethodCallback<List<String>> resultHandler) {
+      RaftNode header, List<String> timeseriesList, AsyncMethodCallback<List<String>> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(header, resultHandler, "Check if measurements are registered");
@@ -374,7 +391,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) {
+  public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -389,7 +407,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
       long executorId,
       long startTime,
       long endTime,
-      AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
+      AsyncMethodCallback<List<ByteBuffer>> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Fetch group by");
     if (service != null) {
@@ -399,7 +418,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void previousFill(
-      PreviousFillRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
+      PreviousFillRequest request, AsyncMethodCallback<ByteBuffer> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, request);
@@ -410,7 +430,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void matchTerm(
-      long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler) {
+      long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "Match term");
     if (service != null) {
@@ -419,7 +440,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) {
+  public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(request.getHeader(), resultHandler, "last");
@@ -433,7 +455,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
       RaftNode header,
       List<String> pathsToQuery,
       int level,
-      AsyncMethodCallback<Integer> resultHandler) {
+      AsyncMethodCallback<Integer> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance().getDataAsyncService(header, resultHandler, "count path");
     if (service != null) {
@@ -454,7 +477,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void onSnapshotApplied(
-      RaftNode header, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler) {
+      RaftNode header, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler)
+      throws TException {
     DataAsyncService service =
         DataGroupEngine.getInstance()
             .getDataAsyncService(header, resultHandler, "Snapshot applied");
@@ -630,7 +654,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public boolean onSnapshotApplied(RaftNode header, List<Integer> slots) {
+  public boolean onSnapshotApplied(RaftNode header, List<Integer> slots) throws TException {
     return DataGroupEngine.getInstance()
         .getDataSyncService(header)
         .onSnapshotApplied(header, slots);
@@ -644,14 +668,14 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
+  public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) throws TException {
     return DataGroupEngine.getInstance()
         .getDataSyncService(request.getHeader())
         .sendHeartbeat(request);
   }
 
   @Override
-  public long startElection(ElectionRequest request) {
+  public long startElection(ElectionRequest request) throws TException {
     return DataGroupEngine.getInstance()
         .getDataSyncService(request.getHeader())
         .startElection(request);
@@ -698,7 +722,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public boolean matchTerm(long index, long term, RaftNode header) {
+  public boolean matchTerm(long index, long term, RaftNode header) throws TException {
     return DataGroupEngine.getInstance().getDataSyncService(header).matchTerm(index, term, header);
   }
 
@@ -752,7 +776,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public void acknowledgeAppendEntry(AppendEntryResult ack) {
+  public void acknowledgeAppendEntry(AppendEntryResult ack) throws TException {
     DataGroupEngine.getInstance().getDataSyncService(ack.getHeader()).acknowledgeAppendEntry(ack);
   }
 
@@ -760,15 +784,16 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   public void appendEntryIndirect(
       AppendEntryRequest request,
       List<Node> subReceivers,
-      AsyncMethodCallback<AppendEntryResult> resultHandler) {
+      AsyncMethodCallback<AppendEntryResult> resultHandler)
+      throws TException {
     DataGroupEngine.getInstance()
         .getDataAsyncService(request.getHeader(), resultHandler, request)
         .appendEntryIndirect(request, subReceivers, resultHandler);
   }
 
   @Override
-  public void acknowledgeAppendEntry(
-      AppendEntryResult ack, AsyncMethodCallback<Void> resultHandler) {
+  public void acknowledgeAppendEntry(AppendEntryResult ack, AsyncMethodCallback<Void> resultHandler)
+      throws TException {
     DataGroupEngine.getInstance()
         .getDataAsyncService(ack.getHeader(), resultHandler, ack)
         .acknowledgeAppendEntry(ack, resultHandler);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 952ac66..b9d1efa 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -592,75 +592,6 @@ public class MetaGroupMemberTest extends BaseMember {
   }
 
   @Test
-  public void testClosePartition()
-      throws QueryProcessException, StorageEngineException, StorageGroupNotSetException,
-          IllegalPathException {
-    System.out.println("Start testClosePartition()");
-    // the operation is accepted
-    dummyResponse.set(Response.RESPONSE_AGREE);
-    InsertRowPlan insertPlan = new InsertRowPlan();
-    insertPlan.setDeviceId(new PartialPath(TestUtils.getTestSg(0)));
-    insertPlan.setNeedInferType(true);
-    insertPlan.setMeasurements(new String[] {TestUtils.getTestMeasurement(0)});
-    insertPlan.setDataTypes(new TSDataType[insertPlan.getMeasurements().length]);
-    for (int i = 0; i < 10; i++) {
-      insertPlan.setTime(i);
-      insertPlan.setValues(new Object[] {String.valueOf(i)});
-      insertPlan.setMeasurementMNodes(
-          new IMeasurementMNode[] {TestUtils.getTestMeasurementMNode(0)});
-      PlanExecutor planExecutor = new PlanExecutor();
-      planExecutor.processNonQuery(insertPlan);
-    }
-
-    ExecutorService testThreadPool = Executors.newFixedThreadPool(4);
-    assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
-
-    StorageGroupProcessor processor =
-        StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0)));
-    assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
-
-    int prevTimeout = ClusterConstant.getConnectionTimeoutInMS();
-    ClusterConstant.setConnectionTimeoutInMS(100);
-    try {
-      System.out.println("Create the first file");
-      for (int i = 20; i < 30; i++) {
-        insertPlan.setTime(i);
-        insertPlan.setValues(new Object[] {String.valueOf(i)});
-        PlanExecutor planExecutor = new PlanExecutor();
-        planExecutor.processNonQuery(insertPlan);
-      }
-      // the net work is down
-      dummyResponse.set(Long.MIN_VALUE);
-
-      System.out.println("Close the first file");
-      assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
-      assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
-
-      // network resume in 100ms
-      dummyResponse.set(Response.RESPONSE_AGREE);
-      assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
-      assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
-
-      System.out.println("Create the second file");
-      for (int i = 30; i < 40; i++) {
-        insertPlan.setTime(i);
-        insertPlan.setValues(new Object[] {String.valueOf(i)});
-        PlanExecutor planExecutor = new PlanExecutor();
-        planExecutor.processNonQuery(insertPlan);
-      }
-
-      // indicating the leader is stale
-      System.out.println("Close the second file");
-      dummyResponse.set(100);
-      assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
-      assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
-    } finally {
-      ClusterConstant.setConnectionTimeoutInMS(prevTimeout);
-    }
-    testThreadPool.shutdownNow();
-  }
-
-  @Test
   public void testAddNode() {
     System.out.println("Start testAddNode()");
     Node newNode = TestUtils.getNode(11);