You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/01 09:05:45 UTC

[iotdb] 01/01: implement split pipeline for series scan

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch morePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fbcef2dca4f13548283442479857aca4f95b66d8
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Mar 1 17:05:28 2023 +0800

    implement split pipeline for series scan
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   6 +-
 .../db/engine/querycontext/QueryDataSource.java    |   8 +
 .../exchange/IMPPDataExchangeManager.java          |   3 +-
 .../execution/exchange/MPPDataExchangeManager.java |   6 +-
 .../execution/exchange/source/ISourceHandle.java   |   2 +
 .../exchange/source/LocalSourceHandle.java         |   4 +
 .../execution/exchange/source/SourceHandle.java    |   6 +
 .../operator/source/ExchangeOperator.java          |   4 +
 .../operator/source/SeriesScanOperator.java        |  14 +-
 .../source/SeriesScanTraverseOperator.java         | 225 +++++++++++++++++++++
 .../execution/operator/source/SeriesScanUtil.java  |  60 +++++-
 .../plan/execution/memory/MemorySourceHandle.java  |   3 +
 .../plan/planner/LocalExecutionPlanContext.java    |   4 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 138 ++++++++++---
 .../planner/plan/parameter/SeriesScanOptions.java  |  16 ++
 15 files changed, 457 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 75a461b006..f6f463015a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -388,13 +388,13 @@ public class IoTDBConfig {
   private int avgSeriesPointNumberThreshold = 100000;
 
   /** Enable inner space compaction for sequence files */
-  private boolean enableSeqSpaceCompaction = true;
+  private boolean enableSeqSpaceCompaction = false;
 
   /** Enable inner space compaction for unsequence files */
-  private boolean enableUnseqSpaceCompaction = true;
+  private boolean enableUnseqSpaceCompaction = false;
 
   /** Compact the unsequence files into the overlapped sequence files */
-  private boolean enableCrossSpaceCompaction = true;
+  private boolean enableCrossSpaceCompaction = false;
 
   /**
    * The strategy of inner space compaction task. There are just one inner space compaction strategy
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index 8793b2c135..f01e8a5f7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -121,4 +121,12 @@ public class QueryDataSource {
     }
     this.unSeqFileOrderIndex = unSeqFileOrderIndex;
   }
+
+  public void setUnSeqFileOrderIndex(int[] unSeqFileOrderIndex) {
+    this.unSeqFileOrderIndex = unSeqFileOrderIndex;
+  }
+
+  public int[] getUnSeqFileOrderIndex() {
+    return unSeqFileOrderIndex;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
index bbbce26e7c..eb7df0be21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
@@ -49,7 +49,8 @@ public interface IMPPDataExchangeManager {
       String localPlanNodeId,
       FragmentInstanceContext instanceContext);
 
-  ISinkChannel createLocalSinkChannelForPipeline(DriverContext driverContext, String planNodeId);
+  ISinkChannel createLocalSinkChannelForPipeline(
+      DriverContext driverContext, String planNodeId, boolean allowRunning);
   /**
    * Create a source handle who fetches data blocks from a remote upstream fragment instance for a
    * plan node of a local fragment instance in async manner.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 39cf36a5c0..28d4612007 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -484,14 +484,16 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    * sink map.
    */
   public ISinkChannel createLocalSinkChannelForPipeline(
-      DriverContext driverContext, String planNodeId) {
+      DriverContext driverContext, String planNodeId, boolean allowRunning) {
     LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
     SharedTsBlockQueue queue =
         new SharedTsBlockQueue(
             driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
             planNodeId,
             localMemoryManager);
-    queue.allowAddingTsBlock();
+    if (allowRunning) {
+      queue.allowAddingTsBlock();
+    }
     return new LocalSinkChannel(
         queue,
         new SinkListenerImpl(driverContext.getFragmentInstanceContext(), driverContext::failed));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
index 14ac3429e6..421dff21cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
@@ -85,4 +85,6 @@ public interface ISourceHandle {
 
   /** Set max bytes this handle can reserve from memory pool */
   void setMaxBytesCanReserve(long maxBytesCanReserve);
+
+  void allowRunning();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index 7dc6ad2983..672fe9cd08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -255,4 +255,8 @@ public class LocalSourceHandle implements ISourceHandle {
     // do nothing, the maxBytesCanReserve of SharedTsBlockQueue should be set by corresponding
     // LocalSinkChannel
   }
+
+  public void allowRunning() {
+    queue.allowAddingTsBlock();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index db462721d0..6b874e13c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -429,6 +429,12 @@ public class SourceHandle implements ISourceHandle {
     }
   }
 
+  @Override
+  public void allowRunning() {
+    throw new UnsupportedOperationException(
+        "AllowRunning() can only be invoked by localSourceHandle.");
+  }
+
   @Override
   public String toString() {
     return String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 8699c3e135..4fe5cf0966 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -123,4 +123,8 @@ public class ExchangeOperator implements SourceOperator {
   public void close() throws Exception {
     sourceHandle.close();
   }
+
+  public void allowRunning() {
+    this.sourceHandle.allowRunning();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 4032dea5e2..cfc5e9889a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -110,7 +110,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
     }
   }
 
-  private final TsBlockBuilder builder;
+  private TsBlockBuilder builder;
   private boolean finished = false;
 
   public SeriesScanOperator(
@@ -128,6 +128,18 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
     this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
   }
 
+  public SeriesScanOperator(OperatorContext context, PlanNodeId sourceId) {
+    this.sourceId = sourceId;
+    this.operatorContext = context;
+    this.maxReturnSize =
+        Math.min(maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
+  }
+
+  public void setSeriesScanUtils(SeriesScanUtil seriesScanUtil) {
+    this.seriesScanUtil = seriesScanUtil;
+    this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
+  }
+
   @Override
   public TsBlock next() {
     if (retainedTsBlock != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanTraverseOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanTraverseOperator.java
new file mode 100644
index 0000000000..58f1846abf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanTraverseOperator.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SeriesScanTraverseOperator extends AbstractSourceOperator
+    implements DataSourceOperator {
+
+  private final PartialPath seriesPath;
+  private final Ordering scanOrder;
+  private List<Operator> childSourceOperator;
+  private int curChildIndex = 0;
+
+  private final List<SeriesScanOperator> scanOperatorList;
+  private final SeriesScanOptions.Builder seriesScanOptionsBuilder;
+  private final int dop;
+
+  public SeriesScanTraverseOperator(
+      OperatorContext operatorContext,
+      PartialPath seriesPath,
+      Ordering scanOrder,
+      List<Operator> childSourceOperator,
+      List<SeriesScanOperator> scanOperatorList,
+      SeriesScanOptions.Builder seriesScanOptionsBuilder) {
+    this.operatorContext = operatorContext;
+    this.seriesPath = seriesPath;
+    this.scanOrder = scanOrder;
+    this.childSourceOperator = childSourceOperator;
+    this.scanOperatorList = scanOperatorList;
+    this.dop = childSourceOperator.size();
+    this.seriesScanOptionsBuilder = seriesScanOptionsBuilder;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    if (curChildIndex >= childSourceOperator.size()) {
+      return NOT_BLOCKED;
+    }
+    return childSourceOperator.get(curChildIndex).isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+    if (!childSourceOperator.get(curChildIndex).hasNextWithTimer()) {
+      curChildIndex++;
+      return null;
+    }
+    return childSourceOperator.get(curChildIndex).nextWithTimer();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return curChildIndex < childSourceOperator.size();
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : childSourceOperator) {
+      child.close();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !this.hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+
+  @Override
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    dataSource.fillOrderIndexes(seriesPath.getDevice(), scanOrder.isAscending());
+    // update the filter to take TTL into account
+    seriesScanOptionsBuilder.withTTL(dataSource.getDataTTL());
+
+    List<TsFileResource> seqResources = dataSource.getSeqResources();
+    List<TsFileResource> unSeqResources = dataSource.getUnseqResources();
+    int[] satisfiedSeqFileIndexList = new int[seqResources.size()];
+    int seqFileNum = 0;
+    long minTime = Long.MAX_VALUE, maxTime = Long.MIN_VALUE;
+    for (int i = 0; i < seqResources.size(); i++) {
+      TsFileResource tsFileResource = seqResources.get(i);
+      if (tsFileResource != null
+          && tsFileResource.isSatisfied(
+              seriesPath.getDevice(), getGlobalTimeFilter(), true, false)) {
+        satisfiedSeqFileIndexList[seqFileNum++] = i;
+        minTime = Math.min(minTime, tsFileResource.getStartTime(seriesPath.getDevice()));
+        maxTime = Math.max(maxTime, tsFileResource.getEndTime(seriesPath.getDevice()));
+      }
+    }
+
+    for (int i = 0; i < unSeqResources.size(); i++) {
+      TsFileResource tsFileResource = unSeqResources.get(i);
+      if (tsFileResource != null
+          && tsFileResource.isSatisfied(
+              seriesPath.getDevice(), getGlobalTimeFilter(), false, false)) {
+        minTime = Math.min(minTime, tsFileResource.getStartTime(seriesPath.getDevice()));
+        maxTime = Math.max(maxTime, tsFileResource.getEndTime(seriesPath.getDevice()));
+      }
+    }
+
+    // Avoid splitting one file across more than one ScanOperator
+    int splitNum = dop;
+    if (seqFileNum < dop) {
+      splitNum = seqFileNum;
+      childSourceOperator = childSourceOperator.subList(0, splitNum);
+    }
+    long avgTime = (maxTime - minTime) / splitNum;
+    long startTime = minTime, endTime = minTime + avgTime;
+    int curSeqFile = 0;
+    for (int i = 0; i < splitNum; i++) {
+      if (i == splitNum - 1 && endTime < maxTime) {
+        endTime = maxTime;
+      }
+      List<Integer> seqFileIndexList = new ArrayList<>();
+      AndFilter timeRangeFilter =
+          new AndFilter(TimeFilter.gtEq(startTime), TimeFilter.ltEq(endTime));
+      Filter newGlobalFilter =
+          getGlobalTimeFilter() == null
+              ? timeRangeFilter
+              : new AndFilter(getGlobalTimeFilter(), timeRangeFilter);
+
+      // update timeFilter using timeRange in tsFileResource
+      long curMinTime = startTime, curMaxTime = endTime;
+      while (curSeqFile < seqFileNum
+          && seqResources
+              .get(satisfiedSeqFileIndexList[curSeqFile])
+              .isSatisfied(seriesPath.getDevice(), newGlobalFilter, true, false)) {
+        int seqFileIndex = satisfiedSeqFileIndexList[curSeqFile];
+        curMinTime =
+            Math.min(
+                curMinTime, seqResources.get(seqFileIndex).getStartTime(seriesPath.getDevice()));
+        curMaxTime =
+            Math.max(curMaxTime, seqResources.get(seqFileIndex).getEndTime(seriesPath.getDevice()));
+        // make sure one tsFile can only be processed in one ScanOperator
+        seqFileIndexList.add(satisfiedSeqFileIndexList[curSeqFile++]);
+      }
+      // make sure at least one tsFile can be processed in one ScanOperator
+      if (seqFileIndexList.isEmpty()) {
+        if (curSeqFile < seqFileNum) {
+          int seqFileIndex = satisfiedSeqFileIndexList[curSeqFile];
+          curMinTime =
+              Math.min(
+                  curMinTime, seqResources.get(seqFileIndex).getStartTime(seriesPath.getDevice()));
+          curMaxTime =
+              Math.max(
+                  curMaxTime, seqResources.get(seqFileIndex).getEndTime(seriesPath.getDevice()));
+          seqFileIndexList.add(satisfiedSeqFileIndexList[curSeqFile++]);
+          // if there are no more tsFiles to be processed
+        } else {
+          childSourceOperator = childSourceOperator.subList(0, i);
+          return;
+        }
+      }
+      SeriesScanOptions scanOptions = seriesScanOptionsBuilder.build();
+      if (curMinTime != startTime || curMaxTime != endTime) {
+        timeRangeFilter.setLeft(TimeFilter.gtEq(curMinTime));
+        timeRangeFilter.setRight(TimeFilter.ltEq(curMaxTime));
+      }
+      scanOptions.setGlobalTimeFilter(newGlobalFilter);
+      SeriesScanUtil seriesScanUtil =
+          new SeriesScanUtil(
+              seriesPath, scanOrder, scanOptions, operatorContext.getInstanceContext());
+      seriesScanUtil.initQueryDataSource(
+          dataSource, seqFileIndexList, dataSource.getUnSeqFileOrderIndex());
+      scanOperatorList.get(i).setSeriesScanUtils(seriesScanUtil);
+      if (childSourceOperator.get(i) instanceof ExchangeOperator) {
+        ((ExchangeOperator) childSourceOperator.get(i)).allowRunning();
+      }
+      // update next time range
+      startTime = curMaxTime;
+      endTime = Math.min(curMaxTime + avgTime, maxTime);
+    }
+  }
+
+  Filter getGlobalTimeFilter() {
+    return seriesScanOptionsBuilder.getGlobalTimeFilter();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 755795447f..338b0ecdeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -48,10 +48,13 @@ import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
@@ -84,6 +87,8 @@ public class SeriesScanUtil {
   // file index
   protected int curSeqFileIndex;
   protected int curUnseqFileIndex;
+  // If seqFileIndexIterator != null, it means tsFileResources have already been filtered
+  public Iterator<Integer> seqFileIndexIterator;
 
   // TimeSeriesMetadata cache
   protected ITimeSeriesMetadata firstTimeSeriesMetadata;
@@ -165,6 +170,17 @@ public class SeriesScanUtil {
     curUnseqFileIndex = 0;
   }
 
+  public void initQueryDataSource(
+      QueryDataSource dataSource, List<Integer> seqFileIndexList, int[] unSeqFileOrderIndex) {
+    this.dataSource =
+        new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
+
+    this.dataSource.setUnSeqFileOrderIndex(unSeqFileOrderIndex);
+    this.orderUtils.setSeqFileIndexList(seqFileIndexList);
+    this.curSeqFileIndex = seqFileIndexIterator.next();
+    this.curUnseqFileIndex = 0;
+  }
+
   protected PriorityMergeReader getPriorityMergeReader() {
     return new PriorityMergeReader();
   }
@@ -1223,6 +1239,8 @@ public class SeriesScanUtil {
     TsFileResource getNextUnseqFileResource(boolean isDelete);
 
     void setCurSeqFileIndex(QueryDataSource dataSource);
+
+    void setSeqFileIndexList(List<Integer> seqFileIndexList);
   }
 
   class DescTimeOrderUtils implements TimeOrderUtils {
@@ -1300,7 +1318,13 @@ public class SeriesScanUtil {
                 seriesPath.getDevice(), getGlobalTimeFilter(), true, false)) {
           break;
         }
-        curSeqFileIndex--;
+        if (seqFileIndexIterator == null) {
+          curSeqFileIndex--;
+        } else if (seqFileIndexIterator.hasNext()) {
+          curSeqFileIndex = seqFileIndexIterator.next();
+        } else {
+          return false;
+        }
       }
       return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
     }
@@ -1323,7 +1347,13 @@ public class SeriesScanUtil {
     public TsFileResource getNextSeqFileResource(boolean isDelete) {
       TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
       if (isDelete) {
-        curSeqFileIndex--;
+        if (seqFileIndexIterator == null) {
+          curSeqFileIndex--;
+        } else if (seqFileIndexIterator.hasNext()) {
+          curSeqFileIndex = seqFileIndexIterator.next();
+        } else {
+          curSeqFileIndex = -1;
+        }
       }
       return tsFileResource;
     }
@@ -1341,6 +1371,11 @@ public class SeriesScanUtil {
     public void setCurSeqFileIndex(QueryDataSource dataSource) {
       curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
     }
+
+    @Override
+    public void setSeqFileIndexList(List<Integer> seqFileIndexList) {
+      seqFileIndexIterator = Lists.reverse(seqFileIndexList).iterator();
+    }
   }
 
   class AscTimeOrderUtils implements TimeOrderUtils {
@@ -1418,7 +1453,13 @@ public class SeriesScanUtil {
                 seriesPath.getDevice(), getGlobalTimeFilter(), true, false)) {
           break;
         }
-        curSeqFileIndex++;
+        if (seqFileIndexIterator == null) {
+          curSeqFileIndex++;
+        } else if (seqFileIndexIterator.hasNext()) {
+          curSeqFileIndex = seqFileIndexIterator.next();
+        } else {
+          return false;
+        }
       }
       return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
     }
@@ -1441,7 +1482,13 @@ public class SeriesScanUtil {
     public TsFileResource getNextSeqFileResource(boolean isDelete) {
       TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
       if (isDelete) {
-        curSeqFileIndex++;
+        if (seqFileIndexIterator == null) {
+          curSeqFileIndex++;
+        } else if (seqFileIndexIterator.hasNext()) {
+          curSeqFileIndex = seqFileIndexIterator.next();
+        } else {
+          curSeqFileIndex = dataSource.getSeqResourcesSize();
+        }
       }
       return tsFileResource;
     }
@@ -1459,5 +1506,10 @@ public class SeriesScanUtil {
     public void setCurSeqFileIndex(QueryDataSource dataSource) {
       curSeqFileIndex = 0;
     }
+
+    @Override
+    public void setSeqFileIndexList(List<Integer> seqFileIndexList) {
+      seqFileIndexIterator = seqFileIndexList.iterator();
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index a071b4b83c..47a6c06f48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -110,4 +110,7 @@ public class MemorySourceHandle implements ISourceHandle {
 
   @Override
   public void setMaxBytesCanReserve(long maxBytesCanReserve) {}
+
+  @Override
+  public void allowRunning() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..6734c7545f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -62,8 +62,8 @@ public class LocalExecutionPlanContext {
   private int degreeOfParallelism =
       IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism();
   // this is shared with all subContexts
-  private AtomicInteger nextPipelineId;
-  private List<PipelineDriverFactory> pipelineDriverFactories;
+  private final AtomicInteger nextPipelineId;
+  private final List<PipelineDriverFactory> pipelineDriverFactories;
   private List<ExchangeOperator> exchangeOperatorList = new ArrayList<>();
   private int exchangeSumNum = 0;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index eb53954cfa..08e6c6b5f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -122,6 +122,7 @@ import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperat
 import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanTraverseOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
 import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.SeriesWindowParameter;
@@ -136,6 +137,7 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.visitor.ColumnTransformerVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
@@ -184,6 +186,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
@@ -270,14 +273,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
   @Override
   public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext context) {
     PartialPath seriesPath = node.getSeriesPath();
-
-    OperatorContext operatorContext =
-        context
-            .getDriverContext()
-            .addOperatorContext(
-                context.getNextOperatorId(),
-                node.getPlanNodeId(),
-                SeriesScanOperator.class.getSimpleName());
+    ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
+    context.getDriverContext().setInputDriver(true);
 
     Filter timeFilter = node.getTimeFilter();
     Filter valueFilter = node.getValueFilter();
@@ -286,26 +283,36 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
       seriesScanOptionsBuilder.withGlobalTimeFilter(timeFilter.copy());
     }
     if (valueFilter != null) {
-      seriesScanOptionsBuilder.withGlobalTimeFilter(valueFilter.copy());
+      seriesScanOptionsBuilder.withQueryFilter(valueFilter.copy());
     }
     seriesScanOptionsBuilder.withAllSensors(
         context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()));
     seriesScanOptionsBuilder.withLimit(node.getLimit());
     seriesScanOptionsBuilder.withOffset(node.getOffset());
 
-    SeriesScanOperator seriesScanOperator =
-        new SeriesScanOperator(
-            operatorContext,
-            node.getPlanNodeId(),
-            seriesPath,
-            node.getScanOrder(),
-            seriesScanOptionsBuilder.build());
+    if (context.getDegreeOfParallelism() == 1) {
+      OperatorContext operatorContext =
+          context
+              .getDriverContext()
+              .addOperatorContext(
+                  context.getNextOperatorId(),
+                  node.getPlanNodeId(),
+                  SeriesScanOperator.class.getSimpleName());
 
-    ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator);
-    ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
-    context.getDriverContext().setInputDriver(true);
-    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    return seriesScanOperator;
+      SeriesScanOperator scanOperator =
+          new SeriesScanOperator(
+              operatorContext,
+              node.getPlanNodeId(),
+              seriesPath,
+              node.getScanOrder(),
+              seriesScanOptionsBuilder.build());
+
+      ((DataDriverContext) context.getDriverContext()).addSourceOperator(scanOperator);
+      context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+      return scanOperator;
+    } else {
+      return divideSplitToPipeline(context, node, seriesScanOptionsBuilder);
+    }
   }
 
   @Override
@@ -351,6 +358,65 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return seriesScanOperator;
   }
 
+  private Operator divideSplitToPipeline(
+      LocalExecutionPlanContext context,
+      SeriesScanNode node,
+      SeriesScanOptions.Builder seriesScanOptionsBuilder) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                SeriesScanTraverseOperator.class.getSimpleName());
+
+    int dop = context.getDegreeOfParallelism(); // number of splits, one scan operator each
+    List<SeriesScanOperator> scanOperatorList = new ArrayList<>();
+    List<Operator> childSourceOperator = new ArrayList<>();
+    for (int i = 0; i < dop; i++) {
+      PlanNodeId planNodeId = new PlanNodeId(String.format("%s-%d", node.getPlanNodeId(), i));
+      // Split 0 reuses the caller's context (parent pipeline); the rest get a new sub-context.
+      LocalExecutionPlanContext subContext = (i == 0) ? context : context.createSubContext();
+      OperatorContext scanOperatorContext =
+          subContext
+              .getDriverContext()
+              .addOperatorContext(
+                  subContext.getNextOperatorId(),
+                  planNodeId,
+                  SeriesScanOperator.class.getSimpleName());
+      SeriesScanOperator seriesScanOperator =
+          new SeriesScanOperator(
+              scanOperatorContext,
+              planNodeId,
+              node.getSeriesPath(),
+              node.getScanOrder(),
+              seriesScanOptionsBuilder.build()); // built once per split
+      scanOperatorList.add(seriesScanOperator);
+
+      if (i == 0) { // split 0: the scan operator itself is the child
+        childSourceOperator.add(seriesScanOperator);
+        context.getTimeSliceAllocator().recordExecutionWeight(scanOperatorContext, 1);
+      } else { // later splits: own pipeline, seen here through an exchange operator
+        subContext.getTimeSliceAllocator().recordExecutionWeight(scanOperatorContext, 1);
+        Operator exchangeOperator =
+            createNewPipelineForChildOperation(
+                context, subContext, seriesScanOperator, planNodeId, false); // allowRunning
+        childSourceOperator.add(exchangeOperator);
+      }
+    }
+    SeriesScanTraverseOperator traverseOperator =
+        new SeriesScanTraverseOperator(
+            operatorContext,
+            node.getSeriesPath(),
+            node.getScanOrder(),
+            childSourceOperator,
+            scanOperatorList,
+            seriesScanOptionsBuilder); // the builder itself, not a built SeriesScanOptions
+    ((DataDriverContext) context.getDriverContext()).addSourceOperator(traverseOperator);
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return traverseOperator; // NOTE(review): old path also called addPath/setInputDriver
+  }
+
   @Override
   public Operator visitSeriesAggregationScan(
       SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
@@ -2488,14 +2554,24 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
   private Operator createNewPipelineForChildNode(
       LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, PlanNode childNode) {
     Operator childOperation = childNode.accept(this, subContext);
+    return createNewPipelineForChildOperation(
+        context, subContext, childOperation, childNode.getPlanNodeId(), true); // allowRunning
+  }
+
+  private Operator createNewPipelineForChildOperation(
+      LocalExecutionPlanContext context,
+      LocalExecutionPlanContext subContext,
+      Operator childOperation,
+      PlanNodeId planNodeId,
+      boolean allowRunning) {
     ISinkChannel localSinkChannel =
         MPP_DATA_EXCHANGE_MANAGER.createLocalSinkChannelForPipeline(
             // Attention, there is no parent node, use first child node instead
-            subContext.getDriverContext(), childNode.getPlanNodeId().getId());
+            subContext.getDriverContext(), planNodeId.getId(), allowRunning);
     subContext.setISink(localSinkChannel);
     subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
 
-    ExchangeOperator sourceOperator =
+    ExchangeOperator exchangeOperator =
         new ExchangeOperator(
             context
                 .getDriverContext()
@@ -2504,12 +2580,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
                 ((LocalSinkChannel) localSinkChannel).getSharedTsBlockQueue(),
                 context.getDriverContext()),
-            childNode.getPlanNodeId(),
+            planNodeId,
             childOperation.calculateMaxReturnSize());
 
-    context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
-    context.addExchangeOperator(sourceOperator);
-    return sourceOperator;
+    context.getTimeSliceAllocator().recordExecutionWeight(exchangeOperator.getOperatorContext(), 1);
+    context.addExchangeOperator(exchangeOperator);
+    return exchangeOperator;
   }
 
   public List<Operator> dealWithConsumeChildrenOneByOneNode(
@@ -2542,13 +2618,17 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           LocalExecutionPlanContext subContext = context.createSubContext();
           // Only context.getDegreeOfParallelism() - 1 can be allocated to child
           int dopForChild = context.getDegreeOfParallelism() - 1;
-          subContext.setDegreeOfParallelism(dopForChild);
+          if (childNode instanceof SeriesSourceNode) {
+            subContext.setDegreeOfParallelism(1);
+          } else {
+            subContext.setDegreeOfParallelism(dopForChild);
+          }
           int originPipeNum = context.getPipelineNumber();
           Operator childOperation = childNode.accept(this, subContext);
           ISinkChannel localSinkChannel =
               MPP_DATA_EXCHANGE_MANAGER.createLocalSinkChannelForPipeline(
                   // Attention, there is no parent node, use first child node instead
-                  context.getDriverContext(), childNode.getPlanNodeId().getId());
+                  context.getDriverContext(), childNode.getPlanNodeId().getId(), true);
           subContext.setISink(localSinkChannel);
           subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/SeriesScanOptions.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/SeriesScanOptions.java
index 7e3bcd6f29..b90f813c96 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/SeriesScanOptions.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -72,6 +72,10 @@ public class SeriesScanOptions {
     return globalTimeFilter;
   }
 
+  public void setGlobalTimeFilter(Filter globalTimeFilter) {
+    this.globalTimeFilter = globalTimeFilter; // no null check or copy; stored as passed
+  }
+
   public Filter getQueryFilter() {
     return queryFilter;
   }
@@ -125,6 +129,10 @@ public class SeriesScanOptions {
       return this;
     }
 
+    public Filter getGlobalTimeFilter() {
+      return globalTimeFilter; // the builder's own reference, not a copy
+    }
+
     public Builder withQueryFilter(Filter queryFilter) {
       this.queryFilter = queryFilter;
       return this;
@@ -140,6 +148,14 @@ public class SeriesScanOptions {
       return this;
     }
 
+    public Builder withTTL(long dataTTL) { // applies dataTTL via updateFilterUsingTTL
+      this.globalTimeFilter = updateFilterUsingTTL(globalTimeFilter, dataTTL);
+      if (this.queryFilter != null) { // unlike the time filter, left alone when unset
+        this.queryFilter = updateFilterUsingTTL(queryFilter, dataTTL);
+      }
+      return this;
+    }
+
     public void withAllSensors(Set<String> allSensors) {
       this.allSensors = allSensors;
     }