Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/03/11 09:40:53 UTC

[incubator-iotdb] branch cherry_pick_cluster_2 created (now 0a45c19)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch cherry_pick_cluster_2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 0a45c19  cherry pick changes from cluster_new: 1. getAllClosedStorageGroupTsFile is now grouped by partition 2. fix empty AggregationResult is not correctly serialized 3. fix two empty AvgAggrResult merge to a wrong result 4. fix reset in First/LastValue 5. change member protection levels 6. extract GroupByExecutor and LocalGroupByExecutor 7. extract getters of readers and datasets 8. extract fill initialization

This branch includes the following new commits:

     new 0a45c19  cherry pick changes from cluster_new: 1. getAllClosedStorageGroupTsFile is now grouped by partition 2. fix empty AggregationResult is not correctly serialized 3. fix two empty AvgAggrResult merge to a wrong result 4. fix reset in First/LastValue 5. change member protection levels 6. extract GroupByExecutor and LocalGroupByExecutor 7. extract getters of readers and datasets 8. extract fill initialization

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: cherry pick changes from cluster_new: 1. getAllClosedStorageGroupTsFile is now grouped by partition 2. fix empty AggregationResult is not correctly serialized 3. fix two empty AvgAggrResult merge to a wrong result 4. fix reset in First/LastValue 5. change member protection levels 6. extract GroupByExecutor and LocalGroupByExecutor 7. extract getters of readers and datasets 8. extract fill initialization

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cherry_pick_cluster_2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0a45c19c8f07ca8014ef7d7c4a37fd9956b1de32
Author: jt2594838 <jt...@163.com>
AuthorDate: Wed Mar 11 17:40:39 2020 +0800

    cherry pick changes from cluster_new:
    1. getAllClosedStorageGroupTsFile is now grouped by partition
    2. fix empty AggregationResult is not correctly serialized
    3. fix two empty AvgAggrResult merge to a wrong result
    4. fix reset in First/LastValue
    5. change member protection levels
    6. extract GroupByExecutor and LocalGroupByExecutor
    7. extract getters of readers and datasets
    8. extract fill initialization
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  20 ++-
 .../org/apache/iotdb/db/metadata/MManager.java     |   2 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   1 -
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |   2 +-
 .../db/query/aggregation/AggregateResult.java      | 101 ++++++-----
 .../db/query/aggregation/impl/AvgAggrResult.java   |   9 +
 .../aggregation/impl/FirstValueAggrResult.java     |   6 +
 .../aggregation/impl/LastValueAggrResult.java      |   6 +
 .../dataset/groupby/GroupByEngineDataSet.java      |  13 +-
 .../db/query/dataset/groupby/GroupByExecutor.java  |  15 ++
 .../groupby/GroupByWithValueFilterDataSet.java     |  26 ++-
 .../groupby/GroupByWithoutValueFilterDataSet.java  | 185 ++------------------
 .../dataset/groupby/LocalGroupByExecutor.java      | 187 +++++++++++++++++++++
 .../db/query/executor/AggregationExecutor.java     |  16 +-
 .../iotdb/db/query/executor/FillQueryExecutor.java |  11 +-
 .../iotdb/db/query/executor/QueryRouter.java       |  23 ++-
 .../java/org/apache/iotdb/db/query/fill/IFill.java |   8 +
 .../iotdb/db/query/reader/series/SeriesReader.java |   4 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  17 +-
 .../org/apache/iotdb/db/utils/FilePathUtils.java   |   6 +
 .../iotdb/db/qp/plan/ConcatOptimizerTest.java      |  22 +++
 .../tsfile/read/query/dataset/QueryDataSet.java    |   3 +
 22 files changed, 420 insertions(+), 263 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 9680f39..a4923ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -529,15 +529,23 @@ public class StorageEngine implements IService {
 
   /**
    *
-   * @return TsFiles (seq or unseq) grouped by their storage group.
+   * @return TsFiles (seq or unseq) grouped by their storage group and partition number.
    */
-  public Map<String, List<TsFileResource>> getAllClosedStorageGroupTsFile() {
-    Map<String, List<TsFileResource>> ret = new HashMap<>();
+  public Map<String, Map<Integer, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
+    Map<String, Map<Integer, List<TsFileResource>>> ret = new HashMap<>();
     for (Entry<String, StorageGroupProcessor> entry : processorMap
         .entrySet()) {
-      ret.computeIfAbsent(entry.getKey(), sg -> new ArrayList<>()).addAll(entry.getValue().getSequenceFileTreeSet());
-      ret.get(entry.getKey()).addAll(entry.getValue().getUnSequenceFileList());
-      ret.get(entry.getKey()).removeIf(file -> !file.isClosed());
+      List<TsFileResource> sequenceFiles = entry.getValue().getSequenceFileTreeSet();
+      for (TsFileResource sequenceFile : sequenceFiles) {
+        if (!sequenceFile.isClosed()) {
+          continue;
+        }
+        String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
+        int partitionNum = Integer.parseInt(fileSplits[fileSplits.length - 2]);
+        Map<Integer, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
+            ,n -> new HashMap<>());
+        storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
+      }
     }
     return ret;
   }
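
For illustration, a minimal sketch of how a caller might walk the new return shape (a fragment only, assuming the usual StorageEngine.getInstance() singleton accessor and the types shown in the hunk above):

    Map<String, Map<Integer, List<TsFileResource>>> closedFiles =
        StorageEngine.getInstance().getAllClosedStorageGroupTsFile();
    for (Entry<String, Map<Integer, List<TsFileResource>>> sgEntry : closedFiles.entrySet()) {
      String storageGroup = sgEntry.getKey();
      for (Entry<Integer, List<TsFileResource>> partitionEntry : sgEntry.getValue().entrySet()) {
        // one list of closed TsFiles per (storage group, time partition) pair
        System.out.println(storageGroup + " / partition " + partitionEntry.getKey()
            + ": " + partitionEntry.getValue().size() + " closed TsFile(s)");
      }
    }
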
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 00b21cc..f5a20ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -885,7 +885,7 @@ public class MManager {
    * and the wildcard will be removed.
    * If the wildcard is at the tail, then the inference will go on until the storage groups are found
    * and the wildcard will be kept.
-   * (2) Suppose the part of the path is a substring that begin after the storage group name. (e.g.,
+   * (2) Suppose the part of the path is a substring that begins after the storage group name. (e.g.,
    *  For "root.*.sg1.a.*.b.*" and "root.x.sg1" is a storage group, then this part is "a.*.b.*").
    *  For this part, keep what it is.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 45c53a7..4e06220 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -299,7 +299,6 @@ public class PhysicalGenerator {
           try {
             // remove stars in SELECT to get actual paths
             List<String> actualPaths = getMatchedTimeseries(fullPath.getFullPath());
-
             // for actual non exist path
             if (actualPaths.isEmpty() && originAggregations.isEmpty()) {
               String nonExistMeasurement = fullPath.getMeasurement();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 32a9a50..0cf280f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -119,7 +119,7 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
     if(!isAlignByDevice){
       sfwOperator.setFilterOperator(concatFilter(prefixPaths, filter, filterPaths));
     }
-    filter.setPathSet(filterPaths);
+    sfwOperator.getFilterOperator().setPathSet(filterPaths);
     // GROUP_BY_DEVICE leaves the concatFilter to PhysicalGenerator to optimize filter without prefix first
 
     return sfwOperator;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index ee083ee..ad95518 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
@@ -44,7 +45,7 @@ public abstract class AggregateResult {
   private double doubleValue;
   private Binary binaryValue;
 
-  private boolean hasResult;
+  protected boolean hasResult;
 
   /**
    * construct.
@@ -110,29 +111,32 @@ public abstract class AggregateResult {
     TSDataType dataType = TSDataType.deserialize(buffer.getShort());
     AggregateResult aggregateResult = AggregateResultFactory
         .getAggrResultByType(aggregationType, dataType);
-    switch (dataType) {
-      case BOOLEAN:
-        aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer));
-        break;
-      case INT32:
-        aggregateResult.setIntValue(buffer.getInt());
-        break;
-      case INT64:
-        aggregateResult.setLongValue(buffer.getLong());
-        break;
-      case FLOAT:
-        aggregateResult.setFloatValue(buffer.getFloat());
-        break;
-      case DOUBLE:
-        aggregateResult.setDoubleValue(buffer.getDouble());
-        break;
-      case TEXT:
-        aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer));
-        break;
-      default:
-        throw new IllegalArgumentException("Invalid Aggregation Type: " + dataType.name());
+    boolean hasResult = ReadWriteIOUtils.readBool(buffer);
+    if (hasResult) {
+      switch (dataType) {
+        case BOOLEAN:
+          aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer));
+          break;
+        case INT32:
+          aggregateResult.setIntValue(buffer.getInt());
+          break;
+        case INT64:
+          aggregateResult.setLongValue(buffer.getLong());
+          break;
+        case FLOAT:
+          aggregateResult.setFloatValue(buffer.getFloat());
+          break;
+        case DOUBLE:
+          aggregateResult.setDoubleValue(buffer.getDouble());
+          break;
+        case TEXT:
+          aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer));
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid Aggregation Type: " + dataType.name());
+      }
+      aggregateResult.deserializeSpecificFields(buffer);
     }
-    aggregateResult.deserializeSpecificFields(buffer);
     return aggregateResult;
   }
 
@@ -141,29 +145,32 @@ public abstract class AggregateResult {
   public void serializeTo(OutputStream outputStream) throws IOException {
     aggregationType.serializeTo(outputStream);
     ReadWriteIOUtils.write(resultDataType, outputStream);
-    switch (resultDataType) {
-      case BOOLEAN:
-        ReadWriteIOUtils.write(booleanValue, outputStream);
-        break;
-      case INT32:
-        ReadWriteIOUtils.write(intValue, outputStream);
-        break;
-      case INT64:
-        ReadWriteIOUtils.write(longValue, outputStream);
-        break;
-      case FLOAT:
-        ReadWriteIOUtils.write(floatValue, outputStream);
-        break;
-      case DOUBLE:
-        ReadWriteIOUtils.write(doubleValue, outputStream);
-        break;
-      case TEXT:
-        ReadWriteIOUtils.write(binaryValue, outputStream);
-        break;
-      default:
-        throw new IllegalArgumentException("Invalid Aggregation Type: " + resultDataType.name());
+    ReadWriteIOUtils.write(hasResult(), outputStream);
+    if (hasResult()) {
+      switch (resultDataType) {
+        case BOOLEAN:
+          ReadWriteIOUtils.write(booleanValue, outputStream);
+          break;
+        case INT32:
+          ReadWriteIOUtils.write(intValue, outputStream);
+          break;
+        case INT64:
+          ReadWriteIOUtils.write(longValue, outputStream);
+          break;
+        case FLOAT:
+          ReadWriteIOUtils.write(floatValue, outputStream);
+          break;
+        case DOUBLE:
+          ReadWriteIOUtils.write(doubleValue, outputStream);
+          break;
+        case TEXT:
+          ReadWriteIOUtils.write(binaryValue, outputStream);
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid Aggregation Type: " + resultDataType.name());
+      }
+      serializeSpecificFields(outputStream);
     }
-    serializeSpecificFields(outputStream);
   }
 
   protected abstract void serializeSpecificFields(OutputStream outputStream) throws IOException;
@@ -294,4 +301,8 @@ public abstract class AggregateResult {
   public String toString() {
     return String.valueOf(getResult());
   }
+
+  public AggregationType getAggregationType() {
+    return aggregationType;
+  }
 }
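
The change above amounts to a presence flag on the wire: the value and the type-specific fields are written only when hasResult() is true, and deserialization reads them only after seeing the flag, so an empty AggregationResult now round-trips as empty instead of being rebuilt with a bogus default value. A self-contained sketch of the same pattern using plain java.io (not the IoTDB ReadWriteIOUtils API):

    import java.io.*;

    public class PresenceFlagDemo {

      // write the flag first, the payload only when a result exists
      static void write(DataOutputStream out, Double value) throws IOException {
        out.writeBoolean(value != null);
        if (value != null) {
          out.writeDouble(value);
        }
      }

      // read the flag, then the payload only if the flag says it is there
      static Double read(DataInputStream in) throws IOException {
        return in.readBoolean() ? in.readDouble() : null;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        write(out, null);  // an "empty" result
        write(out, 42.0);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(read(in)); // null
        System.out.println(read(in)); // 42.0
      }
    }
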
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 1e44444..e93d0e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -45,6 +45,11 @@ public class AvgAggrResult extends AggregateResult {
   }
 
   @Override
+  protected boolean hasResult() {
+    return cnt > 0;
+  }
+
+  @Override
   public Double getResult() {
     if (cnt > 0) {
       setDoubleValue(avg);
@@ -120,6 +125,10 @@ public class AvgAggrResult extends AggregateResult {
   @Override
   public void merge(AggregateResult another) {
     AvgAggrResult anotherAvg = (AvgAggrResult) another;
+    if (anotherAvg.cnt == 0) {
+      // avoid two empty results producing an NaN
+      return;
+    }
     avg = avg * ((double) cnt / (cnt + anotherAvg.cnt)) +
         anotherAvg.avg * ((double) anotherAvg.cnt / (cnt + anotherAvg.cnt));
     cnt += anotherAvg.cnt;
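
The early return matters because merging two empty AVG partials would otherwise evaluate 0.0 * (0.0 / 0.0), and 0/0 is NaN in IEEE-754 double arithmetic, which then poisons every later merge. A tiny standalone check of that failure mode (not the IoTDB class):

    public class AvgMergeNaNDemo {
      public static void main(String[] args) {
        double avg = 0.0, otherAvg = 0.0;
        long cnt = 0, otherCnt = 0;
        // the un-guarded merge formula from AvgAggrResult.merge()
        double merged = avg * ((double) cnt / (cnt + otherCnt))
            + otherAvg * ((double) otherCnt / (cnt + otherCnt));
        System.out.println(merged);               // NaN
        System.out.println(Double.isNaN(merged)); // true
      }
    }
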
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 52da9c0..2dc3e2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -42,6 +42,12 @@ public class FirstValueAggrResult extends AggregateResult {
   }
 
   @Override
+  public void reset() {
+    super.reset();
+    timestamp = Long.MAX_VALUE;
+  }
+
+  @Override
   public Object getResult() {
     return hasResult() ? getValue() : null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 2077af4..13a6a67 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -41,6 +41,12 @@ public class LastValueAggrResult extends AggregateResult {
   }
 
   @Override
+  public void reset() {
+    super.reset();
+    timestamp = Long.MIN_VALUE;
+  }
+
+  @Override
   public Object getResult() {
     return hasResult() ? getValue() : null;
   }
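
Both reset() overrides restore the sentinel timestamp (Long.MAX_VALUE for first-value, Long.MIN_VALUE for last-value) on top of whatever super.reset() clears; a result object reused across group-by windows would otherwise keep the previous window's timestamp and silently reject valid points. A minimal sketch of the idea (illustration only, not the IoTDB classes):

    class LastValueSketch {
      private long timestamp = Long.MIN_VALUE;
      private Object value;

      void update(long time, Object v) {
        if (time > timestamp) {  // keep only the latest point seen so far
          timestamp = time;
          value = v;
        }
      }

      void reset() {
        timestamp = Long.MIN_VALUE; // forget the previous window, as the fix does
        value = null;
      }

      Object getResult() {
        return value;
      }
    }
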
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 94d290e..4ca7ceb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -27,18 +27,21 @@ import org.apache.iotdb.tsfile.utils.Pair;
 public abstract class GroupByEngineDataSet extends QueryDataSet {
 
   protected long queryId;
-  private long interval;
-  private long slidingStep;
+  protected long interval;
+  protected long slidingStep;
   // total query [startTime, endTime)
-  private long startTime;
-  private long endTime;
+  protected long startTime;
+  protected long endTime;
 
   // current interval [curStartTime, curEndTime)
   protected long curStartTime;
   protected long curEndTime;
-  private int usedIndex;
+  protected int usedIndex;
   protected boolean hasCachedTimeInterval;
 
+  public GroupByEngineDataSet() {
+  }
+
   /**
    * groupBy query.
    */
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
new file mode 100644
index 0000000..ced8008
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
@@ -0,0 +1,15 @@
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public interface GroupByExecutor {
+  void addAggregateResult(AggregateResult aggrResult, int index);
+
+  void resetAggregateResults();
+
+  List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long curEndTime) throws IOException, QueryProcessException;
+}
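
GroupByExecutor is the extracted extension point: an executor only registers the AggregateResult objects it is responsible for and fills them per time window, which is what allows a non-local (e.g. cluster) implementation to replace the LocalGroupByExecutor introduced below. A hypothetical skeleton, purely to show the contract (not part of the commit):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.iotdb.db.query.aggregation.AggregateResult;
    import org.apache.iotdb.db.query.dataset.groupby.GroupByExecutor;
    import org.apache.iotdb.tsfile.utils.Pair;

    public class NoOpGroupByExecutor implements GroupByExecutor {

      private final List<Pair<AggregateResult, Integer>> results = new ArrayList<>();

      @Override
      public void addAggregateResult(AggregateResult aggrResult, int index) {
        results.add(new Pair<>(aggrResult, index));
      }

      @Override
      public void resetAggregateResults() {
        for (Pair<AggregateResult, Integer> result : results) {
          result.left.reset();
        }
      }

      @Override
      public List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long curEndTime) {
        // a real executor would read data in [curStartTime, curEndTime) and update each result;
        // this stub simply returns the registered results unchanged
        return results;
      }
    }
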
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 44402cc..a951001 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -29,11 +29,14 @@ import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
 public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
@@ -53,7 +56,10 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
   /**
    * group by batch calculation size.
    */
-  private int timeStampFetchSize;
+  protected int timeStampFetchSize;
+
+  public GroupByWithValueFilterDataSet() {
+  }
 
   /**
    * constructor.
@@ -74,18 +80,28 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
   /**
    * init reader and aggregate function.
    */
-  private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
+  protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
       throws StorageEngineException {
-    this.timestampGenerator = new ServerTimeGenerator(groupByPlan.getExpression(), context);
+    this.timestampGenerator = getTimeGenerator(groupByPlan.getExpression(), context);
     this.allDataReaderList = new ArrayList<>();
     this.groupByPlan = groupByPlan;
     for (int i = 0; i < paths.size(); i++) {
       Path path = paths.get(i);
-      allDataReaderList.add(new SeriesReaderByTimestamp(path, dataTypes.get(i), context,
-          QueryResourceManager.getInstance().getQueryDataSource(path, context, null), null));
+      allDataReaderList.add(getReaderByTime(path, dataTypes.get(i), context, null));
     }
   }
 
+  protected TimeGenerator getTimeGenerator(IExpression expression, QueryContext context)
+      throws StorageEngineException {
+    return new ServerTimeGenerator(expression, context);
+  }
+
+  protected IReaderByTimestamp getReaderByTime(Path path,
+      TSDataType dataType, QueryContext context, TsFileFilter fileFilter) throws StorageEngineException {
+    return new SeriesReaderByTimestamp(path, dataType, context,
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, null), fileFilter);
+  }
+
   @Override
   protected RowRecord nextWithoutConstraint() throws IOException {
     if (!hasCachedTimeInterval) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 023cbf3..82c478a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -20,27 +20,20 @@
 package org.apache.iotdb.db.query.dataset.groupby;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.query.reader.series.IAggregateReader;
-import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -54,7 +47,9 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       .getLogger(GroupByWithoutValueFilterDataSet.class);
 
   private Map<Path, GroupByExecutor> pathExecutors = new HashMap<>();
-  private TimeRange timeRange;
+
+  public GroupByWithoutValueFilterDataSet() {
+  }
 
   /**
    * constructor.
@@ -66,7 +61,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     initGroupBy(context, groupByPlan);
   }
 
-  private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
+  protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
       throws StorageEngineException {
     IExpression expression = groupByPlan.getExpression();
 
@@ -80,7 +75,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       if (!pathExecutors.containsKey(path)) {
         //init GroupByExecutor
         pathExecutors.put(path,
-            new GroupByExecutor(path, dataTypes.get(i), context, timeFilter));
+           getGroupByExecutor(path, dataTypes.get(i), context, timeFilter, null));
       }
       AggregateResult aggrResult = AggregateResultFactory
           .getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i),
@@ -97,7 +92,6 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     }
     hasCachedTimeInterval = false;
     RowRecord record = new RowRecord(curStartTime);
-    timeRange = new TimeRange(curStartTime, curEndTime - 1);
 
     AggregateResult[] fields = new AggregateResult[paths.size()];
 
@@ -105,7 +99,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
       for (Entry<Path, GroupByExecutor> pathGroupByExecutorEntry : pathExecutors.entrySet()) {
         GroupByExecutor executor = pathGroupByExecutorEntry.getValue();
         executor.resetAggregateResults();
-        List<Pair<AggregateResult, Integer>> aggregations = executor.calcResult();
+        List<Pair<AggregateResult, Integer>> aggregations = executor.calcResult(curStartTime, curEndTime);
         for (Pair<AggregateResult, Integer> aggregation : aggregations) {
           fields[aggregation.right] = aggregation.left;
         }
@@ -125,165 +119,10 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
     return record;
   }
 
-  private class GroupByExecutor {
-
-    private IAggregateReader reader;
-    private BatchData preCachedData;
-    //<aggFunction - indexForRecord> of path
-    private List<Pair<AggregateResult, Integer>> results = new ArrayList<>();
-
-    GroupByExecutor(Path path, TSDataType dataType, QueryContext context, Filter timeFilter)
-        throws StorageEngineException {
-      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
-          .getQueryDataSource(path, context, timeFilter);
-      // update filter by TTL
-      timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-      this.reader = new SeriesAggregateReader(path, dataType, context, queryDataSource, timeFilter,
-          null, null);
-      this.preCachedData = null;
-    }
-
-    private List<Pair<AggregateResult, Integer>> calcResult()
-        throws IOException, QueryProcessException {
-      if (calcFromCacheData()) {
-        return results;
-      }
-
-      //read page data firstly
-      if (readAndCalcFromPage()) {
-        return results;
-      }
-
-      //read chunk finally
-      while (reader.hasNextChunk()) {
-        Statistics chunkStatistics = reader.currentChunkStatistics();
-        if (chunkStatistics.getStartTime() >= curEndTime) {
-          return results;
-        }
-        //calc from chunkMetaData
-        if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
-            new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
-          calcFromStatistics(chunkStatistics);
-          reader.skipCurrentChunk();
-          if(isEndCalc()){
-            return results;
-          }
-          continue;
-        }
-        if (readAndCalcFromPage()) {
-          return results;
-        }
-      }
-      return results;
-    }
-
-    private void addAggregateResult(AggregateResult aggrResult, int index) {
-      results.add(new Pair<>(aggrResult, index));
-    }
-
-    private boolean isEndCalc() {
-      for (Pair<AggregateResult, Integer> result : results) {
-        if (!result.left.isCalculatedAggregationResult()) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    private boolean calcFromCacheData() throws IOException {
-      calcFromBatch(preCachedData);
-      // The result is calculated from the cache
-      return (preCachedData != null && preCachedData.getMaxTimestamp() >= curEndTime)
-          || isEndCalc();
-    }
-
-    private void calcFromBatch(BatchData batchData) throws IOException {
-      // is error data
-      if (batchData == null
-          || !batchData.hasCurrent()
-          || batchData.getMaxTimestamp() < curStartTime
-          || batchData.currentTime() >= curEndTime) {
-        return;
-      }
-
-      for (Pair<AggregateResult, Integer> result : results) {
-        //current agg method has been calculated
-        if (result.left.isCalculatedAggregationResult()) {
-          continue;
-        }
-        //lazy reset batch data for calculation
-        batchData.resetBatchData();
-        //skip points that cannot be calculated
-        while (batchData.currentTime() < curStartTime && batchData.hasCurrent()) {
-          batchData.next();
-        }
-        if (batchData.hasCurrent()) {
-          result.left.updateResultFromPageData(batchData, curEndTime);
-        }
-      }
-      //can calc for next interval
-      if (batchData.getMaxTimestamp() >= curEndTime) {
-        preCachedData = batchData;
-      }
-    }
-
-    private void calcFromStatistics(Statistics statistics)
-        throws QueryProcessException {
-      for (Pair<AggregateResult, Integer> result : results) {
-        //cacl is compile
-        if (result.left.isCalculatedAggregationResult()) {
-          continue;
-        }
-        result.left.updateResultFromStatistics(statistics);
-      }
-    }
-
-    // clear all results
-    private void resetAggregateResults() {
-      for (Pair<AggregateResult, Integer> result : results) {
-        result.left.reset();
-      }
-    }
-
-
-    private boolean readAndCalcFromPage() throws IOException, QueryProcessException {
-      while (reader.hasNextPage()) {
-        Statistics pageStatistics = reader.currentPageStatistics();
-        //must be non overlapped page
-        if (pageStatistics != null) {
-          //current page max than time range
-          if (pageStatistics.getStartTime() >= curEndTime) {
-            return true;
-          }
-          //can use pageHeader
-          if (reader.canUseCurrentPageStatistics() && timeRange.contains(
-              new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
-            calcFromStatistics(pageStatistics);
-            reader.skipCurrentPage();
-            if (isEndCalc()) {
-              return true;
-            }
-            continue;
-          }
-        }
-        // calc from page data
-        BatchData batchData = reader.nextPage();
-        if (batchData == null || !batchData.hasCurrent()) {
-          continue;
-        }
-        // stop calc and cached current batchData
-        if (batchData.currentTime() >= curEndTime) {
-          preCachedData = batchData;
-          return true;
-        }
-
-        calcFromBatch(batchData);
-        if (isEndCalc() || batchData.currentTime() >= curEndTime) {
-          return true;
-        }
-      }
-      return false;
-    }
+  protected GroupByExecutor getGroupByExecutor(Path path,
+      TSDataType dataType,
+      QueryContext context, Filter timeFilter, TsFileFilter fileFilter)
+      throws StorageEngineException {
+    return new LocalGroupByExecutor(path, dataType, context, timeFilter, fileFilter);
   }
-
 }
\ No newline at end of file
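
With the inner class removed, the only local piece left in this data set is the protected getGroupByExecutor() factory method, so a subclass can swap in a different executor without re-implementing the windowing logic. A hypothetical override, reusing the skeleton sketched after GroupByExecutor above (assumed to live in the same groupby package):

    import org.apache.iotdb.db.query.context.QueryContext;
    import org.apache.iotdb.db.query.filter.TsFileFilter;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
    import org.apache.iotdb.tsfile.read.common.Path;
    import org.apache.iotdb.tsfile.read.filter.basic.Filter;

    public class RemoteGroupByWithoutValueFilterDataSet extends GroupByWithoutValueFilterDataSet {

      @Override
      protected GroupByExecutor getGroupByExecutor(Path path, TSDataType dataType,
          QueryContext context, Filter timeFilter, TsFileFilter fileFilter) {
        // a cluster implementation would return an executor that asks remote data groups
        // for pre-aggregated windows; the no-op skeleton from above stands in here
        return new NoOpGroupByExecutor();
      }
    }
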
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
new file mode 100644
index 0000000..a009266
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -0,0 +1,187 @@
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.IAggregateReader;
+import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class LocalGroupByExecutor implements GroupByExecutor {
+
+  private IAggregateReader reader;
+  private BatchData preCachedData;
+  //<aggFunction - indexForRecord> of path
+  private List<Pair<AggregateResult, Integer>> results = new ArrayList<>();
+  private TimeRange timeRange;
+
+  public LocalGroupByExecutor(Path path, TSDataType dataType, QueryContext context, Filter timeFilter,
+      TsFileFilter fileFilter)
+      throws StorageEngineException {
+    QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+        .getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+    this.reader = new SeriesAggregateReader(path, dataType, context, queryDataSource, timeFilter,
+        null, fileFilter);
+    this.preCachedData = null;
+    timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult, int index) {
+    results.add(new Pair<>(aggrResult, index));
+  }
+
+  private boolean isEndCalc() {
+    for (Pair<AggregateResult, Integer> result : results) {
+      if (!result.left.isCalculatedAggregationResult()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean calcFromCacheData(long curStartTime, long curEndTime) throws IOException {
+    calcFromBatch(preCachedData, curStartTime, curEndTime);
+    // The result is calculated from the cache
+    return (preCachedData != null && preCachedData.getMaxTimestamp() >= curEndTime)
+        || isEndCalc();
+  }
+
+  private void calcFromBatch(BatchData batchData, long curStartTime, long curEndTime) throws IOException {
+    // is error data
+    if (batchData == null
+        || !batchData.hasCurrent()
+        || batchData.getMaxTimestamp() < curStartTime
+        || batchData.currentTime() >= curEndTime) {
+      return;
+    }
+
+    for (Pair<AggregateResult, Integer> result : results) {
+      //current agg method has been calculated
+      if (result.left.isCalculatedAggregationResult()) {
+        continue;
+      }
+      //lazy reset batch data for calculation
+      batchData.resetBatchData();
+      //skip points that cannot be calculated
+      while (batchData.currentTime() < curStartTime && batchData.hasCurrent()) {
+        batchData.next();
+      }
+      if (batchData.hasCurrent()) {
+        result.left.updateResultFromPageData(batchData, curEndTime);
+      }
+    }
+    //can calc for next interval
+    if (batchData.getMaxTimestamp() >= curEndTime) {
+      preCachedData = batchData;
+    }
+  }
+
+  private void calcFromStatistics(Statistics pageStatistics)
+      throws QueryProcessException {
+    for (Pair<AggregateResult, Integer> result : results) {
+      // this aggregation has already been fully calculated
+      if (result.left.isCalculatedAggregationResult()) {
+        continue;
+      }
+      result.left.updateResultFromStatistics(pageStatistics);
+    }
+  }
+
+  @Override
+  public List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    timeRange.set(curStartTime, curEndTime - 1);
+    if (calcFromCacheData(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    //read page data firstly
+    if (readAndCalcFromPage(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    //read chunk finally
+    while (reader.hasNextChunk()) {
+      Statistics chunkStatistics = reader.currentChunkStatistics();
+      if (chunkStatistics.getStartTime() >= curEndTime) {
+        return results;
+      }
+      //calc from chunkMetaData
+      if (reader.canUseCurrentChunkStatistics()
+          && timeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) {
+        calcFromStatistics(chunkStatistics);
+        reader.skipCurrentChunk();
+        continue;
+      }
+      if (readAndCalcFromPage(curStartTime, curEndTime)) {
+        return results;
+      }
+    }
+    return results;
+  }
+
+  // clear all results
+  @Override
+  public void resetAggregateResults() {
+    for (Pair<AggregateResult, Integer> result : results) {
+      result.left.reset();
+    }
+  }
+
+
+  private boolean readAndCalcFromPage(long curStartTime, long curEndTime) throws IOException,
+      QueryProcessException {
+    while (reader.hasNextPage()) {
+      Statistics pageStatistics = reader.currentPageStatistics();
+      //must be non overlapped page
+      if (pageStatistics != null) {
+        // the current page starts beyond the current time range
+        if (pageStatistics.getStartTime() >= curEndTime) {
+          return true;
+        }
+        //can use pageHeader
+        if (reader.canUseCurrentPageStatistics()
+            && timeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
+          calcFromStatistics(pageStatistics);
+          reader.skipCurrentPage();
+          if (isEndCalc()) {
+            return true;
+          }
+          continue;
+        }
+      }
+      // calc from page data
+      BatchData batchData = reader.nextPage();
+      if (batchData == null || !batchData.hasCurrent()) {
+        continue;
+      }
+      // stop calc and cached current batchData
+      if (batchData.currentTime() >= curEndTime) {
+        preCachedData = batchData;
+        return true;
+      }
+
+      calcFromBatch(batchData, curStartTime, curEndTime);
+      if (isEndCalc() || batchData.currentTime() >= curEndTime) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 6882e7b..1e63f5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -57,7 +57,7 @@ public class AggregationExecutor {
 
   private List<Path> selectedSeries;
   protected List<TSDataType> dataTypes;
-  private List<String> aggregations;
+  protected List<String> aggregations;
   protected IExpression expression;
 
   /**
@@ -65,7 +65,7 @@ public class AggregationExecutor {
    **/
   private int aggregateFetchSize;
 
-  AggregationExecutor(AggregationPlan aggregationPlan) {
+  protected AggregationExecutor(AggregationPlan aggregationPlan) {
     this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
     this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
     this.aggregations = aggregationPlan.getDeduplicatedAggregations();
@@ -78,7 +78,7 @@ public class AggregationExecutor {
    *
    * @param context query context
    */
-  QueryDataSet executeWithoutValueFilter(QueryContext context)
+  public QueryDataSet executeWithoutValueFilter(QueryContext context)
       throws StorageEngineException, IOException, QueryProcessException {
 
     Filter timeFilter = null;
@@ -109,7 +109,7 @@ public class AggregationExecutor {
    * @param context query context
    * @return AggregateResult list
    */
-  private List<AggregateResult> aggregateOneSeries(
+  protected List<AggregateResult> aggregateOneSeries(
       Map.Entry<Path, List<Integer>> pathToAggrIndexes,
       Filter timeFilter, QueryContext context)
       throws IOException, QueryProcessException, StorageEngineException {
@@ -128,7 +128,7 @@ public class AggregationExecutor {
     return aggregateResultList;
   }
 
-  private static void aggregateOneSeries(Path seriesPath, QueryContext context, Filter timeFilter,
+  public static void aggregateOneSeries(Path seriesPath, QueryContext context, Filter timeFilter,
       TSDataType tsDataType, List<AggregateResult> aggregateResultList, TsFileFilter fileFilter)
       throws StorageEngineException, IOException, QueryProcessException {
 
@@ -227,7 +227,7 @@ public class AggregationExecutor {
    *
    * @param context query context.
    */
-  QueryDataSet executeWithValueFilter(QueryContext context)
+  public QueryDataSet executeWithValueFilter(QueryContext context)
       throws StorageEngineException, IOException {
 
     TimeGenerator timestampGenerator = getTimeGenerator(context);
@@ -249,11 +249,11 @@ public class AggregationExecutor {
     return constructDataSet(aggregateResults);
   }
 
-  private TimeGenerator getTimeGenerator(QueryContext context) throws StorageEngineException {
+  protected TimeGenerator getTimeGenerator(QueryContext context) throws StorageEngineException {
     return new ServerTimeGenerator(expression, context);
   }
 
-  private IReaderByTimestamp getReaderByTime(Path path, TSDataType dataType,
+  protected IReaderByTimestamp getReaderByTime(Path path, TSDataType dataType,
       QueryContext context) throws StorageEngineException {
     return new SeriesReaderByTimestamp(path,
         dataType, context,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index a14742d..08cdde1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -88,9 +88,7 @@ public class FillQueryExecutor {
       } else {
         fill = typeIFillMap.get(dataType).copy();
       }
-      fill.setDataType(dataType);
-      fill.setQueryTime(queryTime);
-      fill.constructReaders(path, context);
+      configureFill(fill, dataType, path, context, queryTime);
 
       TimeValuePair timeValuePair = fill.getFillResult();
       if (timeValuePair == null || timeValuePair.getValue() == null) {
@@ -104,4 +102,11 @@ public class FillQueryExecutor {
     dataSet.setRecord(record);
     return dataSet;
   }
+
+  protected void configureFill(IFill fill, TSDataType dataType, Path path, QueryContext context,
+      long queryTime) throws StorageEngineException {
+    fill.setDataType(dataType);
+    fill.setQueryTime(queryTime);
+    fill.constructReaders(path, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 9807933..6ed625e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -138,12 +138,22 @@ public class QueryRouter implements IQueryRouter {
     groupByPlan.setExpression(optimizedExpression);
 
     if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
-      return new GroupByWithoutValueFilterDataSet(context, groupByPlan);
+      return getGroupByWithoutValueFilterDataSet(context, groupByPlan);
     } else {
-      return new GroupByWithValueFilterDataSet(context, groupByPlan);
+      return getGroupByWithValueFilterDataSet(context, groupByPlan);
     }
   }
 
+  protected GroupByWithoutValueFilterDataSet getGroupByWithoutValueFilterDataSet(QueryContext context, GroupByPlan plan)
+      throws StorageEngineException {
+    return new GroupByWithoutValueFilterDataSet(context, plan);
+  }
+
+  protected GroupByWithValueFilterDataSet getGroupByWithValueFilterDataSet(QueryContext context, GroupByPlan plan)
+      throws StorageEngineException {
+    return new GroupByWithValueFilterDataSet(context, plan);
+  }
+
   @Override
   public QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context)
       throws StorageEngineException, QueryProcessException, IOException {
@@ -152,11 +162,18 @@ public class QueryRouter implements IQueryRouter {
     long queryTime = fillQueryPlan.getQueryTime();
     Map<TSDataType, IFill> fillType = fillQueryPlan.getFillType();
 
-    FillQueryExecutor fillQueryExecutor = new FillQueryExecutor(fillPaths, dataTypes, queryTime,
+    FillQueryExecutor fillQueryExecutor = getFillExecutor(fillPaths, dataTypes, queryTime,
         fillType);
     return fillQueryExecutor.execute(context);
   }
 
+  protected FillQueryExecutor getFillExecutor(
+      List<Path> fillPaths,
+      List<TSDataType> dataTypes, long queryTime,
+      Map<TSDataType, IFill> fillType) {
+    return new FillQueryExecutor(fillPaths, dataTypes, queryTime, fillType);
+  }
+
   @Override
   public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext context)
           throws StorageEngineException, QueryProcessException, IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index f1d9a21..d8eb77b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -57,6 +57,14 @@ public abstract class IFill {
         timeFilter, null, null);
   }
 
+  public void setAllDataReader(IBatchReader allDataReader) {
+    this.allDataReader = allDataReader;
+  }
+
+  public Filter getFilter() {
+    return constructFilter();
+  }
+
   public abstract TimeValuePair getFillResult() throws IOException, UnSupportedFillTypeException;
 
   public TSDataType getDataType() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 1d7825d..ef27d37 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -45,7 +45,7 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import java.io.IOException;
 import java.util.*;
 
-class SeriesReader {
+public class SeriesReader {
 
   private final Path seriesPath;
   private final TSDataType dataType;
@@ -94,7 +94,7 @@ class SeriesReader {
   private boolean hasCachedNextOverlappedPage;
   private BatchData cachedBatchData;
 
-  SeriesReader(Path seriesPath, TSDataType dataType, QueryContext context,
+  public SeriesReader(Path seriesPath, TSDataType dataType, QueryContext context,
       QueryDataSource dataSource, Filter timeFilter, Filter valueFilter, TsFileFilter fileFilter) {
     this.seriesPath = seriesPath;
     this.dataType = dataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3ad965c..803bdf5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -239,10 +239,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     for (long statementId : statementIds) {
       Set<Long> queryIds = statementId2QueryId.getOrDefault(statementId, Collections.emptySet());
       for (long queryId : queryIds) {
-        queryId2DataSet.remove(queryId);
-
         try {
-          QueryResourceManager.getInstance().endQuery(queryId);
+          releaseQueryResource(queryId);
         } catch (StorageEngineException e) {
           // release as many as resources as possible, so do not break as soon as one exception is
           // raised
@@ -300,7 +298,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   /**
    * release single operation resource
    */
-  private void releaseQueryResource(long queryId) throws StorageEngineException {
+  protected void releaseQueryResource(long queryId) throws StorageEngineException {
     // remove the corresponding Physical Plan
     queryId2DataSet.remove(queryId);
     QueryResourceManager.getInstance().endQuery(queryId);
@@ -778,6 +776,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of ALIGN_BY_DEVICE result
     List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
     deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result
+
     Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
     Map<String, TSDataType> checker = plan.getMeasurementDataTypeMap();
 
@@ -812,7 +811,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     plan.setPaths(null);
   }
 
-
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
     try {
@@ -831,8 +829,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
             fillRpcReturnData(req.fetchSize, queryDataSet, sessionIdUsernameMap.get(req.sessionId));
         boolean hasResultSet = result.bufferForTime().limit() != 0;
         if (!hasResultSet) {
-          QueryResourceManager.getInstance().endQuery(req.queryId);
-          queryId2DataSet.remove(req.queryId);
+          releaseQueryResource(req.queryId);
         }
         TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
         resp.setHasResultSet(hasResultSet);
@@ -942,7 +939,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return queryDataSet;
   }
 
-  private QueryContext genQueryContext(long queryId) {
+  protected QueryContext genQueryContext(long queryId) {
     return new QueryContext(queryId);
   }
 
@@ -1019,7 +1016,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return AuthorityChecker.check(username, paths, plan.getOperatorType(), targetUser);
   }
 
-  void handleClientExit() {
+  protected void handleClientExit() {
     Long sessionId = currSessionId.get();
     if (sessionId != null) {
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
@@ -1299,7 +1296,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return null;
   }
 
-  private TSStatus executePlan(PhysicalPlan plan) {
+  protected TSStatus executePlan(PhysicalPlan plan) {
     boolean execRet;
     try {
       execRet = executeNonQuery(plan);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 4109382..d89e6fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -19,9 +19,12 @@
 package org.apache.iotdb.db.utils;
 
 import java.io.File;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 
 public class FilePathUtils {
 
+  private static final String PATH_SPLIT_STRING = File.separator.equals("\\") ? "\\\\" : "/";
+
   private FilePathUtils() {
     // forbidding instantiation
   }
@@ -39,4 +42,7 @@ public class FilePathUtils {
     return filePath;
   }
 
+  public static String[] splitTsFilePath(TsFileResource resource) {
+    return resource.getFile().getAbsolutePath().split(PATH_SPLIT_STRING);
+  }
 }
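
splitTsFilePath() is what the StorageEngine change at the top of this patch relies on: the time partition id is taken from the second-to-last path segment, i.e. a directory layout like .../<storageGroup>/<partition>/<file>.tsfile is assumed. A small illustration with a made-up unix-style path (the real helper takes a TsFileResource and splits on PATH_SPLIT_STRING so Windows separators work too):

    public class SplitTsFilePathDemo {
      public static void main(String[] args) {
        // hypothetical absolute path, only to show how the partition id is recovered
        String absolutePath = "/data/data/sequence/root.sg1/42/1583907639000-1-0.tsfile";
        String[] fileSplits = absolutePath.split("/");
        int partitionNum = Integer.parseInt(fileSplits[fileSplits.length - 2]);
        System.out.println(partitionNum); // prints 42, the time partition directory
      }
    }
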
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java
index 015a094..ffa3526 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java
@@ -34,6 +34,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
 import org.junit.After;
@@ -97,4 +99,24 @@ public class ConcatOptimizerTest {
         ValueFilter.lt(10));
     assertEquals(seriesExpression.toString(), ((RawDataQueryPlan) plan).getExpression().toString());
   }
+
+  @Test
+  public void testConcatMultipleDeviceInFilter() throws QueryProcessException {
+    String inputSQL = "select s1 from root.laptop.* where s1 < 10";
+    PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL);
+    IExpression expression = BinaryExpression.and(
+        BinaryExpression.and(
+            new SingleSeriesExpression(
+                new Path("root.laptop.d1.s1"),
+                ValueFilter.lt(10)),
+            new SingleSeriesExpression(
+                new Path("root.laptop.d2.s1"),
+                ValueFilter.lt(10))
+        ),
+        new SingleSeriesExpression(
+            new Path("root.laptop.d3.s1"),
+            ValueFilter.lt(10))
+    );
+    assertEquals(expression.toString(), ((RawDataQueryPlan) plan).getExpression().toString());
+  }
 }
\ No newline at end of file
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index f976d0f..bebae01 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -33,6 +33,9 @@ public abstract class QueryDataSet {
   protected int rowOffset = 0;
   protected int alreadyReturnedRowNum = 0;
 
+  public QueryDataSet() {
+  }
+
   public QueryDataSet(List<Path> paths, List<TSDataType> dataTypes) {
     this.paths = paths;
     this.dataTypes = dataTypes;