You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/01/05 04:36:46 UTC

[iotdb] 01/01: replace EnvironmentUtil in the cluster module with the one in the server module; reduce deregister timeout and disable WAL in tests to accelerate tests; fix query context leak in some tests

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch remove_cluster_environmentutil
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c8ae2669f11bb3aed2d4731f52244929712b51e6
Author: jt <jt...@163.com>
AuthorDate: Tue Jan 5 12:33:34 2021 +0800

    replace EnvironmentUtil in the cluster module with the one in the server module
    reduce deregister timeout and disable WAL in tests to accelerate tests
    fix query context leak in some tests
---
 cluster/pom.xml                                    |   7 +
 .../iotdb/cluster/common/EnvironmentUtils.java     | 218 ---------------------
 .../org/apache/iotdb/cluster/common/IoTDBTest.java |   4 +-
 .../cluster/integration/BaseSingleNodeTest.java    |   2 +-
 .../iotdb/cluster/log/CommitLogCallbackTest.java   |   2 +-
 .../iotdb/cluster/log/CommitLogTaskTest.java       |   2 +-
 .../iotdb/cluster/log/LogDispatcherTest.java       |   2 +-
 .../log/applier/AsyncDataLogApplierTest.java       |   2 +-
 .../cluster/log/applier/DataLogApplierTest.java    |   1 -
 .../iotdb/cluster/log/catchup/CatchUpTaskTest.java |   2 +-
 .../cluster/log/catchup/LogCatchUpTaskTest.java    |   2 +-
 .../log/catchup/SnapshotCatchUpTaskTest.java       |   2 +-
 .../cluster/log/snapshot/DataSnapshotTest.java     |   2 +-
 .../iotdb/cluster/partition/SlotManagerTest.java   |   2 +-
 .../cluster/partition/SlotPartitionTableTest.java  |   2 +-
 .../query/ClusterAggregateExecutorTest.java        |  48 +++--
 .../query/ClusterDataQueryExecutorTest.java        |  22 ++-
 .../cluster/query/ClusterFillExecutorTest.java     |  72 ++++---
 .../cluster/query/ClusterPlanExecutorTest.java     |   8 +-
 .../cluster/query/ClusterQueryRouterTest.java      | 216 +++++++++++---------
 .../ClusterGroupByNoVFilterDataSetTest.java        |  64 +++---
 .../groupby/ClusterGroupByVFilterDataSetTest.java  |  74 +++----
 .../query/groupby/MergeGroupByExecutorTest.java    |  83 ++++----
 .../query/groupby/RemoteGroupByExecutorTest.java   | 146 +++++++-------
 .../query/reader/ClusterTimeGeneratorTest.java     |  34 ++--
 .../cluster/query/reader/DatasourceInfoTest.java   |  16 +-
 .../reader/RemoteSeriesReaderByTimestampTest.java  | 122 ++++++------
 .../query/reader/RemoteSimpleSeriesReaderTest.java | 136 +++++++------
 .../caller/AppendGroupEntryHandlerTest.java        |   2 +-
 .../caller/AppendNodeEntryHandlerTest.java         |   2 +-
 .../handlers/caller/ElectionHandlerTest.java       |   2 +-
 .../handlers/caller/HeartbeatHandlerTest.java      |   2 +-
 .../handlers/caller/LogCatchUpHandlerTest.java     |   2 +-
 .../server/heartbeat/HeartbeatThreadTest.java      |   2 +-
 .../cluster/server/member/DataGroupMemberTest.java | 124 ++++++------
 .../iotdb/cluster/server/member/MemberTest.java    |  14 +-
 .../iotdb/db/query/control/QueryFileManager.java   |   7 +
 .../apache/iotdb/db/service/RegisterManager.java   |  11 +-
 38 files changed, 693 insertions(+), 768 deletions(-)

