You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/02 05:11:41 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-2221] Accelerate query by reducing the number of files in QueryDataSource (#4650)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 75894dc  [To rel/0.12] [IOTDB-2221] Accelerate query by reducing the number of files in QueryDataSource (#4650)
75894dc is described below

commit 75894dc0ce5d7e5198c7274766592e24115078b6
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Sun Jan 2 13:11:00 2022 +0800

    [To rel/0.12] [IOTDB-2221] Accelerate query by reducing the number of files in QueryDataSource (#4650)
---
 .../cluster/query/ClusterUDTFQueryExecutor.java    |   4 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |  10 +-
 .../cluster/server/member/DataGroupMemberTest.java |  18 ++-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  31 +-----
 .../engine/storagegroup/StorageGroupProcessor.java |  20 +++-
 .../db/engine/storagegroup/TsFileResource.java     |  46 ++++++--
 .../org/apache/iotdb/db/metadata/MManager.java     |  20 +++-
 .../db/query/control/QueryResourceManager.java     |  21 +++-
 .../query/dataset/groupby/GroupByFillDataSet.java  |  21 ++--
 .../groupby/GroupByWithValueFilterDataSet.java     |  20 ++--
 .../groupby/GroupByWithoutValueFilterDataSet.java  |  20 ++--
 .../dataset/groupby/LocalGroupByExecutor.java      |   5 +-
 .../db/query/executor/AggregationExecutor.java     |  35 ++++--
 .../iotdb/db/query/executor/FillQueryExecutor.java |  15 ++-
 .../iotdb/db/query/executor/LastQueryExecutor.java |  16 ++-
 .../db/query/executor/RawDataQueryExecutor.java    |  38 +++++--
 .../iotdb/db/query/executor/UDTFQueryExecutor.java |   4 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |  28 +----
 .../query/timegenerator/ServerTimeGenerator.java   |  54 +++++++--
 .../iotdb/db/engine/merge/ConcurrentMergeTest.java |   1 +
 .../engine/modification/DeletionFileNodeTest.java  |  40 ++++---
 .../storagegroup/StorageGroupProcessorTest.java    |   9 ++
 .../iotdb/db/engine/storagegroup/TTLTest.java      |   3 +
 .../IoTDBQueryWithComplexValueFilterIT.java        | 122 +++++++++++++++++++++
 .../read/query/timegenerator/TimeGenerator.java    |   3 +
 .../query/timegenerator/TsFileTimeGenerator.java   |   6 +
 .../tsfile/read/reader/FakedTimeGenerator.java     |   6 +
 27 files changed, 459 insertions(+), 157 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
index 764437e..ad05be8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterUDTFQueryExecutor.java
@@ -67,7 +67,7 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
             new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
     List<IReaderByTimestamp> readersOfSelectedSeries =
-        initSeriesReaderByTimestamp(context, udtfPlan, cached);
+        initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
     return new UDTFAlignByTimeDataSet(
         context,
         udtfPlan,
@@ -98,7 +98,7 @@ public class ClusterUDTFQueryExecutor extends ClusterDataQueryExecutor {
             new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
     List<IReaderByTimestamp> readersOfSelectedSeries =
-        initSeriesReaderByTimestamp(context, udtfPlan, cached);
+        initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
     return new UDTFNonAlignDataSet(
         context,
         udtfPlan,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index a2fc36a..67f2a0b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -60,6 +60,7 @@ import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -773,7 +774,14 @@ public class LocalQueryExecutor {
             aggregationTypeOrdinals,
             queryContext,
             ascending);
-    if (!executor.isEmpty()) {
+    boolean isEmpty;
+    try {
+      isEmpty = executor.isEmpty();
+    } catch (IOException e) {
+      logger.error("Something wrong happened", e);
+      throw new QueryProcessException(e, TSStatusCode.INTERNAL_SERVER_ERROR.ordinal());
+    }
+    if (!isEmpty) {
       long executorId = queryManager.registerGroupByExecutor(executor);
       logger.debug(
           "{}: Build a GroupByExecutor of {} for {}, executorId: {}",
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index ceff919..0bdf5d3 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -1164,18 +1164,16 @@ public class DataGroupMemberTest extends BaseMember {
         request.timeFilterBytes.position(0);
         new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
         executorId = resultRef.get();
-        // TODO: This test is uncompleted because of shared QueryDataSource (IOTDB-2101)
-        // assertEquals(-1L, (long) executorId);
+        assertEquals(-1L, (long) executorId);
 
         // fetch result
-        //        aggrResultRef = new AtomicReference<>();
-        //        aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
-        //        new DataAsyncService(dataGroupMember)
-        //            .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20,
-        // aggrResultHandler);
-        //
-        //        byteBuffers = aggrResultRef.get();
-        //        assertNull(byteBuffers);
+        aggrResultRef = new AtomicReference<>();
+        aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
+        new DataAsyncService(dataGroupMember)
+            .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler);
+
+        byteBuffers = aggrResultRef.get();
+        assertNull(byteBuffers);
       } finally {
         dataGroupMember.closeLogManager();
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 33d48ca..a83e2fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -45,8 +45,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.IoTDB;
@@ -56,7 +54,6 @@ import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -71,7 +68,6 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1041,26 +1037,8 @@ public class StorageEngine implements IService {
   }
 
   /** get all merge lock of the storage group processor related to the query */
-  public List<StorageGroupProcessor> mergeLock(List<PartialPath> pathList)
-      throws StorageEngineException {
-    Set<StorageGroupProcessor> set = new HashSet<>();
-    for (PartialPath path : pathList) {
-      set.add(getProcessor(path.getDevicePath()));
-    }
-    List<StorageGroupProcessor> list =
-        set.stream()
-            .sorted(Comparator.comparing(StorageGroupProcessor::getVirtualStorageGroupId))
-            .collect(Collectors.toList());
-    list.forEach(StorageGroupProcessor::readLock);
-    return list;
-  }
-
-  /**
-   * get all merge lock of the storage group processor related to the query and init QueryDataSource
-   */
-  public List<StorageGroupProcessor> mergeLockAndInitQueryDataSource(
-      List<PartialPath> pathList, QueryContext context, Filter timeFilter)
-      throws StorageEngineException, QueryProcessException {
+  public Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>> mergeLock(
+      List<PartialPath> pathList) throws StorageEngineException, QueryProcessException {
     Map<StorageGroupProcessor, List<PartialPath>> map = new HashMap<>();
     for (PartialPath path : pathList) {
       map.computeIfAbsent(getProcessor(path.getDevicePath()), key -> new ArrayList<>()).add(path);
@@ -1071,10 +1049,7 @@ public class StorageEngine implements IService {
             .collect(Collectors.toList());
     list.forEach(StorageGroupProcessor::readLock);
 
-    // init QueryDataSource
-    QueryResourceManager.getInstance().initQueryDataSource(map, context, timeFilter);
-
-    return list;
+    return new Pair<>(list, map);
   }
 
   /** unlock all merge lock of the storage group processor related to the query */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d0b21fd..42d20ed 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1572,8 +1572,19 @@ public class StorageGroupProcessor {
     }
   }
 
+  /**
+   * build query data source by searching all tsfile which fit in query filter
+   *
+   * @param pathList data paths
+   * @param context query context
+   * @param timeFilter time filter
+   * @param singleDeviceId selected deviceId (not null only when all the selected series are under
+   *     the same device)
+   * @return query data source
+   */
   public QueryDataSource query(
       List<PartialPath> pathList,
+      String singleDeviceId,
       QueryContext context,
       QueryFileManager filePathsManager,
       Filter timeFilter)
@@ -1585,6 +1596,7 @@ public class StorageGroupProcessor {
               tsFileManagement.getTsFileList(true),
               upgradeSeqFileList,
               pathList,
+              singleDeviceId,
               context,
               timeFilter,
               true);
@@ -1593,6 +1605,7 @@ public class StorageGroupProcessor {
               tsFileManagement.getTsFileList(false),
               upgradeUnseqFileList,
               pathList,
+              singleDeviceId,
               context,
               timeFilter,
               false);
@@ -1642,6 +1655,7 @@ public class StorageGroupProcessor {
       Collection<TsFileResource> tsFileResources,
       List<TsFileResource> upgradeTsFileResources,
       List<PartialPath> pathList,
+      String singleDeviceId,
       QueryContext context,
       Filter timeFilter,
       boolean isSeq)
@@ -1664,7 +1678,8 @@ public class StorageGroupProcessor {
 
     // for upgrade files and old files must be closed
     for (TsFileResource tsFileResource : upgradeTsFileResources) {
-      if (!tsFileResource.isSatisfied(timeFilter, isSeq, dataTTL, context.isDebug())) {
+      if (!tsFileResource.isSatisfied(
+          singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
         continue;
       }
       closeQueryLock.readLock().lock();
@@ -1676,7 +1691,8 @@ public class StorageGroupProcessor {
     }
 
     for (TsFileResource tsFileResource : tsFileResources) {
-      if (!tsFileResource.isSatisfied(timeFilter, isSeq, dataTTL, context.isDebug())) {
+      if (!tsFileResource.isSatisfied(
+          singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
         continue;
       }
       closeQueryLock.readLock().lock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 5ba3272..2ce4cbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -608,8 +608,43 @@ public class TsFileResource {
     return timeIndex.stillLives(timeLowerBound);
   }
 
+  public boolean isSatisfied(
+      String deviceId, Filter timeFilter, boolean isSeq, long ttl, boolean debug) {
+    if (deviceId == null) {
+      return isSatisfied(timeFilter, isSeq, ttl, debug);
+    }
+
+    if (!getDevices().contains(deviceId)) {
+      if (debug) {
+        DEBUG_LOGGER.info(
+            "Path: {} file {} is not satisfied because of no device!", deviceId, file);
+      }
+      return false;
+    }
+
+    long startTime = getStartTime(deviceId);
+    long endTime = closed || !isSeq ? getEndTime(deviceId) : Long.MAX_VALUE;
+
+    if (!isAlive(endTime, ttl)) {
+      if (debug) {
+        DEBUG_LOGGER.info("file {} is not satisfied because of ttl!", file);
+      }
+      return false;
+    }
+
+    if (timeFilter != null) {
+      boolean res = timeFilter.satisfyStartEndTime(startTime, endTime);
+      if (debug && !res) {
+        DEBUG_LOGGER.info(
+            "Path: {} file {} is not satisfied because of time filter!", deviceId, fsFactory);
+      }
+      return res;
+    }
+    return true;
+  }
+
   /** @return true if the TsFile lives beyond TTL */
-  public boolean isSatisfied(Filter timeFilter, boolean isSeq, long ttl, boolean debug) {
+  private boolean isSatisfied(Filter timeFilter, boolean isSeq, long ttl, boolean debug) {
     long startTime = getFileStartTime();
     long endTime = closed || !isSeq ? getFileEndTime() : Long.MAX_VALUE;
 
@@ -630,14 +665,9 @@ public class TsFileResource {
     return true;
   }
 
-  /** @return true if the device is contained in the TsFile and it lives beyond TTL */
+  /** @return true if the device is contained in the TsFile */
   public boolean isSatisfied(
-      String deviceId,
-      Filter timeFilter,
-      TsFileFilter fileFilter,
-      boolean isSeq,
-      long ttl,
-      boolean debug) {
+      String deviceId, Filter timeFilter, TsFileFilter fileFilter, boolean isSeq, boolean debug) {
     if (fileFilter != null && fileFilter.fileNotSatisfy(this)) {
       if (debug) {
         DEBUG_LOGGER.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 051ac26..089511d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.*;
 import org.apache.iotdb.db.qp.physical.sys.*;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
 import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
 import org.apache.iotdb.db.rescon.MemTableManager;
@@ -821,13 +822,20 @@ public class MManager {
     if (plan.isOrderByHeat()) {
       List<StorageGroupProcessor> list;
       try {
-        list =
-            StorageEngine.getInstance()
-                .mergeLockAndInitQueryDataSource(
-                    allMatchedNodes.stream().map(MNode::getPartialPath).collect(toList()),
-                    context,
-                    null);
+        Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+            lockListAndProcessorToSeriesMapPair =
+                StorageEngine.getInstance()
+                    .mergeLock(
+                        allMatchedNodes.stream().map(MNode::getPartialPath).collect(toList()));
+        list = lockListAndProcessorToSeriesMapPair.left;
+        Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+            lockListAndProcessorToSeriesMapPair.right;
+
         try {
+          // init QueryDataSource Cache
+          QueryResourceManager.getInstance()
+              .initQueryDataSourceCache(processorToSeriesMap, context, null);
+
           allMatchedNodes =
               allMatchedNodes.stream()
                   .sorted(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 858d7ef..0c2ddc5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -41,6 +41,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to
@@ -110,7 +111,7 @@ public class QueryResourceManager {
    * @param processorToSeriesMap Key: processor of the virtual storage group. Value: selected series
    *     under the virtual storage group
    */
-  public void initQueryDataSource(
+  public void initQueryDataSourceCache(
       Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap,
       QueryContext context,
       Filter timeFilter)
@@ -120,11 +121,21 @@ public class QueryResourceManager {
       StorageGroupProcessor processor = entry.getKey();
       List<PartialPath> pathList = entry.getValue();
 
+      // when all the selected series are under the same device, the QueryDataSource will be
+      // filtered according to timeIndex
+      Set<String> selectedDeviceIdSet =
+          pathList.stream().map(PartialPath::getDevice).collect(Collectors.toSet());
+
       long queryId = context.getQueryId();
       String storageGroupPath = processor.getStorageGroupPath();
 
       QueryDataSource cachedQueryDataSource =
-          processor.query(pathList, context, filePathsManager, timeFilter);
+          processor.query(
+              pathList,
+              selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
+              context,
+              filePathsManager,
+              timeFilter);
       cachedQueryDataSourcesMap
           .computeIfAbsent(queryId, k -> new HashMap<>())
           .put(storageGroupPath, cachedQueryDataSource);
@@ -150,7 +161,11 @@ public class QueryResourceManager {
           StorageEngine.getInstance().getProcessor(selectedPath.getDevicePath());
       cachedQueryDataSource =
           processor.query(
-              Collections.singletonList(selectedPath), context, filePathsManager, timeFilter);
+              Collections.singletonList(selectedPath),
+              selectedPath.getDevice(),
+              context,
+              filePathsManager,
+              timeFilter);
     }
 
     // construct QueryDataSource for selectedPath
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 6640adb..23e6d0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.executor.LastQueryExecutor;
 import org.apache.iotdb.db.query.executor.fill.IFill;
 import org.apache.iotdb.db.query.executor.fill.PreviousFill;
@@ -123,13 +124,19 @@ public class GroupByFillDataSet extends QueryDataSet {
         FilterFactory.and(
             TimeFilter.gtEq(lowerBound), TimeFilter.ltEq(groupByEngineDataSet.getStartTime()));
 
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(
-                paths.stream().map(path -> (PartialPath) path).collect(Collectors.toList()),
-                context,
-                timeFilter);
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair =
+            StorageEngine.getInstance()
+                .mergeLock(
+                    paths.stream().map(path -> (PartialPath) path).collect(Collectors.toList()));
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
       for (int i = 0; i < paths.size(); i++) {
         PreviousFill fill = previousFillExecutors[i];
         firstNotNullTV[i] = fill.getFillResult();
@@ -142,7 +149,7 @@ public class GroupByFillDataSet extends QueryDataSet {
         }
       }
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 4711cbb..339ba09 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -49,6 +49,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
@@ -92,20 +93,25 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
             TimeFilter.gtEq(groupByTimePlan.getStartTime()),
             TimeFilter.lt(groupByTimePlan.getEndTime()));
 
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(
-                paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()),
-                context,
-                timeFilter);
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair =
+            StorageEngine.getInstance()
+                .mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
       for (int i = 0; i < paths.size(); i++) {
         PartialPath path = (PartialPath) paths.get(i);
         allDataReaderList.add(
             getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
       }
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 424915f..5628666 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -89,13 +90,18 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       throw new QueryProcessException("TimeFilter cannot be null in GroupBy query.");
     }
 
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(
-                paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()),
-                context,
-                timeFilter);
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair =
+            StorageEngine.getInstance()
+                .mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
       // init resultIndexes, group result indexes by path
       for (int i = 0; i < paths.size(); i++) {
         PartialPath path = (PartialPath) paths.get(i);
@@ -120,7 +126,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
         pathExecutors.get(path).addAggregateResult(aggrResult);
       }
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
index b8cec3d..f31acd2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -88,9 +88,8 @@ public class LocalGroupByExecutor implements GroupByExecutor {
     this.ascending = ascending;
   }
 
-  public boolean isEmpty() {
-    return queryDataSource.getSeqResources().isEmpty()
-        && queryDataSource.getUnseqResources().isEmpty();
+  public boolean isEmpty() throws IOException {
+    return !reader.hasNextFile();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index d7339a8..6ce3328 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -103,11 +104,18 @@ public class AggregationExecutor {
         groupAggregationsBySeries(selectedSeries);
     AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
     // TODO-Cluster: group the paths by storage group to reduce communications
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(
-                new ArrayList<>(pathToAggrIndexesMap.keySet()), context, timeFilter);
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair =
+            StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
+
       for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
         aggregateOneSeries(
             entry,
@@ -117,7 +125,7 @@ public class AggregationExecutor {
             context);
       }
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
     return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan);
@@ -346,9 +354,20 @@ public class AggregationExecutor {
     Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
         groupAggregationsBySeries(selectedSeries);
     Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance().mergeLockAndInitQueryDataSource(selectedSeries, context, null);
+
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair =
+            StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(
+              processorToSeriesMap, context, timestampGenerator.getTimeFilter());
+
       for (int i = 0; i < selectedSeries.size(); i++) {
         PartialPath path = selectedSeries.get(i);
         List<Integer> indexes = pathToAggrIndexesMap.remove(path);
@@ -359,7 +378,7 @@ public class AggregationExecutor {
         }
       }
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
     List<AggregateResult> aggregateResults = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index e43c094..662267e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import javax.activation.UnsupportedDataTypeException;
 
@@ -78,10 +79,16 @@ public class FillQueryExecutor {
       throws StorageEngineException, QueryProcessException, IOException {
     RowRecord record = new RowRecord(queryTime);
 
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(selectedSeries, context, contructTimeFilter());
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(selectedSeries);
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, context, contructTimeFilter());
       List<TimeValuePair> timeValuePairs = getTimeValuePairs(context);
       long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
       for (int i = 0; i < selectedSeries.size(); i++) {
@@ -128,7 +135,7 @@ public class FillQueryExecutor {
         }
       }
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
     SingleDataSet dataSet = new SingleDataSet(selectedSeries, dataTypes);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index 72b8cf7..f34f9f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -172,10 +172,18 @@ public class LastQueryExecutor {
 
     // Acquire query resources for the rest series paths
     List<LastPointReader> readerList = new ArrayList<>();
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(nonCachedPaths, context, filter);
+
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(nonCachedPaths);
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, context, filter);
+
       for (int i = 0; i < nonCachedPaths.size(); i++) {
         QueryDataSource dataSource =
             QueryResourceManager.getInstance()
@@ -192,7 +200,7 @@ public class LastQueryExecutor {
         readerList.add(lastReader);
       }
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
 
     // Compute Last result for the rest series paths by scanning Tsfiles
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index 03f8e37..92372be 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -40,10 +40,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
@@ -98,10 +100,18 @@ public class RawDataQueryExecutor {
     }
 
     List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(queryPlan.getDeduplicatedPaths(), context, timeFilter);
+
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair =
+            StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
       for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
         PartialPath path = queryPlan.getDeduplicatedPaths().get(i);
         TSDataType dataType = queryPlan.getDeduplicatedDataTypes().get(i);
@@ -124,7 +134,7 @@ public class RawDataQueryExecutor {
         readersOfSelectedSeries.add(reader);
       }
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
     return readersOfSelectedSeries;
   }
@@ -149,7 +159,7 @@ public class RawDataQueryExecutor {
             new ArrayList<>(queryPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
     List<IReaderByTimestamp> readersOfSelectedSeries =
-        initSeriesReaderByTimestamp(context, queryPlan, cached);
+        initSeriesReaderByTimestamp(context, queryPlan, cached, timestampGenerator.getTimeFilter());
     return new RawQueryDataSetWithValueFilter(
         queryPlan.getDeduplicatedPaths(),
         queryPlan.getDeduplicatedDataTypes(),
@@ -160,13 +170,21 @@ public class RawDataQueryExecutor {
   }
 
   protected List<IReaderByTimestamp> initSeriesReaderByTimestamp(
-      QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached)
+      QueryContext context, RawDataQueryPlan queryPlan, List<Boolean> cached, Filter timeFilter)
       throws QueryProcessException, StorageEngineException {
     List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(queryPlan.getDeduplicatedPaths(), context, null);
+
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair =
+            StorageEngine.getInstance().mergeLock(queryPlan.getDeduplicatedPaths());
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
       for (int i = 0; i < queryPlan.getDeduplicatedPaths().size(); i++) {
         if (cached.get(i)) {
           readersOfSelectedSeries.add(null);
@@ -182,7 +200,7 @@ public class RawDataQueryExecutor {
         readersOfSelectedSeries.add(seriesReaderByTimestamp);
       }
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
     return readersOfSelectedSeries;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
index b0ce81a..6c3156b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/UDTFQueryExecutor.java
@@ -65,7 +65,7 @@ public class UDTFQueryExecutor extends RawDataQueryExecutor {
             new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
     List<IReaderByTimestamp> readersOfSelectedSeries =
-        initSeriesReaderByTimestamp(context, udtfPlan, cached);
+        initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
     return new UDTFAlignByTimeDataSet(
         context,
         udtfPlan,
@@ -96,7 +96,7 @@ public class UDTFQueryExecutor extends RawDataQueryExecutor {
             new ArrayList<>(udtfPlan.getDeduplicatedPaths()),
             timestampGenerator.hasOrNode());
     List<IReaderByTimestamp> readersOfSelectedSeries =
-        initSeriesReaderByTimestamp(context, udtfPlan, cached);
+        initSeriesReaderByTimestamp(context, udtfPlan, cached, timestampGenerator.getTimeFilter());
     return new UDTFNonAlignDataSet(
         context,
         udtfPlan,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index b76d64b..62e9518 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -1157,12 +1157,7 @@ public class SeriesReader {
         TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
         if (tsFileResource != null
             && tsFileResource.isSatisfied(
-                seriesPath.getDevice(),
-                timeFilter,
-                fileFilter,
-                true,
-                dataSource.getDataTTL(),
-                context.isDebug())) {
+                seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug())) {
           break;
         }
         curSeqFileIndex--;
@@ -1176,12 +1171,7 @@ public class SeriesReader {
         TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
         if (tsFileResource != null
             && tsFileResource.isSatisfied(
-                seriesPath.getDevice(),
-                timeFilter,
-                fileFilter,
-                false,
-                dataSource.getDataTTL(),
-                context.isDebug())) {
+                seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug())) {
           break;
         }
         curUnseqFileIndex++;
@@ -1290,12 +1280,7 @@ public class SeriesReader {
         TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
         if (tsFileResource != null
             && tsFileResource.isSatisfied(
-                seriesPath.getDevice(),
-                timeFilter,
-                fileFilter,
-                true,
-                dataSource.getDataTTL(),
-                context.isDebug())) {
+                seriesPath.getDevice(), timeFilter, fileFilter, true, context.isDebug())) {
           break;
         }
         curSeqFileIndex++;
@@ -1309,12 +1294,7 @@ public class SeriesReader {
         TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
         if (tsFileResource != null
             && tsFileResource.isSatisfied(
-                seriesPath.getDevice(),
-                timeFilter,
-                fileFilter,
-                false,
-                dataSource.getDataTTL(),
-                context.isDebug())) {
+                seriesPath.getDevice(), timeFilter, fileFilter, false, context.isDebug())) {
           break;
         }
         curUnseqFileIndex++;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
index 8bba620..8e4acc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java
@@ -36,14 +36,17 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A timestamp generator for query with filter. e.g. For query clause "select s1, s2 from root where
@@ -54,6 +57,8 @@ public class ServerTimeGenerator extends TimeGenerator {
   protected QueryContext context;
   protected RawDataQueryPlan queryPlan;
 
+  private Filter timeFilter;
+
   public ServerTimeGenerator(QueryContext context) {
     this.context = context;
   }
@@ -73,22 +78,53 @@ public class ServerTimeGenerator extends TimeGenerator {
   public void serverConstructNode(IExpression expression)
       throws IOException, StorageEngineException, QueryProcessException {
     List<PartialPath> pathList = new ArrayList<>();
-    getPartialPathFromExpression(expression, pathList);
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance().mergeLockAndInitQueryDataSource(pathList, context, null);
+    timeFilter = getPathListAndConstructTimeFilterFromExpression(expression, pathList);
+
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(pathList);
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
+
       operatorNode = construct(expression);
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
   }
 
-  private void getPartialPathFromExpression(IExpression expression, List<PartialPath> pathList) {
+  private Filter getPathListAndConstructTimeFilterFromExpression(
+      IExpression expression, List<PartialPath> pathList) {
     if (expression.getType() == ExpressionType.SERIES) {
       pathList.add((PartialPath) ((SingleSeriesExpression) expression).getSeriesPath());
+      return getTimeFilter(((SingleSeriesExpression) expression).getFilter());
     } else {
-      getPartialPathFromExpression(((IBinaryExpression) expression).getLeft(), pathList);
-      getPartialPathFromExpression(((IBinaryExpression) expression).getRight(), pathList);
+      Filter leftTimeFilter =
+          getTimeFilter(
+              getPathListAndConstructTimeFilterFromExpression(
+                  ((IBinaryExpression) expression).getLeft(), pathList));
+      Filter rightTimeFilter =
+          getTimeFilter(
+              getPathListAndConstructTimeFilterFromExpression(
+                  ((IBinaryExpression) expression).getRight(), pathList));
+
+      if (expression instanceof AndFilter) {
+        if (leftTimeFilter != null && rightTimeFilter != null) {
+          return FilterFactory.and(leftTimeFilter, rightTimeFilter);
+        } else if (leftTimeFilter != null) {
+          return leftTimeFilter;
+        } else return rightTimeFilter;
+      } else {
+        if (leftTimeFilter != null && rightTimeFilter != null) {
+          return FilterFactory.or(leftTimeFilter, rightTimeFilter);
+        } else {
+          return null;
+        }
+      }
     }
   }
 
@@ -148,4 +184,8 @@ public class ServerTimeGenerator extends TimeGenerator {
   protected boolean isAscending() {
     return queryPlan.isAscending();
   }
+
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/ConcurrentMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/ConcurrentMergeTest.java
index 4e369fc..13ee263 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/ConcurrentMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/ConcurrentMergeTest.java
@@ -117,6 +117,7 @@ public class ConcurrentMergeTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
index 1f48367..7886b43 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
 
@@ -55,6 +56,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_CONTEXT;
 import static org.apache.iotdb.db.utils.EnvironmentUtils.TEST_QUERY_JOB_ID;
@@ -125,13 +127,19 @@ public class DeletionFileNodeTest {
     SingleSeriesExpression expression =
         new SingleSeriesExpression(
             new PartialPath(processorName + TsFileConstant.PATH_SEPARATOR + measurements[5]), null);
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(
-                Collections.singletonList((PartialPath) expression.getSeriesPath()),
-                TEST_QUERY_CONTEXT,
-                null);
+
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair =
+            StorageEngine.getInstance()
+                .mergeLock(Collections.singletonList((PartialPath) expression.getSeriesPath()));
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
+
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, TEST_QUERY_CONTEXT, null);
       QueryDataSource dataSource =
           QueryResourceManager.getInstance()
               .getQueryDataSource(
@@ -150,7 +158,7 @@ public class DeletionFileNodeTest {
       assertEquals(50, count);
       QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID);
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
   }
 
@@ -253,14 +261,18 @@ public class DeletionFileNodeTest {
         new SingleSeriesExpression(
             new PartialPath(processorName + TsFileConstant.PATH_SEPARATOR + measurements[5]), null);
 
-    List<StorageGroupProcessor> list =
-        StorageEngine.getInstance()
-            .mergeLockAndInitQueryDataSource(
-                Collections.singletonList((PartialPath) expression.getSeriesPath()),
-                TEST_QUERY_CONTEXT,
-                null);
+    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
+        lockListAndProcessorToSeriesMapPair =
+            StorageEngine.getInstance()
+                .mergeLock(Collections.singletonList((PartialPath) expression.getSeriesPath()));
+    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
+    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
+        lockListAndProcessorToSeriesMapPair.right;
 
     try {
+      // init QueryDataSource Cache
+      QueryResourceManager.getInstance()
+          .initQueryDataSourceCache(processorToSeriesMap, TEST_QUERY_CONTEXT, null);
       QueryDataSource dataSource =
           QueryResourceManager.getInstance()
               .getQueryDataSource(
@@ -280,7 +292,7 @@ public class DeletionFileNodeTest {
 
       QueryResourceManager.getInstance().endQuery(TEST_QUERY_JOB_ID);
     } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
+      StorageEngine.getInstance().mergeUnLock(lockList);
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index b632143..7214a7b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -168,6 +168,7 @@ public class StorageGroupProcessorTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
@@ -201,6 +202,7 @@ public class StorageGroupProcessorTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
@@ -266,6 +268,7 @@ public class StorageGroupProcessorTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
@@ -300,6 +303,7 @@ public class StorageGroupProcessorTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
@@ -343,6 +347,7 @@ public class StorageGroupProcessorTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
@@ -428,6 +433,7 @@ public class StorageGroupProcessorTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
@@ -513,6 +519,7 @@ public class StorageGroupProcessorTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
@@ -598,6 +605,7 @@ public class StorageGroupProcessorTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
@@ -640,6 +648,7 @@ public class StorageGroupProcessorTest {
     QueryDataSource queryDataSource =
         processor.query(
             Collections.singletonList(new PartialPath(deviceId, measurementId)),
+            deviceId,
             context,
             null,
             null);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 651f290..f906c03 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -224,6 +224,7 @@ public class TTLTest {
     QueryDataSource dataSource =
         storageGroupProcessor.query(
             Collections.singletonList(new PartialPath(sg1, s1)),
+            sg1,
             EnvironmentUtils.TEST_QUERY_CONTEXT,
             null,
             null);
@@ -238,6 +239,7 @@ public class TTLTest {
     dataSource =
         storageGroupProcessor.query(
             Collections.singletonList(new PartialPath(sg1, s1)),
+            sg1,
             EnvironmentUtils.TEST_QUERY_CONTEXT,
             null,
             null);
@@ -276,6 +278,7 @@ public class TTLTest {
     dataSource =
         storageGroupProcessor.query(
             Collections.singletonList(new PartialPath(sg1, s1)),
+            sg1,
             EnvironmentUtils.TEST_QUERY_CONTEXT,
             null,
             null);
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryWithComplexValueFilterIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryWithComplexValueFilterIT.java
new file mode 100644
index 0000000..4185775
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryWithComplexValueFilterIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+public class IoTDBQueryWithComplexValueFilterIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    prepareData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testRawQuery1() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "select s1 from root.sg1.d1 where (time > 400 and s1 <= 600) or (s2 > 300 and time <= 500)");
+      Assert.assertTrue(hasResultSet);
+
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+
+      while (resultSet.next()) {
+        cnt++;
+      }
+
+      Assert.assertEquals(300, cnt);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRawQuery2() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "select s1 from root.sg1.d1 where (time > 400 and s1 <= 600) and (s2 > 300 and time <= 500)");
+      Assert.assertTrue(hasResultSet);
+
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+
+      while (resultSet.next()) {
+        cnt++;
+      }
+
+      Assert.assertEquals(100, cnt);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private static void prepareData() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("create storage group root.sg1");
+      statement.execute("create timeseries root.sg1.d1.s1 with datatype=INT32,encoding=PLAIN");
+      statement.execute("create timeseries root.sg1.d1.s2 with datatype=DOUBLE,encoding=PLAIN");
+      for (int i = 0; i < 1000; i++) {
+        statement.execute(
+            String.format(
+                "insert into root.sg1.d1(time,s1,s2) values(%d,%d,%f)", i, i, (double) i));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
index 0da8b94..344e564 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TimeGenerator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.timegenerator.node.AndNode;
 import org.apache.iotdb.tsfile.read.query.timegenerator.node.LeafNode;
 import org.apache.iotdb.tsfile.read.query.timegenerator.node.Node;
@@ -132,4 +133,6 @@ public abstract class TimeGenerator {
   }
 
   protected abstract boolean isAscending();
+
+  public abstract Filter getTimeFilter();
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java
index c68fc74..d38e7f7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileTimeGenerator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
 
@@ -55,4 +56,9 @@ public class TsFileTimeGenerator extends TimeGenerator {
   protected boolean isAscending() {
     return true;
   }
+
+  @Override
+  public Filter getTimeFilter() {
+    return null;
+  }
 }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
index 85b30c0..288c53b 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/FakedTimeGenerator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
@@ -61,6 +62,11 @@ public class FakedTimeGenerator extends TimeGenerator {
     return true;
   }
 
+  @Override
+  public Filter getTimeFilter() {
+    return null;
+  }
+
   @Test
   public void testTimeGenerator() throws IOException {
     FakedTimeGenerator fakedTimeGenerator = new FakedTimeGenerator();