diff --git a/cluster/pom.xml b/cluster/pom.xml
index 876afa4..5529690 100644
--- a/cluster/pom.xml
+++ b/cluster/pom.xml
@@ -79,6 +79,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-server</artifactId>
+            <version>0.12.0-SNAPSHOT</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
             <artifactId>iotdb-session</artifactId>
             <version>0.12.0-SNAPSHOT</version>
             <scope>compile</scope>
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/EnvironmentUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/EnvironmentUtils.java
deleted file mode 100644
index ea51381..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/EnvironmentUtils.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.common;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.DirectoryNotEmptyException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Paths;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import org.apache.iotdb.db.auth.AuthException;
-import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
-import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
-import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.engine.merge.manage.MergeManager;
-import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.monitor.StatMonitor;
-import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.service.MetricsService;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * This class is used for cleaning test environment in unit test and integration test
- * </p>
- */
-public class EnvironmentUtils {
-
-  private static final String OUTPUT_DATA_DIR = "target/";
-
-  private EnvironmentUtils() {
-    // util class
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(EnvironmentUtils.class);
-
-  private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private static DirectoryManager directoryManager = DirectoryManager.getInstance();
-
-  private static long testQueryId = 1;
-
-  private static long oldTsFileThreshold = config.getTsFileSizeThreshold();
-
-  private static long oldGroupSizeInByte = config.getMemtableSizeThreshold();
-
-  public static void cleanEnv() throws IOException, StorageEngineException {
-    System.out.println("Cleaning environment...");
-    QueryResourceManager.getInstance().endQuery(testQueryId);
-
-    // clear opened file streams
-    FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-
-    // clean storage group manager
-    if (!StorageEngine.getInstance().deleteAll()) {
-      logger.error("Can't close the storage group manager in EnvironmentUtils");
-      Assert.fail();
-    }
-    StorageEngine.getInstance().reset();
-    IoTDBDescriptor.getInstance().getConfig().setReadOnly(false);
-
-    StatMonitor.getInstance().close();
-    // clean wal
-    MultiFileLogNodeManager.getInstance().stop();
-    // clean cache
-    if (config.isMetaDataCacheEnable()) {
-      ChunkMetadataCache.getInstance().clear();
-      TimeSeriesMetadataCache.getInstance().clear();
-    }
-    // close metadata
-    IoTDB.metaManager.clear();
-
-    MergeManager.getINSTANCE().stop();
-    MetricsService.getInstance().stop();
-    // delete all directory
-    cleanAllDir();
-
-    config.setTsFileSizeThreshold(oldTsFileThreshold);
-    config.setMemtableSizeThreshold(oldGroupSizeInByte);
-  }
-
-  public static void cleanAllDir() throws IOException {
-    // delete sequential files
-    for (String path : directoryManager.getAllSequenceFileFolders()) {
-      cleanDir(path);
-    }
-    // delete unsequence files
-    for (String path : directoryManager.getAllUnSequenceFileFolders()) {
-      cleanDir(path);
-    }
-    // delete system info
-    cleanDir(config.getSystemDir());
-    // delete wal
-    cleanDir(config.getWalDir());
-    // delete query
-    cleanDir(config.getQueryDir());
-    cleanDir("remote");
-    // delete data files
-    for (String dataDir : config.getDataDirs()) {
-      cleanDir(dataDir);
-    }
-  }
-
-  public static void cleanDir(String dir) throws IOException {
-    deleteRecursively(new File(dir));
-  }
-
-  public static void deleteRecursively(File file) throws IOException {
-    if (file.exists()) {
-      if (file.isDirectory()) {
-        File[] files = file.listFiles();
-        if (files != null) {
-          for (File child : files) {
-            deleteRecursively(child);
-          }
-        }
-      }
-      try {
-        Files.delete(Paths.get(file.getAbsolutePath()));
-      } catch (DirectoryNotEmptyException e) {
-        deleteRecursively(file);
-      } catch (NoSuchFileException e) {
-        // ignore;
-      }
-    }
-  }
-
-
-  /**
-   * disable the system monitor</br> this function should be called before all code in the setup
-   */
-  public static void closeStatMonitor() {
-    config.setEnableStatMonitor(false);
-  }
-
-  /**
-   * disable memory control</br> this function should be called before all code in the setup
-   */
-  public static void envSetUp() throws StartupException {
-    IoTDB.metaManager.init();
-
-    createAllDir();
-    // disable the system monitor
-    config.setEnableStatMonitor(false);
-    IAuthorizer authorizer;
-    try {
-      authorizer = LocalFileAuthorizer.getInstance();
-    } catch (AuthException e) {
-      throw new StartupException(e);
-    }
-    try {
-      authorizer.reset();
-    } catch (AuthException e) {
-      throw new StartupException(e);
-    }
-    StorageEngine.getInstance().reset();
-    MultiFileLogNodeManager.getInstance().start();
-    FlushManager.getInstance().start();
-    MergeManager.getINSTANCE().start();
-    testQueryId = QueryResourceManager.getInstance().assignQueryId(true, 1024, -1);
-  }
-
-  private static void createAllDir() {
-    // create sequential files
-    for (String path : directoryManager.getAllSequenceFileFolders()) {
-      createDir(path);
-    }
-    // create unsequential files
-    for (String path : directoryManager.getAllUnSequenceFileFolders()) {
-      createDir(path);
-    }
-    // create storage group
-    createDir(config.getSystemDir());
-    // create wal
-    createDir(config.getWalDir());
-    // create query
-    createDir(config.getQueryDir());
-    createDir(OUTPUT_DATA_DIR);
-    // create data
-    for (String dataDir : config.getDataDirs()) {
-      createDir(dataDir);
-    }
-  }
-
-  private static void createDir(String dir) {
-    File file = new File(dir);
-    file.mkdirs();
-  }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
index 1321c28..05cc79a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -56,7 +57,6 @@ import org.junit.Before;
  */
 public abstract class IoTDBTest {
 
-  private static IoTDB daemon = IoTDB.getInstance();
   private PlanExecutor planExecutor;
   private boolean prevEnableAutoSchema;
   private boolean prevUseAsyncServer;
@@ -68,7 +68,6 @@ public abstract class IoTDBTest {
     prevEnableAutoSchema = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
     IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
     EnvironmentUtils.closeStatMonitor();
-    daemon.active();
     EnvironmentUtils.envSetUp();
     planExecutor = new PlanExecutor();
     prepareSchema();
@@ -77,7 +76,6 @@ public abstract class IoTDBTest {
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
-    daemon.stop();
     EnvironmentUtils.cleanEnv();
     IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(prevEnableAutoSchema);
     ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(prevUseAsyncServer);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
index 8b30b8e..a1e48da 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.cluster.integration;
 
 import java.util.Collections;
 import java.util.List;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.server.MetaClusterServer;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/CommitLogCallbackTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/CommitLogCallbackTest.java
index 0a4e5bd..7ec98b6 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/CommitLogCallbackTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/CommitLogCallbackTest.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.log;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.junit.After;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/CommitLogTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/CommitLogTaskTest.java
index 4c38b5a..b1003d3 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/CommitLogTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/CommitLogTaskTest.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.cluster.log;
 import static org.junit.Assert.*;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestLogManager;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
index a1c711a..5b1b86a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestAsyncClient;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestSyncClient;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
index 762294c..b200cfa 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplierTest.java
@@ -26,7 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 8ea425b..c55683f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.metadata.MetaPuller;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
-import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index 1b06435..1a95f68 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestAsyncClient;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestSyncClient;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
index a751923..bcce6aa 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.fail;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestAsyncClient;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestSyncClient;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
index ade37ff..595b9e5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestAsyncClient;
 import org.apache.iotdb.cluster.common.TestLog;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
index 0056d18..d2be286 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestDataGroupMember;
 import org.apache.iotdb.cluster.common.TestLogManager;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotManagerTest.java
index bf5c6d3..218db7f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotManagerTest.java
@@ -28,7 +28,7 @@ import static org.junit.Assert.assertNull;
 
 import java.io.File;
 import java.io.IOException;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.partition.slot.SlotManager;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
index 9e2fd6d..9ca03b3 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java
@@ -37,7 +37,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.stream.IntStream;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.partition.slot.SlotNodeRemovalResult;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
index 929eaa7..2eedf2e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
@@ -73,17 +73,21 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
 
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    executor = new ClusterAggregateExecutor(plan, testMetaMember);
-    QueryDataSet queryDataSet = executor.executeWithoutValueFilter(context, plan);
-    assertTrue(queryDataSet.hasNext());
-    RowRecord record = queryDataSet.next();
-    List<Field> fields = record.getFields();
-    assertEquals(5, fields.size());
-    Object[] answers = new Object[] {0.0, 19.0, 9.5, 20.0, 190.0};
-    for (int i = 0; i < 5; i++) {
-      assertEquals((double)answers[i], Double.parseDouble(fields.get(i).toString()), 0.00001);
+    try {
+      executor = new ClusterAggregateExecutor(plan, testMetaMember);
+      QueryDataSet queryDataSet = executor.executeWithoutValueFilter(context, plan);
+      assertTrue(queryDataSet.hasNext());
+      RowRecord record = queryDataSet.next();
+      List<Field> fields = record.getFields();
+      assertEquals(5, fields.size());
+      Object[] answers = new Object[] {0.0, 19.0, 9.5, 20.0, 190.0};
+      for (int i = 0; i < 5; i++) {
+        assertEquals((double)answers[i], Double.parseDouble(fields.get(i).toString()), 0.00001);
+      }
+      assertFalse(queryDataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
-    assertFalse(queryDataSet.hasNext());
   }
 
   @Test
@@ -114,16 +118,20 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
 
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    executor = new ClusterAggregateExecutor(plan, testMetaMember);
-    QueryDataSet queryDataSet = executor.executeWithValueFilter(context, plan);
-    assertTrue(queryDataSet.hasNext());
-    RowRecord record = queryDataSet.next();
-    List<Field> fields = record.getFields();
-    assertEquals(5, fields.size());
-    Object[] answers = new Object[] {3.0, 8.0, 5.5, 6.0, 33.0};
-    for (int i = 0; i < 5; i++) {
-      assertEquals((double)answers[i], Double.parseDouble(fields.get(i).toString()), 0.00001);
+    try {
+      executor = new ClusterAggregateExecutor(plan, testMetaMember);
+      QueryDataSet queryDataSet = executor.executeWithValueFilter(context, plan);
+      assertTrue(queryDataSet.hasNext());
+      RowRecord record = queryDataSet.next();
+      List<Field> fields = record.getFields();
+      assertEquals(5, fields.size());
+      Object[] answers = new Object[] {3.0, 8.0, 5.5, 6.0, 33.0};
+      for (int i = 0; i < 5; i++) {
+        assertEquals((double)answers[i], Double.parseDouble(fields.get(i).toString()), 0.00001);
+      }
+      assertFalse(queryDataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
-    assertFalse(queryDataSet.hasNext());
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
index 2258cc7..ac52d51 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterDataQueryExecutorTest.java
@@ -43,9 +43,14 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
     plan.setDeduplicatedPaths(pathList);
     plan.setDeduplicatedDataTypes(dataTypes);
     queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
-    QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(
-        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1)));
-    checkSequentialDataset(dataSet, 0, 20);
+    RemoteQueryContext context = new RemoteQueryContext(
+        QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+    try {
+      QueryDataSet dataSet = queryExecutor.executeWithoutValueFilter(context);
+      checkSequentialDataset(dataSet, 0, 20);
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
   }
 
   @Test
@@ -58,9 +63,14 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
     plan.setDeduplicatedDataTypes(dataTypes);
     plan.setExpression(expression);
     queryExecutor = new ClusterDataQueryExecutor(plan, testMetaMember);
-    QueryDataSet dataSet = queryExecutor.executeWithValueFilter(
-        new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1)));
-    checkSequentialDataset(dataSet, 5, 15);
+    RemoteQueryContext context = new RemoteQueryContext(
+        QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
+    try {
+      QueryDataSet dataSet = queryExecutor.executeWithValueFilter(context);
+      checkSequentialDataset(dataSet, 5, 15);
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
   }
 
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterFillExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterFillExecutorTest.java
index 170514d..f38bc9e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterFillExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterFillExecutorTest.java
@@ -59,22 +59,26 @@ public class ClusterFillExecutorTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
 
-    ClusterFillExecutor fillExecutor;
-    QueryDataSet queryDataSet;
-    long[] queryTimes = new long[] {-1, 0, 5, 10, 20};
-    Object[][] answers = new Object[][]{
-        new Object[]{null},
-        new Object[]{0.0},
-        new Object[]{0.0},
-        new Object[]{10.0},
-        new Object[]{10.0},
-    };
-    for (int i = 0; i < queryTimes.length; i++) {
-      fillExecutor = new ClusterFillExecutor(plan.getDeduplicatedPaths(),
-          plan.getDeduplicatedDataTypes(), queryTimes[i], plan.getFillType(), testMetaMember);
-      queryDataSet = fillExecutor.execute(context, plan);
-      checkDoubleDataset(queryDataSet, answers[i]);
-      assertFalse(queryDataSet.hasNext());
+    try {
+      ClusterFillExecutor fillExecutor;
+      QueryDataSet queryDataSet;
+      long[] queryTimes = new long[] {-1, 0, 5, 10, 20};
+      Object[][] answers = new Object[][]{
+          new Object[]{null},
+          new Object[]{0.0},
+          new Object[]{0.0},
+          new Object[]{10.0},
+          new Object[]{10.0},
+      };
+      for (int i = 0; i < queryTimes.length; i++) {
+        fillExecutor = new ClusterFillExecutor(plan.getDeduplicatedPaths(),
+            plan.getDeduplicatedDataTypes(), queryTimes[i], plan.getFillType(), testMetaMember);
+        queryDataSet = fillExecutor.execute(context, plan);
+        checkDoubleDataset(queryDataSet, answers[i]);
+        assertFalse(queryDataSet.hasNext());
+      }
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
 
@@ -94,22 +98,26 @@ public class ClusterFillExecutorTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
 
-    ClusterFillExecutor fillExecutor;
-    QueryDataSet queryDataSet;
-    long[] queryTimes = new long[] {-1, 0, 5, 10, 20};
-    Object[][] answers = new Object[][]{
-        new Object[]{null},
-        new Object[]{0.0},
-        new Object[]{5.0},
-        new Object[]{10.0},
-        new Object[]{null},
-    };
-    for (int i = 0; i < queryTimes.length; i++) {
-      fillExecutor = new ClusterFillExecutor(plan.getDeduplicatedPaths(),
-          plan.getDeduplicatedDataTypes(), queryTimes[i], plan.getFillType(), testMetaMember);
-      queryDataSet = fillExecutor.execute(context, plan);
-      checkDoubleDataset(queryDataSet, answers[i]);
-      assertFalse(queryDataSet.hasNext());
+    try {
+      ClusterFillExecutor fillExecutor;
+      QueryDataSet queryDataSet;
+      long[] queryTimes = new long[] {-1, 0, 5, 10, 20};
+      Object[][] answers = new Object[][]{
+          new Object[]{null},
+          new Object[]{0.0},
+          new Object[]{5.0},
+          new Object[]{10.0},
+          new Object[]{null},
+      };
+      for (int i = 0; i < queryTimes.length; i++) {
+        fillExecutor = new ClusterFillExecutor(plan.getDeduplicatedPaths(),
+            plan.getDeduplicatedDataTypes(), queryTimes[i], plan.getFillType(), testMetaMember);
+        queryDataSet = fillExecutor.execute(context, plan);
+        checkDoubleDataset(queryDataSet, answers[i]);
+        assertFalse(queryDataSet.hasNext());
+      }
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java
index bc88502..67367cf 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java
@@ -59,8 +59,12 @@ public class ClusterPlanExecutorTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
 
-    QueryDataSet dataSet = queryExecutor.processQuery(queryPlan, context);
-    checkSequentialDataset(dataSet, 0, 20);
+    try {
+      QueryDataSet dataSet = queryExecutor.processQuery(queryPlan, context);
+      checkSequentialDataset(dataSet, 0, 20);
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
   }
 
   @Test
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java
index 0574570..2ddf9cc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java
@@ -73,8 +73,12 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
 
-    QueryDataSet dataSet = clusterQueryRouter.rawDataQuery(queryPlan, context);
-    checkSequentialDataset(dataSet, 0, 20);
+    try {
+      QueryDataSet dataSet = clusterQueryRouter.rawDataQuery(queryPlan, context);
+      checkSequentialDataset(dataSet, 0, 20);
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
   }
 
   @Test
@@ -100,9 +104,13 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
 
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    QueryDataSet queryDataSet = clusterQueryRouter.aggregate(plan, context);
-    checkDoubleDataset(queryDataSet, new Object[]{0.0, 19.0, 9.5, 20.0, 190.0});
-    assertFalse(queryDataSet.hasNext());
+    try {
+      QueryDataSet queryDataSet = clusterQueryRouter.aggregate(plan, context);
+      checkDoubleDataset(queryDataSet, new Object[]{0.0, 19.0, 9.5, 20.0, 190.0});
+      assertFalse(queryDataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
   }
 
   @Test
@@ -121,20 +129,24 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
 
-    QueryDataSet queryDataSet;
-    long[] queryTimes = new long[] {-1, 0, 5, 10, 20};
-    Object[][] answers = new Object[][]{
-        new Object[]{null},
-        new Object[]{0.0},
-        new Object[]{0.0},
-        new Object[]{10.0},
-        new Object[]{10.0},
-    };
-    for (int i = 0; i < queryTimes.length; i++) {
-      plan.setQueryTime(queryTimes[i]);
-      queryDataSet = clusterQueryRouter.fill(plan, context);
-      checkDoubleDataset(queryDataSet, answers[i]);
-      assertFalse(queryDataSet.hasNext());
+    try {
+      QueryDataSet queryDataSet;
+      long[] queryTimes = new long[] {-1, 0, 5, 10, 20};
+      Object[][] answers = new Object[][]{
+          new Object[]{null},
+          new Object[]{0.0},
+          new Object[]{0.0},
+          new Object[]{10.0},
+          new Object[]{10.0},
+      };
+      for (int i = 0; i < queryTimes.length; i++) {
+        plan.setQueryTime(queryTimes[i]);
+        queryDataSet = clusterQueryRouter.fill(plan, context);
+        checkDoubleDataset(queryDataSet, answers[i]);
+        assertFalse(queryDataSet.hasNext());
+      }
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
 
@@ -155,20 +167,24 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
 
-    QueryDataSet queryDataSet;
-    long[] queryTimes = new long[] {-1, 0, 5, 10, 20};
-    Object[][] answers = new Object[][]{
-        new Object[]{null},
-        new Object[]{0.0},
-        new Object[]{5.0},
-        new Object[]{10.0},
-        new Object[]{null},
-    };
-    for (int i = 0; i < queryTimes.length; i++) {
-      plan.setQueryTime(queryTimes[i]);
-      queryDataSet = clusterQueryRouter.fill(plan, context);
-      checkDoubleDataset(queryDataSet, answers[i]);
-      assertFalse(queryDataSet.hasNext());
+    try {
+      QueryDataSet queryDataSet;
+      long[] queryTimes = new long[] {-1, 0, 5, 10, 20};
+      Object[][] answers = new Object[][]{
+          new Object[]{null},
+          new Object[]{0.0},
+          new Object[]{5.0},
+          new Object[]{10.0},
+          new Object[]{null},
+      };
+      for (int i = 0; i < queryTimes.length; i++) {
+        plan.setQueryTime(queryTimes[i]);
+        queryDataSet = clusterQueryRouter.fill(plan, context);
+        checkDoubleDataset(queryDataSet, answers[i]);
+        assertFalse(queryDataSet.hasNext());
+      }
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
 
@@ -177,44 +193,48 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
       throws IOException, StorageEngineException, QueryFilterOptimizationException, QueryProcessException, IllegalPathException {
     QueryContext queryContext =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    GroupByTimePlan groupByPlan = new GroupByTimePlan();
-    List<PartialPath> pathList = new ArrayList<>();
-    List<TSDataType> dataTypes = new ArrayList<>();
-    List<String> aggregations = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
-      dataTypes.add(TSDataType.DOUBLE);
-      aggregations.add(SQLConstant.COUNT);
-    }
-    groupByPlan.setPaths(pathList);
-    groupByPlan.setDeduplicatedPaths(pathList);
-    groupByPlan.setDataTypes(dataTypes);
-    groupByPlan.setDeduplicatedDataTypes(dataTypes);
-    groupByPlan.setAggregations(aggregations);
-    groupByPlan.setDeduplicatedAggregations(aggregations);
+    try {
+      GroupByTimePlan groupByPlan = new GroupByTimePlan();
+      List<PartialPath> pathList = new ArrayList<>();
+      List<TSDataType> dataTypes = new ArrayList<>();
+      List<String> aggregations = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
+        dataTypes.add(TSDataType.DOUBLE);
+        aggregations.add(SQLConstant.COUNT);
+      }
+      groupByPlan.setPaths(pathList);
+      groupByPlan.setDeduplicatedPaths(pathList);
+      groupByPlan.setDataTypes(dataTypes);
+      groupByPlan.setDeduplicatedDataTypes(dataTypes);
+      groupByPlan.setAggregations(aggregations);
+      groupByPlan.setDeduplicatedAggregations(aggregations);
 
-    groupByPlan.setStartTime(0);
-    groupByPlan.setEndTime(20);
-    groupByPlan.setSlidingStep(5);
-    groupByPlan.setInterval(5);
+      groupByPlan.setStartTime(0);
+      groupByPlan.setEndTime(20);
+      groupByPlan.setSlidingStep(5);
+      groupByPlan.setInterval(5);
 
-    IExpression expression = BinaryExpression.and(
-        new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0)),
-        new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(5, 0)), TimeFilter.ltEq(15))
-    );
-    groupByPlan.setExpression(expression);
-    QueryDataSet queryDataSet = clusterQueryRouter.groupBy(groupByPlan, queryContext);
+      IExpression expression = BinaryExpression.and(
+          new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0)),
+          new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(5, 0)), TimeFilter.ltEq(15))
+      );
+      groupByPlan.setExpression(expression);
+      QueryDataSet queryDataSet = clusterQueryRouter.groupBy(groupByPlan, queryContext);
 
-    Object[][] answers = new Object[][] {
-        new Object[] {0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0},
-    };
-    for (Object[] answer : answers) {
-      checkDoubleDataset(queryDataSet, answer);
+      Object[][] answers = new Object[][] {
+          new Object[] {0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0},
+      };
+      for (Object[] answer : answers) {
+        checkDoubleDataset(queryDataSet, answer);
+      }
+      assertFalse(queryDataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(queryContext.getQueryId());
     }
-    assertFalse(queryDataSet.hasNext());
   }
 
   @Test
@@ -222,38 +242,42 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
       throws StorageEngineException, IOException, QueryFilterOptimizationException, QueryProcessException, IllegalPathException {
     QueryContext queryContext =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    GroupByTimePlan groupByPlan = new GroupByTimePlan();
-    List<PartialPath> pathList = new ArrayList<>();
-    List<TSDataType> dataTypes = new ArrayList<>();
-    List<String> aggregations = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
-      dataTypes.add(TSDataType.DOUBLE);
-      aggregations.add(SQLConstant.COUNT);
-    }
-    groupByPlan.setPaths(pathList);
-    groupByPlan.setDeduplicatedPaths(pathList);
-    groupByPlan.setDataTypes(dataTypes);
-    groupByPlan.setDeduplicatedDataTypes(dataTypes);
-    groupByPlan.setAggregations(aggregations);
-    groupByPlan.setDeduplicatedAggregations(aggregations);
+    try {
+      GroupByTimePlan groupByPlan = new GroupByTimePlan();
+      List<PartialPath> pathList = new ArrayList<>();
+      List<TSDataType> dataTypes = new ArrayList<>();
+      List<String> aggregations = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
+        dataTypes.add(TSDataType.DOUBLE);
+        aggregations.add(SQLConstant.COUNT);
+      }
+      groupByPlan.setPaths(pathList);
+      groupByPlan.setDeduplicatedPaths(pathList);
+      groupByPlan.setDataTypes(dataTypes);
+      groupByPlan.setDeduplicatedDataTypes(dataTypes);
+      groupByPlan.setAggregations(aggregations);
+      groupByPlan.setDeduplicatedAggregations(aggregations);
 
-    groupByPlan.setStartTime(0);
-    groupByPlan.setEndTime(20);
-    groupByPlan.setSlidingStep(5);
-    groupByPlan.setInterval(5);
+      groupByPlan.setStartTime(0);
+      groupByPlan.setEndTime(20);
+      groupByPlan.setSlidingStep(5);
+      groupByPlan.setInterval(5);
 
-    QueryDataSet dataSet = clusterQueryRouter.groupBy(groupByPlan, queryContext);
+      QueryDataSet dataSet = clusterQueryRouter.groupBy(groupByPlan, queryContext);
 
-    Object[][] answers = new Object[][] {
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-    };
-    for (Object[] answer : answers) {
-      checkDoubleDataset(dataSet, answer);
+      Object[][] answers = new Object[][] {
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+      };
+      for (Object[] answer : answers) {
+        checkDoubleDataset(dataSet, answer);
+      }
+      assertFalse(dataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(queryContext.getQueryId());
     }
-    assertFalse(dataSet.hasNext());
   }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByNoVFilterDataSetTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByNoVFilterDataSetTest.java
index a90fa82..74229da 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByNoVFilterDataSetTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByNoVFilterDataSetTest.java
@@ -45,39 +45,43 @@ public class ClusterGroupByNoVFilterDataSetTest extends BaseQueryTest {
       throws StorageEngineException, IOException, QueryProcessException, IllegalPathException {
     QueryContext queryContext =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    GroupByTimePlan groupByPlan = new GroupByTimePlan();
-    List<PartialPath> pathList = new ArrayList<>();
-    List<TSDataType> dataTypes = new ArrayList<>();
-    List<String> aggregations = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
-      dataTypes.add(TSDataType.DOUBLE);
-      aggregations.add(SQLConstant.COUNT);
-    }
-    groupByPlan.setPaths(pathList);
-    groupByPlan.setDeduplicatedPaths(pathList);
-    groupByPlan.setDataTypes(dataTypes);
-    groupByPlan.setDeduplicatedDataTypes(dataTypes);
-    groupByPlan.setAggregations(aggregations);
-    groupByPlan.setDeduplicatedAggregations(aggregations);
+    try {
+      GroupByTimePlan groupByPlan = new GroupByTimePlan();
+      List<PartialPath> pathList = new ArrayList<>();
+      List<TSDataType> dataTypes = new ArrayList<>();
+      List<String> aggregations = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
+        dataTypes.add(TSDataType.DOUBLE);
+        aggregations.add(SQLConstant.COUNT);
+      }
+      groupByPlan.setPaths(pathList);
+      groupByPlan.setDeduplicatedPaths(pathList);
+      groupByPlan.setDataTypes(dataTypes);
+      groupByPlan.setDeduplicatedDataTypes(dataTypes);
+      groupByPlan.setAggregations(aggregations);
+      groupByPlan.setDeduplicatedAggregations(aggregations);
 
-    groupByPlan.setStartTime(0);
-    groupByPlan.setEndTime(20);
-    groupByPlan.setSlidingStep(5);
-    groupByPlan.setInterval(5);
+      groupByPlan.setStartTime(0);
+      groupByPlan.setEndTime(20);
+      groupByPlan.setSlidingStep(5);
+      groupByPlan.setInterval(5);
 
-    ClusterGroupByNoVFilterDataSet dataSet = new ClusterGroupByNoVFilterDataSet(queryContext,
-        groupByPlan, testMetaMember);
+      ClusterGroupByNoVFilterDataSet dataSet = new ClusterGroupByNoVFilterDataSet(queryContext,
+          groupByPlan, testMetaMember);
 
-    Object[][] answers = new Object[][] {
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-    };
-    for (Object[] answer : answers) {
-      checkDoubleDataset(dataSet, answer);
+      Object[][] answers = new Object[][] {
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+      };
+      for (Object[] answer : answers) {
+        checkDoubleDataset(dataSet, answer);
+      }
+      assertFalse(dataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(queryContext.getQueryId());
     }
-    assertFalse(dataSet.hasNext());
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSetTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSetTest.java
index 4f02b60..5b2860f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSetTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSetTest.java
@@ -50,45 +50,49 @@ public class ClusterGroupByVFilterDataSetTest extends BaseQueryTest {
       throws IOException, StorageEngineException, QueryProcessException, IllegalPathException {
     QueryContext queryContext =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    GroupByTimePlan groupByPlan = new GroupByTimePlan();
-    List<PartialPath> pathList = new ArrayList<>();
-    List<TSDataType> dataTypes = new ArrayList<>();
-    List<String> aggregations = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
-      dataTypes.add(TSDataType.DOUBLE);
-      aggregations.add(SQLConstant.COUNT);
-    }
-    groupByPlan.setPaths(pathList);
-    groupByPlan.setDeduplicatedPaths(pathList);
-    groupByPlan.setDataTypes(dataTypes);
-    groupByPlan.setDeduplicatedDataTypes(dataTypes);
-    groupByPlan.setAggregations(aggregations);
-    groupByPlan.setDeduplicatedAggregations(aggregations);
+    try {
+      GroupByTimePlan groupByPlan = new GroupByTimePlan();
+      List<PartialPath> pathList = new ArrayList<>();
+      List<TSDataType> dataTypes = new ArrayList<>();
+      List<String> aggregations = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
+        dataTypes.add(TSDataType.DOUBLE);
+        aggregations.add(SQLConstant.COUNT);
+      }
+      groupByPlan.setPaths(pathList);
+      groupByPlan.setDeduplicatedPaths(pathList);
+      groupByPlan.setDataTypes(dataTypes);
+      groupByPlan.setDeduplicatedDataTypes(dataTypes);
+      groupByPlan.setAggregations(aggregations);
+      groupByPlan.setDeduplicatedAggregations(aggregations);
 
-    groupByPlan.setStartTime(0);
-    groupByPlan.setEndTime(20);
-    groupByPlan.setSlidingStep(5);
-    groupByPlan.setInterval(5);
+      groupByPlan.setStartTime(0);
+      groupByPlan.setEndTime(20);
+      groupByPlan.setSlidingStep(5);
+      groupByPlan.setInterval(5);
 
-    IExpression expression = BinaryExpression.and(
-        new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0)),
-        new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(5, 0)), TimeFilter.ltEq(15))
-    );
-    groupByPlan.setExpression(expression);
+      IExpression expression = BinaryExpression.and(
+          new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0)),
+          new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(5, 0)), TimeFilter.ltEq(15))
+      );
+      groupByPlan.setExpression(expression);
 
-    ClusterGroupByVFilterDataSet dataSet = new ClusterGroupByVFilterDataSet(queryContext,
-        groupByPlan, testMetaMember);
+      ClusterGroupByVFilterDataSet dataSet = new ClusterGroupByVFilterDataSet(queryContext,
+          groupByPlan, testMetaMember);
 
-    Object[][] answers = new Object[][] {
-        new Object[] {0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
-        new Object[] {1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0},
-    };
-    for (Object[] answer : answers) {
-      checkDoubleDataset(dataSet, answer);
+      Object[][] answers = new Object[][] {
+          new Object[] {0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0},
+          new Object[] {1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0},
+      };
+      for (Object[] answer : answers) {
+        checkDoubleDataset(dataSet, answer);
+      }
+      assertFalse(dataSet.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(queryContext.getQueryId());
     }
-    assertFalse(dataSet.hasNext());
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
index b0dd62b..ad9f952 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.query.BaseQueryTest;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -41,58 +42,68 @@ import org.junit.Test;
 public class MergeGroupByExecutorTest extends BaseQueryTest {
 
   @Test
-  public void testNoTimeFilter() throws QueryProcessException, IOException, IllegalPathException {
+  public void testNoTimeFilter()
+      throws QueryProcessException, IOException, IllegalPathException, StorageEngineException {
     PartialPath path = new PartialPath(TestUtils.getTestSeries(0, 0));
     TSDataType dataType = TSDataType.DOUBLE;
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    Filter timeFilter = null;
+    try {
+      Filter timeFilter = null;
 
-    MergeGroupByExecutor groupByExecutor = new MergeGroupByExecutor(path,
-        Collections.singleton(path.getMeasurement()), dataType, context,
-        timeFilter, testMetaMember, true);
-    AggregationType[] types = AggregationType.values();
-    for (AggregationType type : types) {
-      groupByExecutor.addAggregateResult(AggregateResultFactory.getAggrResultByType(type,
-          TSDataType.DOUBLE, true));
-    }
+      MergeGroupByExecutor groupByExecutor = new MergeGroupByExecutor(path,
+          Collections.singleton(path.getMeasurement()), dataType, context,
+          timeFilter, testMetaMember, true);
+      AggregationType[] types = AggregationType.values();
+      for (AggregationType type : types) {
+        groupByExecutor.addAggregateResult(AggregateResultFactory.getAggrResultByType(type,
+            TSDataType.DOUBLE, true));
+      }
 
-    Object[] answers;
-    List<AggregateResult> aggregateResults;
-    answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
-    aggregateResults = groupByExecutor.calcResult(0, 5);
-    checkAggregations(aggregateResults, answers);
+      Object[] answers;
+      List<AggregateResult> aggregateResults;
+      answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
+      aggregateResults = groupByExecutor.calcResult(0, 5);
+      checkAggregations(aggregateResults, answers);
 
-    answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
-    aggregateResults = groupByExecutor.calcResult(5, 10);
-    checkAggregations(aggregateResults, answers);
+      answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+      aggregateResults = groupByExecutor.calcResult(5, 10);
+      checkAggregations(aggregateResults, answers);
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
   }
 
   @Test
-  public void testTimeFilter() throws QueryProcessException, IOException, IllegalPathException {
+  public void testTimeFilter()
+      throws QueryProcessException, IOException, IllegalPathException, StorageEngineException {
     PartialPath path = new PartialPath(TestUtils.getTestSeries(0, 0));
     TSDataType dataType = TSDataType.DOUBLE;
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    Filter timeFilter = TimeFilter.gtEq(3);
+    try {
+      Filter timeFilter = TimeFilter.gtEq(3);
 
-    MergeGroupByExecutor groupByExecutor = new MergeGroupByExecutor(path,
-        Collections.singleton(path.getMeasurement()), dataType, context,
-        timeFilter, testMetaMember, true);
-    AggregationType[] types = AggregationType.values();
-    for (AggregationType type : types) {
-      groupByExecutor.addAggregateResult(AggregateResultFactory.getAggrResultByType(type,
-          TSDataType.DOUBLE, true));
-    }
+      MergeGroupByExecutor groupByExecutor = new MergeGroupByExecutor(path,
+          Collections.singleton(path.getMeasurement()), dataType, context,
+          timeFilter, testMetaMember, true);
+      AggregationType[] types = AggregationType.values();
+      for (AggregationType type : types) {
+        groupByExecutor.addAggregateResult(AggregateResultFactory.getAggrResultByType(type,
+            TSDataType.DOUBLE, true));
+      }
 
-    Object[] answers;
-    List<AggregateResult> aggregateResults;
-    answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
-    aggregateResults = groupByExecutor.calcResult(0, 5);
-    checkAggregations(aggregateResults, answers);
+      Object[] answers;
+      List<AggregateResult> aggregateResults;
+      answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
+      aggregateResults = groupByExecutor.calcResult(0, 5);
+      checkAggregations(aggregateResults, answers);
 
-    answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
-    aggregateResults = groupByExecutor.calcResult(5, 10);
-    checkAggregations(aggregateResults, answers);
+      answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+      aggregateResults = groupByExecutor.calcResult(5, 10);
+      checkAggregations(aggregateResults, answers);
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
index 3cd7502..78738f1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
@@ -53,49 +53,53 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
     TSDataType dataType = TSDataType.DOUBLE;
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    Filter timeFilter = null;
-    List<Integer> aggregationTypes = new ArrayList<>();
-    for (int i = 0; i < AggregationType.values().length; i++) {
-      aggregationTypes.add(i);
-    }
-
-    ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
-    List<GroupByExecutor> groupByExecutors = readerFactory
-        .getGroupByExecutors(path, Collections.singleton(path.getMeasurement()), dataType,
-            context, timeFilter,
-            aggregationTypes, true);
+    try {
+      Filter timeFilter = null;
+      List<Integer> aggregationTypes = new ArrayList<>();
+      for (int i = 0; i < AggregationType.values().length; i++) {
+        aggregationTypes.add(i);
+      }
 
-    for (int i = 0; i < groupByExecutors.size(); i++) {
-      GroupByExecutor groupByExecutor = groupByExecutors.get(i);
-      Object[] answers;
-      if (i == 1) {
-        // a series is only managed by one group
-        List<AggregateResult> aggregateResults;
-        answers = new Object[]{5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
-        aggregateResults = groupByExecutor.calcResult(0, 5);
-        checkAggregations(aggregateResults, answers);
+      ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
+      List<GroupByExecutor> groupByExecutors = readerFactory
+          .getGroupByExecutors(path, Collections.singleton(path.getMeasurement()), dataType,
+              context, timeFilter,
+              aggregationTypes, true);
 
-        answers = new Object[]{5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
-        aggregateResults = groupByExecutor.calcResult(5, 10);
-        checkAggregations(aggregateResults, answers);
-      } else {
-        List<AggregateResult> aggregateResults;
-        answers = new Object[]{0.0, null, 0.0, null, null, null, null, null, null};
-        aggregateResults = groupByExecutor.calcResult(0, 5);
-        if (!(groupByExecutor instanceof EmptyReader)) {
+      for (int i = 0; i < groupByExecutors.size(); i++) {
+        GroupByExecutor groupByExecutor = groupByExecutors.get(i);
+        Object[] answers;
+        if (i == 1) {
+          // a series is only managed by one group
+          List<AggregateResult> aggregateResults;
+          answers = new Object[]{5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
+          aggregateResults = groupByExecutor.calcResult(0, 5);
           checkAggregations(aggregateResults, answers);
-        } else {
-          assertTrue(aggregateResults.isEmpty());
-        }
 
-        answers = new Object[]{0.0, null, 0.0, null, null, null, null, null, null};
-        aggregateResults = groupByExecutor.calcResult(5, 10);
-        if (!(groupByExecutor instanceof EmptyReader)) {
+          answers = new Object[]{5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+          aggregateResults = groupByExecutor.calcResult(5, 10);
           checkAggregations(aggregateResults, answers);
         } else {
-          assertTrue(aggregateResults.isEmpty());
+          List<AggregateResult> aggregateResults;
+          answers = new Object[]{0.0, null, 0.0, null, null, null, null, null, null};
+          aggregateResults = groupByExecutor.calcResult(0, 5);
+          if (!(groupByExecutor instanceof EmptyReader)) {
+            checkAggregations(aggregateResults, answers);
+          } else {
+            assertTrue(aggregateResults.isEmpty());
+          }
+
+          answers = new Object[]{0.0, null, 0.0, null, null, null, null, null, null};
+          aggregateResults = groupByExecutor.calcResult(5, 10);
+          if (!(groupByExecutor instanceof EmptyReader)) {
+            checkAggregations(aggregateResults, answers);
+          } else {
+            assertTrue(aggregateResults.isEmpty());
+          }
         }
       }
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
 
@@ -106,48 +110,52 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
     TSDataType dataType = TSDataType.DOUBLE;
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    Filter timeFilter = TimeFilter.gtEq(3);
-    List<Integer> aggregationTypes = new ArrayList<>();
-    for (int i = 0; i < AggregationType.values().length; i++) {
-      aggregationTypes.add(i);
-    }
-
-    ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
-    List<GroupByExecutor> groupByExecutors = readerFactory
-        .getGroupByExecutors(path, Collections.singleton(path.getMeasurement()), dataType, context
-            , timeFilter, aggregationTypes, true);
+    try {
+      Filter timeFilter = TimeFilter.gtEq(3);
+      List<Integer> aggregationTypes = new ArrayList<>();
+      for (int i = 0; i < AggregationType.values().length; i++) {
+        aggregationTypes.add(i);
+      }
 
-    for (int i = 0; i < groupByExecutors.size(); i++) {
-      GroupByExecutor groupByExecutor = groupByExecutors.get(i);
-      Object[] answers;
-      if (i == 1) {
-        // a series is only managed by one group
-        List<AggregateResult> aggregateResults;
-        answers = new Object[]{2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
-        aggregateResults = groupByExecutor.calcResult(0, 5);
-        checkAggregations(aggregateResults, answers);
+      ClusterReaderFactory readerFactory = new ClusterReaderFactory(testMetaMember);
+      List<GroupByExecutor> groupByExecutors = readerFactory
+          .getGroupByExecutors(path, Collections.singleton(path.getMeasurement()), dataType, context
+              , timeFilter, aggregationTypes, true);
 
-        answers = new Object[]{5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
-        aggregateResults = groupByExecutor.calcResult(5, 10);
-        checkAggregations(aggregateResults, answers);
-      } else {
-        List<AggregateResult> aggregateResults;
-        answers = new Object[]{0.0, null, 0.0, null, null, null, null, null, null};
-        aggregateResults = groupByExecutor.calcResult(0, 5);
-        if (!(groupByExecutor instanceof EmptyReader)) {
+      for (int i = 0; i < groupByExecutors.size(); i++) {
+        GroupByExecutor groupByExecutor = groupByExecutors.get(i);
+        Object[] answers;
+        if (i == 1) {
+          // a series is only managed by one group
+          List<AggregateResult> aggregateResults;
+          answers = new Object[]{2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
+          aggregateResults = groupByExecutor.calcResult(0, 5);
           checkAggregations(aggregateResults, answers);
-        } else {
-          assertTrue(aggregateResults.isEmpty());
-        }
 
-        answers = new Object[]{0.0, null, 0.0, null, null, null, null, null, null};
-        aggregateResults = groupByExecutor.calcResult(5, 10);
-        if (!(groupByExecutor instanceof EmptyReader)) {
+          answers = new Object[]{5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+          aggregateResults = groupByExecutor.calcResult(5, 10);
           checkAggregations(aggregateResults, answers);
         } else {
-          assertTrue(aggregateResults.isEmpty());
+          List<AggregateResult> aggregateResults;
+          answers = new Object[]{0.0, null, 0.0, null, null, null, null, null, null};
+          aggregateResults = groupByExecutor.calcResult(0, 5);
+          if (!(groupByExecutor instanceof EmptyReader)) {
+            checkAggregations(aggregateResults, answers);
+          } else {
+            assertTrue(aggregateResults.isEmpty());
+          }
+
+          answers = new Object[]{0.0, null, 0.0, null, null, null, null, null, null};
+          aggregateResults = groupByExecutor.calcResult(5, 10);
+          if (!(groupByExecutor instanceof EmptyReader)) {
+            checkAggregations(aggregateResults, answers);
+          } else {
+            assertTrue(aggregateResults.isEmpty());
+          }
         }
       }
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java
index 3d3521d..cc1b92f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGeneratorTest.java
@@ -48,23 +48,27 @@ public class ClusterTimeGeneratorTest extends BaseQueryTest {
     RawDataQueryPlan dataQueryPlan = new RawDataQueryPlan();
     QueryContext context =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    IExpression expression =
-        BinaryExpression.and(
-            new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(0, 0)),
-                ValueFilter.gtEq(3.0)),
-            new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(1, 1)),
-                ValueFilter.ltEq(8.0)));
-    dataQueryPlan.setExpression(expression);
-    dataQueryPlan.addDeduplicatedPaths(new PartialPath(TestUtils.getTestSeries(0, 0)));
-    dataQueryPlan.addDeduplicatedPaths(new PartialPath(TestUtils.getTestSeries(1, 1)));
+    try {
+      IExpression expression =
+          BinaryExpression.and(
+              new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(0, 0)),
+                  ValueFilter.gtEq(3.0)),
+              new SingleSeriesExpression(new PartialPath(TestUtils.getTestSeries(1, 1)),
+                  ValueFilter.ltEq(8.0)));
+      dataQueryPlan.setExpression(expression);
+      dataQueryPlan.addDeduplicatedPaths(new PartialPath(TestUtils.getTestSeries(0, 0)));
+      dataQueryPlan.addDeduplicatedPaths(new PartialPath(TestUtils.getTestSeries(1, 1)));
 
-    ClusterTimeGenerator timeGenerator = new ClusterTimeGenerator(expression, context,
-        testMetaMember, dataQueryPlan);
-    for (int i = 3; i <= 8; i++) {
-      assertTrue(timeGenerator.hasNext());
-      assertEquals(i, timeGenerator.next());
+      ClusterTimeGenerator timeGenerator = new ClusterTimeGenerator(expression, context,
+          testMetaMember, dataQueryPlan);
+      for (int i = 3; i <= 8; i++) {
+        assertTrue(timeGenerator.hasNext());
+        assertEquals(i, timeGenerator.next());
+      }
+      assertFalse(timeGenerator.hasNext());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
-    assertFalse(timeGenerator.hasNext());
   }
 
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
index 45fc256..e6815aa 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -59,7 +61,7 @@ public class DatasourceInfoTest {
   }
 
   @Test
-  public void testFailedAll() {
+  public void testFailedAll() throws StorageEngineException {
     PartitionGroup group = new PartitionGroup();
     group.add(TestUtils.getNode(0));
     group.add(TestUtils.getNode(1));
@@ -68,10 +70,14 @@ public class DatasourceInfoTest {
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
     RemoteQueryContext context = new RemoteQueryContext(1);
 
-    DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
-      request, context, metaGroupMember, group);
-    boolean hasClient = sourceInfo.hasNextDataClient(false, Long.MIN_VALUE);
+    try {
+      DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
+        request, context, metaGroupMember, group);
+      boolean hasClient = sourceInfo.hasNextDataClient(false, Long.MIN_VALUE);
 
-    assertFalse(hasClient);
+      assertFalse(hasClient);
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
+    }
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index d2f7d82..88b4dab 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -38,6 +38,8 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -116,7 +118,7 @@ public class RemoteSeriesReaderByTimestampTest {
   private MetaGroupMember metaGroupMember = new MetaGroupMember();
 
   @Test
-  public void test() throws IOException {
+  public void test() throws IOException, StorageEngineException {
     PartitionGroup group = new PartitionGroup();
     group.add(TestUtils.getNode(0));
     group.add(TestUtils.getNode(1));
@@ -125,20 +127,24 @@ public class RemoteSeriesReaderByTimestampTest {
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
     RemoteQueryContext context = new RemoteQueryContext(1);
 
-    DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
-        request, context, metaGroupMember, group);
-    sourceInfo.hasNextDataClient(true, Long.MIN_VALUE);
+    try {
+      DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
+          request, context, metaGroupMember, group);
+      sourceInfo.hasNextDataClient(true, Long.MIN_VALUE);
 
-    RemoteSeriesReaderByTimestamp reader = new RemoteSeriesReaderByTimestamp(sourceInfo);
+      RemoteSeriesReaderByTimestamp reader = new RemoteSeriesReaderByTimestamp(sourceInfo);
 
-    for (int i = 0; i < 100; i++) {
-      assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+      for (int i = 0; i < 100; i++) {
+        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+      }
+      assertNull(reader.getValueInTimestamp(101));
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
-    assertNull(reader.getValueInTimestamp(101));
   }
 
   @Test
-  public void testFailedNode() throws IOException {
+  public void testFailedNode() throws IOException, StorageEngineException {
     batchData = TestUtils.genBatchData(TSDataType.DOUBLE, 0, 100);
     PartitionGroup group = new PartitionGroup();
     group.add(TestUtils.getNode(0));
@@ -148,55 +154,59 @@ public class RemoteSeriesReaderByTimestampTest {
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
     RemoteQueryContext context = new RemoteQueryContext(1);
 
-    DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
-        request, context, metaGroupMember, group);
-    long startTime = System.currentTimeMillis();
-    sourceInfo.hasNextDataClient(true, Long.MIN_VALUE);
-    RemoteSeriesReaderByTimestamp reader = new RemoteSeriesReaderByTimestamp(sourceInfo);
-
-    long endTime = System.currentTimeMillis();
-    System.out.println(
-        Thread.currentThread().getStackTrace()[1].getLineNumber() + " begin: " + (endTime
-            - startTime));
-    // normal read
-    assertEquals(TestUtils.getNode(0), sourceInfo.getCurrentNode());
-    for (int i = 0; i < 50; i++) {
-      assertEquals(i * 1.0, reader.getValueInTimestamp(i));
-    }
-
-    endTime = System.currentTimeMillis();
-    System.out.println(
-        Thread.currentThread().getStackTrace()[1].getLineNumber() + " begin: " + (endTime
-            - startTime));
-    failedNodes.add(TestUtils.getNode(0));
-    for (int i = 50; i < 80; i++) {
-      assertEquals(i * 1.0, reader.getValueInTimestamp(i));
-    }
-    assertEquals(TestUtils.getNode(1), sourceInfo.getCurrentNode());
-
-    // a bad client, change to another node again
-    failedNodes.add(TestUtils.getNode(1));
-    for (int i = 80; i < 90; i++) {
-      assertEquals(i * 1.0, reader.getValueInTimestamp(i));
-    }
-    assertEquals(TestUtils.getNode(2), sourceInfo.getCurrentNode());
+    try {
+      DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
+          request, context, metaGroupMember, group);
+      long startTime = System.currentTimeMillis();
+      sourceInfo.hasNextDataClient(true, Long.MIN_VALUE);
+      RemoteSeriesReaderByTimestamp reader = new RemoteSeriesReaderByTimestamp(sourceInfo);
+
+      long endTime = System.currentTimeMillis();
+      System.out.println(
+          Thread.currentThread().getStackTrace()[1].getLineNumber() + " begin: " + (endTime
+              - startTime));
+      // normal read
+      assertEquals(TestUtils.getNode(0), sourceInfo.getCurrentNode());
+      for (int i = 0; i < 50; i++) {
+        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+      }
 
-    endTime = System.currentTimeMillis();
-    System.out.println(
-        Thread.currentThread().getStackTrace()[1].getLineNumber() + " begin: " + (endTime
-            - startTime));
-    // all node failed
-    failedNodes.add(TestUtils.getNode(2));
+      endTime = System.currentTimeMillis();
+      System.out.println(
+          Thread.currentThread().getStackTrace()[1].getLineNumber() + " begin: " + (endTime
+              - startTime));
+      failedNodes.add(TestUtils.getNode(0));
+      for (int i = 50; i < 80; i++) {
+        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+      }
+      assertEquals(TestUtils.getNode(1), sourceInfo.getCurrentNode());
 
-    try {
-      reader.getValueInTimestamp(90);
-      fail();
-    } catch (IOException e) {
-      assertEquals("no available client.", e.getMessage());
+      // a bad client, change to another node again
+      failedNodes.add(TestUtils.getNode(1));
+      for (int i = 80; i < 90; i++) {
+        assertEquals(i * 1.0, reader.getValueInTimestamp(i));
+      }
+      assertEquals(TestUtils.getNode(2), sourceInfo.getCurrentNode());
+
+      endTime = System.currentTimeMillis();
+      System.out.println(
+          Thread.currentThread().getStackTrace()[1].getLineNumber() + " begin: " + (endTime
+              - startTime));
+      // all node failed
+      failedNodes.add(TestUtils.getNode(2));
+
+      try {
+        reader.getValueInTimestamp(90);
+        fail();
+      } catch (IOException e) {
+        assertEquals("no available client.", e.getMessage());
+      }
+      endTime = System.currentTimeMillis();
+      System.out.println(
+          Thread.currentThread().getStackTrace()[1].getLineNumber() + " begin: " + (endTime
+              - startTime));
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
-    endTime = System.currentTimeMillis();
-    System.out.println(
-        Thread.currentThread().getStackTrace()[1].getLineNumber() + " begin: " + (endTime
-            - startTime));
   }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
index 550feee..76193b9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
@@ -40,6 +40,8 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -114,7 +116,7 @@ public class RemoteSimpleSeriesReaderTest {
   }
 
   @Test
-  public void testSingle() throws IOException {
+  public void testSingle() throws IOException, StorageEngineException {
     PartitionGroup group = new PartitionGroup();
     group.add(TestUtils.getNode(0));
     group.add(TestUtils.getNode(1));
@@ -123,25 +125,29 @@ public class RemoteSimpleSeriesReaderTest {
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
     RemoteQueryContext context = new RemoteQueryContext(1);
 
-    DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
-        request, context, metaGroupMember, group);
-    sourceInfo.hasNextDataClient(false, Long.MIN_VALUE);
-
-    reader = new RemoteSimpleSeriesReader(sourceInfo);
-
-    for (int i = 0; i < 100; i++) {
-      assertTrue(reader.hasNextTimeValuePair());
-      TimeValuePair curr = reader.currentTimeValuePair();
-      TimeValuePair pair = reader.nextTimeValuePair();
-      assertEquals(pair, curr);
-      assertEquals(i, pair.getTimestamp());
-      assertEquals(i * 1.0, pair.getValue().getDouble(), 0.00001);
+    try {
+      DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
+          request, context, metaGroupMember, group);
+      sourceInfo.hasNextDataClient(false, Long.MIN_VALUE);
+
+      reader = new RemoteSimpleSeriesReader(sourceInfo);
+
+      for (int i = 0; i < 100; i++) {
+        assertTrue(reader.hasNextTimeValuePair());
+        TimeValuePair curr = reader.currentTimeValuePair();
+        TimeValuePair pair = reader.nextTimeValuePair();
+        assertEquals(pair, curr);
+        assertEquals(i, pair.getTimestamp());
+        assertEquals(i * 1.0, pair.getValue().getDouble(), 0.00001);
+      }
+      assertFalse(reader.hasNextTimeValuePair());
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
-    assertFalse(reader.hasNextTimeValuePair());
   }
 
   @Test
-  public void testFailedNode() throws IOException {
+  public void testFailedNode() throws IOException, StorageEngineException {
     System.out.println("Start testFailedNode()");
 
     batchData = TestUtils.genBatchData(TSDataType.DOUBLE, 0, 100);
@@ -153,54 +159,58 @@ public class RemoteSimpleSeriesReaderTest {
     SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
     RemoteQueryContext context = new RemoteQueryContext(1);
 
-    DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
-        request, context, metaGroupMember, group);
-    sourceInfo.hasNextDataClient(false, Long.MIN_VALUE);
-    reader = new RemoteSimpleSeriesReader(sourceInfo);
-
-    // normal read
-    Assert.assertEquals(TestUtils.getNode(0), sourceInfo.getCurrentNode());
-    for (int i = 0; i < 50; i++) {
-      assertTrue(reader.hasNextTimeValuePair());
-      TimeValuePair curr = reader.currentTimeValuePair();
-      TimeValuePair pair = reader.nextTimeValuePair();
-      assertEquals(pair, curr);
-      assertEquals(i, pair.getTimestamp());
-      assertEquals(i * 1.0, pair.getValue().getDouble(), 0.00001);
-    }
-
-    this.batchUsed = false;
-    this.batchData = TestUtils.genBatchData(TSDataType.DOUBLE, 0, 100);
-    // a bad client, change to another node
-    failedNodes.add(TestUtils.getNode(0));
-    reader.clearCurDataForTest();
-    for (int i = 50; i < 80; i++) {
-      TimeValuePair pair = reader.nextTimeValuePair();
-      assertEquals(i - 50, pair.getTimestamp());
-      assertEquals((i - 50) * 1.0, pair.getValue().getDouble(), 0.00001);
-    }
-    Assert.assertEquals(TestUtils.getNode(1), sourceInfo.getCurrentNode());
-
-    this.batchUsed = false;
-    this.batchData = TestUtils.genBatchData(TSDataType.DOUBLE, 0, 100);
-    // a bad client, change to another node again
-    failedNodes.add(TestUtils.getNode(1));
-    reader.clearCurDataForTest();
-    for (int i = 80; i < 90; i++) {
-      TimeValuePair pair = reader.nextTimeValuePair();
-      assertEquals(i - 80, pair.getTimestamp());
-      assertEquals((i - 80) * 1.0, pair.getValue().getDouble(), 0.00001);
-    }
-    assertEquals(TestUtils.getNode(2), sourceInfo.getCurrentNode());
-
-    // all node failed
-    failedNodes.add(TestUtils.getNode(2));
-    reader.clearCurDataForTest();
     try {
-      reader.nextTimeValuePair();
-      fail();
-    } catch (IOException e) {
-      assertEquals(e.getMessage(), "no available client.");
+      DataSourceInfo sourceInfo = new DataSourceInfo(group, TSDataType.DOUBLE,
+          request, context, metaGroupMember, group);
+      sourceInfo.hasNextDataClient(false, Long.MIN_VALUE);
+      reader = new RemoteSimpleSeriesReader(sourceInfo);
+
+      // normal read
+      Assert.assertEquals(TestUtils.getNode(0), sourceInfo.getCurrentNode());
+      for (int i = 0; i < 50; i++) {
+        assertTrue(reader.hasNextTimeValuePair());
+        TimeValuePair curr = reader.currentTimeValuePair();
+        TimeValuePair pair = reader.nextTimeValuePair();
+        assertEquals(pair, curr);
+        assertEquals(i, pair.getTimestamp());
+        assertEquals(i * 1.0, pair.getValue().getDouble(), 0.00001);
+      }
+
+      this.batchUsed = false;
+      this.batchData = TestUtils.genBatchData(TSDataType.DOUBLE, 0, 100);
+      // a bad client, change to another node
+      failedNodes.add(TestUtils.getNode(0));
+      reader.clearCurDataForTest();
+      for (int i = 50; i < 80; i++) {
+        TimeValuePair pair = reader.nextTimeValuePair();
+        assertEquals(i - 50, pair.getTimestamp());
+        assertEquals((i - 50) * 1.0, pair.getValue().getDouble(), 0.00001);
+      }
+      Assert.assertEquals(TestUtils.getNode(1), sourceInfo.getCurrentNode());
+
+      this.batchUsed = false;
+      this.batchData = TestUtils.genBatchData(TSDataType.DOUBLE, 0, 100);
+      // a bad client, change to another node again
+      failedNodes.add(TestUtils.getNode(1));
+      reader.clearCurDataForTest();
+      for (int i = 80; i < 90; i++) {
+        TimeValuePair pair = reader.nextTimeValuePair();
+        assertEquals(i - 80, pair.getTimestamp());
+        assertEquals((i - 80) * 1.0, pair.getValue().getDouble(), 0.00001);
+      }
+      assertEquals(TestUtils.getNode(2), sourceInfo.getCurrentNode());
+
+      // all node failed
+      failedNodes.add(TestUtils.getNode(2));
+      reader.clearCurDataForTest();
+      try {
+        reader.nextTimeValuePair();
+        fail();
+      } catch (IOException e) {
+        assertEquals(e.getMessage(), "no available client.");
+      }
+    } finally {
+      QueryResourceManager.getInstance().endQuery(context.getQueryId());
     }
   }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
index 8ea899b..be530af 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestException;
 import org.apache.iotdb.cluster.common.TestLog;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 899b0f4..cc71df0 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -27,7 +27,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestException;
 import org.apache.iotdb.cluster.common.TestLog;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandlerTest.java
index fbd5054..92f4f9b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandlerTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestException;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestUtils;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
index ddd536a..6345906 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandlerTest.java
@@ -24,7 +24,7 @@ import static junit.framework.TestCase.assertFalse;
 import static junit.framework.TestCase.assertTrue;
 
 import java.io.IOException;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestException;
 import org.apache.iotdb.cluster.common.TestLogManager;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandlerTest.java
index fde5756..e35bae6 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandlerTest.java
@@ -25,7 +25,7 @@ import static junit.framework.TestCase.assertTrue;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestException;
 import org.apache.iotdb.cluster.common.TestLog;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java
index 23c1617..ea2af1e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java
@@ -27,7 +27,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestAsyncClient;
 import org.apache.iotdb.cluster.common.TestLogManager;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index f3cab45..6cc5418 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -971,70 +971,74 @@ public class DataGroupMemberTest extends MemberTest {
     request.setTimeFilterBytes(SerializeUtils.serializeFilter(timeFilter));
     QueryContext queryContext =
         new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-    request.setQueryId(queryContext.getQueryId());
-    request.setRequestor(TestUtils.getNode(0));
-    request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
-    request.setDeviceMeasurements(Collections.singleton(TestUtils.getTestMeasurement(0)));
-    request.setAscending(true);
-
-    DataGroupMember dataGroupMember;
-    AtomicReference<Long> resultRef;
-    GenericHandler<Long> handler;
-    Long executorId;
-    AtomicReference<List<ByteBuffer>> aggrResultRef;
-    GenericHandler<List<ByteBuffer>> aggrResultHandler;
-    List<ByteBuffer> byteBuffers;
-    List<AggregateResult> aggregateResults;
-    Object[] answers;
-    // get an executor from a node holding this timeseries
-    request.setHeader(TestUtils.getNode(10));
-    dataGroupMember = getDataGroupMember(TestUtils.getNode(10));
     try {
-      resultRef = new AtomicReference<>();
-      handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
-      new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
-      executorId = resultRef.get();
-      assertEquals(1L, (long) executorId);
-
-      // fetch result
-      aggrResultRef = new AtomicReference<>();
-      aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
-      new DataAsyncService(dataGroupMember)
-          .getGroupByResult(TestUtils.getNode(10), executorId, 0, 20, aggrResultHandler);
-
-      byteBuffers = aggrResultRef.get();
-      assertNotNull(byteBuffers);
-      aggregateResults = new ArrayList<>();
-      for (ByteBuffer byteBuffer : byteBuffers) {
-        aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer));
+      request.setQueryId(queryContext.getQueryId());
+      request.setRequestor(TestUtils.getNode(0));
+      request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
+      request.setDeviceMeasurements(Collections.singleton(TestUtils.getTestMeasurement(0)));
+      request.setAscending(true);
+
+      DataGroupMember dataGroupMember;
+      AtomicReference<Long> resultRef;
+      GenericHandler<Long> handler;
+      Long executorId;
+      AtomicReference<List<ByteBuffer>> aggrResultRef;
+      GenericHandler<List<ByteBuffer>> aggrResultHandler;
+      List<ByteBuffer> byteBuffers;
+      List<AggregateResult> aggregateResults;
+      Object[] answers;
+      // get an executor from a node holding this timeseries
+      request.setHeader(TestUtils.getNode(10));
+      dataGroupMember = getDataGroupMember(TestUtils.getNode(10));
+      try {
+        resultRef = new AtomicReference<>();
+        handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
+        new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
+        executorId = resultRef.get();
+        assertEquals(1L, (long) executorId);
+
+        // fetch result
+        aggrResultRef = new AtomicReference<>();
+        aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
+        new DataAsyncService(dataGroupMember)
+            .getGroupByResult(TestUtils.getNode(10), executorId, 0, 20, aggrResultHandler);
+
+        byteBuffers = aggrResultRef.get();
+        assertNotNull(byteBuffers);
+        aggregateResults = new ArrayList<>();
+        for (ByteBuffer byteBuffer : byteBuffers) {
+          aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer));
+        }
+        answers = new Object[]{15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0};
+        checkAggregates(answers, aggregateResults);
+      } finally {
+        dataGroupMember.closeLogManager();
       }
-      answers = new Object[]{15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0};
-      checkAggregates(answers, aggregateResults);
-    } finally {
-      dataGroupMember.closeLogManager();
-    }
 
-    // get an executor from a node not holding this timeseries
-    request.setHeader(TestUtils.getNode(30));
-    dataGroupMember = getDataGroupMember(TestUtils.getNode(30));
-    try {
-      resultRef = new AtomicReference<>();
-      handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
-      request.timeFilterBytes.position(0);
-      new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
-      executorId = resultRef.get();
-      assertEquals(-1L, (long) executorId);
-
-      // fetch result
-      aggrResultRef = new AtomicReference<>();
-      aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
-      new DataAsyncService(dataGroupMember)
-          .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler);
-
-      byteBuffers = aggrResultRef.get();
-      assertNull(byteBuffers);
+      // get an executor from a node not holding this timeseries
+      request.setHeader(TestUtils.getNode(30));
+      dataGroupMember = getDataGroupMember(TestUtils.getNode(30));
+      try {
+        resultRef = new AtomicReference<>();
+        handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
+        request.timeFilterBytes.position(0);
+        new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
+        executorId = resultRef.get();
+        assertEquals(-1L, (long) executorId);
+
+        // fetch result
+        aggrResultRef = new AtomicReference<>();
+        aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
+        new DataAsyncService(dataGroupMember)
+            .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler);
+
+        byteBuffers = aggrResultRef.get();
+        assertNull(byteBuffers);
+      } finally {
+        dataGroupMember.closeLogManager();
+      }
     } finally {
-      dataGroupMember.closeLogManager();
+      QueryResourceManager.getInstance().endQuery(queryContext.getQueryId());
     }
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 280d272..04903e3 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -30,7 +30,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.common.EnvironmentUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.common.TestAsyncDataClient;
 import org.apache.iotdb.cluster.common.TestAsyncMetaClient;
 import org.apache.iotdb.cluster.common.TestDataGroupMember;
@@ -69,9 +71,11 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MemberTest {
-
+  private static final Logger logger = LoggerFactory.getLogger(MemberTest.class);
   public static AtomicLong dummyResponse = new AtomicLong(Response.RESPONSE_AGREE);
 
   Map<Node, DataGroupMember> dataGroupMemberMap;
@@ -89,6 +93,7 @@ public class MemberTest {
   private boolean prevUseAsyncServer;
   private int preLogBufferSize;
   private boolean prevUseAsyncApplier;
+  private boolean prevEnableWAL;
 
   @Before
   public void setUp() throws Exception {
@@ -100,6 +105,8 @@ public class MemberTest {
     ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(4096);
     testThreadPool = Executors.newFixedThreadPool(4);
     prevLeaderWait = RaftMember.getWaitLeaderTimeMs();
+    prevEnableWAL = IoTDBDescriptor.getInstance().getConfig().isEnableWal();
+    IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
     RaftMember.setWaitLeaderTimeMs(10);
 
     allNodes = new PartitionGroup();
@@ -149,6 +156,7 @@ public class MemberTest {
 
     testMetaMember.setPartitionTable(partitionTable);
     MetaPuller.getInstance().init(testMetaMember);
+    logger.debug("Member test set up");
   }
 
   @After
@@ -165,6 +173,7 @@ public class MemberTest {
       member.closeLogManager();
     }
     metaGroupMemberMap.clear();
+    RegisterManager.setDeregisterTimeOut(100);
     EnvironmentUtils.cleanEnv();
     ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(prevUrls);
     new File(MetaGroupMember.PARTITION_FILE_NAME).delete();
@@ -174,6 +183,7 @@ public class MemberTest {
     ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(prevUseAsyncServer);
     ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(preLogBufferSize);
     ClusterDescriptor.getInstance().getConfig().setUseAsyncApplier(prevUseAsyncApplier);
+    IoTDBDescriptor.getInstance().getConfig().setEnableWal(prevEnableWAL);
   }
 
   DataGroupMember getDataGroupMember(Node node) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
index b781305..942ed7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryFileManager.java
@@ -26,6 +26,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>
@@ -95,6 +97,7 @@ public class QueryFileManager {
     if (tsFiles != null) {
       for (TsFileResource tsFile : sealedFilePathsMap.get(queryId)) {
         FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
+        if (logger.isDebugEnabled()) { logger.debug("{} is read-unlocked by {}", tsFile, queryId, new Exception()); }
       }
       sealedFilePathsMap.remove(queryId);
     }
@@ -102,6 +105,7 @@ public class QueryFileManager {
     if (tsFiles != null) {
       for (TsFileResource tsFile : unsealedFilePathsMap.get(queryId)) {
         FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
+        if (logger.isDebugEnabled()) { logger.debug("{} is read-unlocked by {}", tsFile, queryId, new Exception()); }
       }
       unsealedFilePathsMap.remove(queryId);
     }
@@ -119,6 +123,9 @@ public class QueryFileManager {
     if (!pathMap.get(queryId).contains(tsFile)) {
       pathMap.get(queryId).add(tsFile);
       FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
+      if (logger.isDebugEnabled()) { logger.debug("{} is read-locked by {}", tsFile, queryId, new Exception()); }
     }
   }
+
+  private static final Logger logger = LoggerFactory.getLogger(QueryFileManager.class);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java b/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
index 7be1939..85d70f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegisterManager.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.iotdb.db.exception.ShutdownException;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +31,7 @@ public class RegisterManager {
 
   private static final Logger logger = LoggerFactory.getLogger(RegisterManager.class);
   private List<IService> iServices;
+  private static volatile long DEREGISTER_TIME_OUT = 10_000L;
 
   public RegisterManager() {
     iServices = new ArrayList<>();
@@ -57,7 +59,7 @@ public class RegisterManager {
     Collections.reverse(iServices);
     for (IService service : iServices) {
       try {
-        service.waitAndStop(10000);
+        service.waitAndStop(DEREGISTER_TIME_OUT);
         logger.info("{} deregistered", service.getID());
       } catch (Exception e) {
         logger.error("Failed to stop {} because:", service.getID().getName(), e);
@@ -74,9 +76,14 @@ public class RegisterManager {
     //we stop JMXServer at last
     Collections.reverse(iServices);
     for (IService service : iServices) {
-      service.shutdown(10000);
+      service.shutdown(DEREGISTER_TIME_OUT);
     }
     iServices.clear();
     logger.info("deregister all service.");
   }
+
+  @TestOnly
+  public static void setDeregisterTimeOut(long deregisterTimeOut) {
+    DEREGISTER_TIME_OUT = deregisterTimeOut;
+  }
 }