You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/01 09:05:45 UTC
[iotdb] 01/01: implement split pipeline for series scan
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch morePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fbcef2dca4f13548283442479857aca4f95b66d8
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Mar 1 17:05:28 2023 +0800
implement split pipeline for series scan
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 6 +-
.../db/engine/querycontext/QueryDataSource.java | 8 +
.../exchange/IMPPDataExchangeManager.java | 3 +-
.../execution/exchange/MPPDataExchangeManager.java | 6 +-
.../execution/exchange/source/ISourceHandle.java | 2 +
.../exchange/source/LocalSourceHandle.java | 4 +
.../execution/exchange/source/SourceHandle.java | 6 +
.../operator/source/ExchangeOperator.java | 4 +
.../operator/source/SeriesScanOperator.java | 14 +-
.../source/SeriesScanTraverseOperator.java | 225 +++++++++++++++++++++
.../execution/operator/source/SeriesScanUtil.java | 60 +++++-
.../plan/execution/memory/MemorySourceHandle.java | 3 +
.../plan/planner/LocalExecutionPlanContext.java | 4 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 138 ++++++++++---
.../planner/plan/parameter/SeriesScanOptions.java | 16 ++
15 files changed, 457 insertions(+), 42 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 75a461b006..f6f463015a 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -388,13 +388,13 @@ public class IoTDBConfig {
private int avgSeriesPointNumberThreshold = 100000;
/** Enable inner space compaction for sequence files */
- private boolean enableSeqSpaceCompaction = true;
+ private boolean enableSeqSpaceCompaction = false;
/** Enable inner space compaction for unsequence files */
- private boolean enableUnseqSpaceCompaction = true;
+ private boolean enableUnseqSpaceCompaction = false;
/** Compact the unsequence files into the overlapped sequence files */
- private boolean enableCrossSpaceCompaction = true;
+ private boolean enableCrossSpaceCompaction = false;
/**
* The strategy of inner space compaction task. There are just one inner space compaction strategy
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index 8793b2c135..f01e8a5f7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -121,4 +121,12 @@ public class QueryDataSource {
}
this.unSeqFileOrderIndex = unSeqFileOrderIndex;
}
+
+ /** Replaces the unsequence-file order index with the given array (stored by reference, not copied). */
+ public void setUnSeqFileOrderIndex(int[] unSeqFileOrderIndex) {
+ this.unSeqFileOrderIndex = unSeqFileOrderIndex;
+ }
+
+ /** Returns the unsequence-file order index array itself (no defensive copy is made). */
+ public int[] getUnSeqFileOrderIndex() {
+ return unSeqFileOrderIndex;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
index bbbce26e7c..eb7df0be21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/IMPPDataExchangeManager.java
@@ -49,7 +49,8 @@ public interface IMPPDataExchangeManager {
String localPlanNodeId,
FragmentInstanceContext instanceContext);
- ISinkChannel createLocalSinkChannelForPipeline(DriverContext driverContext, String planNodeId);
+ ISinkChannel createLocalSinkChannelForPipeline(
+ DriverContext driverContext, String planNodeId, boolean allowRunning);
/**
* Create a source handle who fetches data blocks from a remote upstream fragment instance for a
* plan node of a local fragment instance in async manner.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 39cf36a5c0..28d4612007 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -484,14 +484,16 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
* sink map.
*/
public ISinkChannel createLocalSinkChannelForPipeline(
- DriverContext driverContext, String planNodeId) {
+ DriverContext driverContext, String planNodeId, boolean allowRunning) {
LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
SharedTsBlockQueue queue =
new SharedTsBlockQueue(
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
planNodeId,
localMemoryManager);
- queue.allowAddingTsBlock();
+ if (allowRunning) {
+ queue.allowAddingTsBlock();
+ }
return new LocalSinkChannel(
queue,
new SinkListenerImpl(driverContext.getFragmentInstanceContext(), driverContext::failed));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
index 14ac3429e6..421dff21cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/ISourceHandle.java
@@ -85,4 +85,6 @@ public interface ISourceHandle {
/** Set max bytes this handle can reserve from memory pool */
void setMaxBytesCanReserve(long maxBytesCanReserve);
+
+ void allowRunning();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
index 7dc6ad2983..672fe9cd08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/LocalSourceHandle.java
@@ -255,4 +255,8 @@ public class LocalSourceHandle implements ISourceHandle {
// do nothing, the maxBytesCanReserve of SharedTsBlockQueue should be set by corresponding
// LocalSinkChannel
}
+
+ /** Lets the shared queue behind this local handle start accepting TsBlocks. */
+ @Override
+ public void allowRunning() {
+   queue.allowAddingTsBlock();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
index db462721d0..6b874e13c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/source/SourceHandle.java
@@ -429,6 +429,12 @@ public class SourceHandle implements ISourceHandle {
}
}
+ /** Not supported by this handle: every call throws {@link UnsupportedOperationException}. */
+ @Override
+ public void allowRunning() {
+ throw new UnsupportedOperationException(
+ "AllowRunning() can only be invoked by localSourceHandle.");
+ }
+
@Override
public String toString() {
return String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
index 8699c3e135..4fe5cf0966 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/ExchangeOperator.java
@@ -123,4 +123,8 @@ public class ExchangeOperator implements SourceOperator {
public void close() throws Exception {
sourceHandle.close();
}
+
+ /** Forwards to {@code sourceHandle.allowRunning()}. */
+ public void allowRunning() {
+ this.sourceHandle.allowRunning();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
index 4032dea5e2..cfc5e9889a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java
@@ -110,7 +110,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
}
}
- private final TsBlockBuilder builder;
+ private TsBlockBuilder builder;
private boolean finished = false;
public SeriesScanOperator(
@@ -128,6 +128,18 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
}
+ /**
+ * Creates the operator without a scan util: only the source id, the operator context and the
+ * return-size cap are set here. {@code seriesScanUtil} and {@code builder} are not assigned by
+ * this constructor.
+ */
+ public SeriesScanOperator(OperatorContext context, PlanNodeId sourceId) {
+ this.sourceId = sourceId;
+ this.operatorContext = context;
+ // cap the return size at the configured page size
+ this.maxReturnSize =
+ Math.min(maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
+ }
+
+ /**
+ * Installs the scan util this operator reads from and rebuilds the TsBlock builder from the
+ * util's data type list.
+ */
+ public void setSeriesScanUtils(SeriesScanUtil seriesScanUtil) {
+ this.seriesScanUtil = seriesScanUtil;
+ this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList());
+ }
+
@Override
public TsBlock next() {
if (retainedTsBlock != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanTraverseOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanTraverseOperator.java
new file mode 100644
index 0000000000..58f1846abf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanTraverseOperator.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.source;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Source operator that drains a list of child operators one after another.
+ *
+ * <p>{@link #initQueryDataSource(QueryDataSource)} partitions the satisfied sequence files into at
+ * most {@code dop} consecutive, non-overlapping time ranges, builds one {@link SeriesScanUtil} per
+ * range and hands it to the {@link SeriesScanOperator} with the same index.
+ *
+ * <p>NOTE(review): children are always consumed from the earliest time range to the latest,
+ * whatever {@code scanOrder} is - confirm this is intended for descending scans.
+ */
+public class SeriesScanTraverseOperator extends AbstractSourceOperator
+    implements DataSourceOperator {
+
+  private final PartialPath seriesPath;
+  private final Ordering scanOrder;
+  /** Every child passed to the constructor; kept so that {@link #close()} releases all of them. */
+  private final List<Operator> allChildOperators;
+  /** The children that received a split; always a prefix of {@link #allChildOperators}. */
+  private List<Operator> childSourceOperator;
+  private int curChildIndex = 0;
+
+  private final List<SeriesScanOperator> scanOperatorList;
+  private final SeriesScanOptions.Builder seriesScanOptionsBuilder;
+  private final int dop;
+
+  /**
+   * @param childSourceOperator operators this operator pulls from, one per potential split
+   * @param scanOperatorList the scan operator of each split, index-aligned with the children
+   * @param seriesScanOptionsBuilder builder used to create the scan options of every split
+   */
+  public SeriesScanTraverseOperator(
+      OperatorContext operatorContext,
+      PartialPath seriesPath,
+      Ordering scanOrder,
+      List<Operator> childSourceOperator,
+      List<SeriesScanOperator> scanOperatorList,
+      SeriesScanOptions.Builder seriesScanOptionsBuilder) {
+    this.operatorContext = operatorContext;
+    this.seriesPath = seriesPath;
+    this.scanOrder = scanOrder;
+    this.allChildOperators = childSourceOperator;
+    this.childSourceOperator = childSourceOperator;
+    this.scanOperatorList = scanOperatorList;
+    this.dop = childSourceOperator.size();
+    this.seriesScanOptionsBuilder = seriesScanOptionsBuilder;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    if (curChildIndex >= childSourceOperator.size()) {
+      return NOT_BLOCKED;
+    }
+    return childSourceOperator.get(curChildIndex).isBlocked();
+  }
+
+  /** Returns the next block of the current child, or null while moving on to the next child. */
+  @Override
+  public TsBlock next() {
+    if (!childSourceOperator.get(curChildIndex).hasNextWithTimer()) {
+      curChildIndex++;
+      return null;
+    }
+    return childSourceOperator.get(curChildIndex).nextWithTimer();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return curChildIndex < childSourceOperator.size();
+  }
+
+  /** Closes every child given to the constructor, including those that never received a split. */
+  @Override
+  public void close() throws Exception {
+    for (Operator child : allChildOperators) {
+      child.close();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !this.hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+
+  /**
+   * Splits the scan into time ranges and starts one child per range.
+   *
+   * <p>Each sequence file is assigned to exactly one split. A split without any sequence file
+   * (no satisfied sequence file at all, or a trailing range covered only by unsequence files) is
+   * still scanned so that unsequence data is not lost.
+   */
+  @Override
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    String device = seriesPath.getDevice();
+    dataSource.fillOrderIndexes(device, scanOrder.isAscending());
+    // updated filter concerning TTL
+    seriesScanOptionsBuilder.withTTL(dataSource.getDataTTL());
+
+    if (dop == 0) {
+      // no child to hand a split to
+      return;
+    }
+
+    List<TsFileResource> seqResources = dataSource.getSeqResources();
+    List<TsFileResource> unSeqResources = dataSource.getUnseqResources();
+    int[] satisfiedSeqFileIndexList = new int[seqResources.size()];
+    int seqFileNum = 0;
+    long minTime = Long.MAX_VALUE;
+    long maxTime = Long.MIN_VALUE;
+    for (int i = 0; i < seqResources.size(); i++) {
+      TsFileResource tsFileResource = seqResources.get(i);
+      if (tsFileResource != null
+          && tsFileResource.isSatisfied(device, getGlobalTimeFilter(), true, false)) {
+        satisfiedSeqFileIndexList[seqFileNum++] = i;
+        minTime = Math.min(minTime, tsFileResource.getStartTime(device));
+        maxTime = Math.max(maxTime, tsFileResource.getEndTime(device));
+      }
+    }
+
+    for (TsFileResource tsFileResource : unSeqResources) {
+      if (tsFileResource != null
+          && tsFileResource.isSatisfied(device, getGlobalTimeFilter(), false, false)) {
+        minTime = Math.min(minTime, tsFileResource.getStartTime(device));
+        maxTime = Math.max(maxTime, tsFileResource.getEndTime(device));
+      }
+    }
+
+    if (seqFileNum == 0) {
+      // Nothing to partition by (and dividing by zero splits must be avoided): run one plain,
+      // unsplit scan on the first child.
+      childSourceOperator = allChildOperators.subList(0, 1);
+      startSplit(0, seriesScanOptionsBuilder.build(), dataSource, new ArrayList<>());
+      return;
+    }
+
+    // Avoid split one file to more than one ScanOperator
+    int splitNum = Math.min(dop, seqFileNum);
+    childSourceOperator = allChildOperators.subList(0, splitNum);
+
+    long avgTime = (maxTime - minTime) / splitNum;
+    long startTime = minTime;
+    long endTime = minTime + avgTime;
+    int curSeqFile = 0;
+    for (int i = 0; i < splitNum; i++) {
+      if (i == splitNum - 1 && endTime < maxTime) {
+        endTime = maxTime;
+      }
+      List<Integer> seqFileIndexList = new ArrayList<>();
+      AndFilter timeRangeFilter =
+          new AndFilter(TimeFilter.gtEq(startTime), TimeFilter.ltEq(endTime));
+      Filter newGlobalFilter =
+          getGlobalTimeFilter() == null
+              ? timeRangeFilter
+              : new AndFilter(getGlobalTimeFilter(), timeRangeFilter);
+
+      // update timeFilter using timeRange in tsFileResource
+      long curMinTime = startTime;
+      long curMaxTime = endTime;
+      while (curSeqFile < seqFileNum
+          && seqResources
+              .get(satisfiedSeqFileIndexList[curSeqFile])
+              .isSatisfied(device, newGlobalFilter, true, false)) {
+        TsFileResource seqFile = seqResources.get(satisfiedSeqFileIndexList[curSeqFile]);
+        curMinTime = Math.min(curMinTime, seqFile.getStartTime(device));
+        curMaxTime = Math.max(curMaxTime, seqFile.getEndTime(device));
+        // make sure one tsFile can only be processed in one ScanOperator
+        seqFileIndexList.add(satisfiedSeqFileIndexList[curSeqFile++]);
+      }
+      if (seqFileIndexList.isEmpty()) {
+        if (curSeqFile < seqFileNum) {
+          // make sure at least one tsFile can be processed in one ScanOperator
+          TsFileResource seqFile = seqResources.get(satisfiedSeqFileIndexList[curSeqFile]);
+          curMinTime = Math.min(curMinTime, seqFile.getStartTime(device));
+          curMaxTime = Math.max(curMaxTime, seqFile.getEndTime(device));
+          seqFileIndexList.add(satisfiedSeqFileIndexList[curSeqFile++]);
+        } else {
+          // No sequence file is left but the range up to maxTime is not covered yet, so only
+          // unsequence files can hold data there: let this split scan the whole remainder.
+          curMaxTime = maxTime;
+        }
+      }
+      if (curMinTime != startTime || curMaxTime != endTime) {
+        timeRangeFilter.setLeft(TimeFilter.gtEq(curMinTime));
+        timeRangeFilter.setRight(TimeFilter.ltEq(curMaxTime));
+      }
+      SeriesScanOptions scanOptions = seriesScanOptionsBuilder.build();
+      scanOptions.setGlobalTimeFilter(newGlobalFilter);
+      startSplit(i, scanOptions, dataSource, seqFileIndexList);
+
+      if (curMaxTime >= maxTime) {
+        // the whole time range is covered; the remaining children get no split
+        childSourceOperator = allChildOperators.subList(0, i + 1);
+        return;
+      }
+      // Both bounds of a split are inclusive, so the next split starts one past this one;
+      // otherwise a point at curMaxTime could be returned by two splits.
+      startTime = curMaxTime + 1;
+      endTime = Math.min(curMaxTime + avgTime, maxTime);
+    }
+  }
+
+  /**
+   * Builds the scan util of split {@code index}, installs it in the matching scan operator and
+   * lets the child run if it is an {@link ExchangeOperator}.
+   *
+   * @param seqFileIndexList indexes of the sequence files owned by this split; when empty the
+   *     util is initialised without a pre-filtered file list
+   */
+  private void startSplit(
+      int index,
+      SeriesScanOptions scanOptions,
+      QueryDataSource dataSource,
+      List<Integer> seqFileIndexList) {
+    SeriesScanUtil seriesScanUtil =
+        new SeriesScanUtil(
+            seriesPath, scanOrder, scanOptions, operatorContext.getInstanceContext());
+    if (seqFileIndexList.isEmpty()) {
+      // NOTE(review): relies on the pre-existing single-argument
+      // SeriesScanUtil#initQueryDataSource(QueryDataSource), which is presumed to walk all
+      // sequence files and skip those rejected by the time filter - confirm.
+      seriesScanUtil.initQueryDataSource(dataSource);
+    } else {
+      seriesScanUtil.initQueryDataSource(
+          dataSource, seqFileIndexList, dataSource.getUnSeqFileOrderIndex());
+    }
+    scanOperatorList.get(index).setSeriesScanUtils(seriesScanUtil);
+    Operator child = allChildOperators.get(index);
+    if (child instanceof ExchangeOperator) {
+      ((ExchangeOperator) child).allowRunning();
+    }
+  }
+
+  Filter getGlobalTimeFilter() {
+    return seriesScanOptionsBuilder.getGlobalTimeFilter();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
index 755795447f..338b0ecdeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java
@@ -48,10 +48,13 @@ import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@@ -84,6 +87,8 @@ public class SeriesScanUtil {
// file index
protected int curSeqFileIndex;
protected int curUnseqFileIndex;
+ // A non-null seqFileIndexIterator means the sequence TsFileResources have already been filtered
+ public Iterator<Integer> seqFileIndexIterator;
// TimeSeriesMetadata cache
protected ITimeSeriesMetadata firstTimeSeriesMetadata;
@@ -165,6 +170,17 @@ public class SeriesScanUtil {
curUnseqFileIndex = 0;
}
+ /**
+  * Initialises this util for one split: works on its own QueryDataSource built from the given
+  * one's resource lists, and walks only the listed sequence files. The list must not be empty,
+  * because its first index is fetched immediately.
+  */
+ public void initQueryDataSource(
+     QueryDataSource dataSource, List<Integer> seqFileIndexList, int[] unSeqFileOrderIndex) {
+   QueryDataSource splitDataSource =
+       new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());
+   splitDataSource.setUnSeqFileOrderIndex(unSeqFileOrderIndex);
+   this.dataSource = splitDataSource;
+
+   // the order utils decide the direction in which the listed indexes are iterated
+   orderUtils.setSeqFileIndexList(seqFileIndexList);
+   curSeqFileIndex = seqFileIndexIterator.next();
+   curUnseqFileIndex = 0;
+ }
+
protected PriorityMergeReader getPriorityMergeReader() {
return new PriorityMergeReader();
}
@@ -1223,6 +1239,8 @@ public class SeriesScanUtil {
TsFileResource getNextUnseqFileResource(boolean isDelete);
void setCurSeqFileIndex(QueryDataSource dataSource);
+
+ void setSeqFileIndexList(List<Integer> seqFileIndexList);
}
class DescTimeOrderUtils implements TimeOrderUtils {
@@ -1300,7 +1318,13 @@ public class SeriesScanUtil {
seriesPath.getDevice(), getGlobalTimeFilter(), true, false)) {
break;
}
- curSeqFileIndex--;
+ if (seqFileIndexIterator == null) {
+ curSeqFileIndex--;
+ } else if (seqFileIndexIterator.hasNext()) {
+ curSeqFileIndex = seqFileIndexIterator.next();
+ } else {
+ return false;
+ }
}
return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
}
@@ -1323,7 +1347,13 @@ public class SeriesScanUtil {
public TsFileResource getNextSeqFileResource(boolean isDelete) {
TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
if (isDelete) {
- curSeqFileIndex--;
+ if (seqFileIndexIterator == null) {
+ curSeqFileIndex--;
+ } else if (seqFileIndexIterator.hasNext()) {
+ curSeqFileIndex = seqFileIndexIterator.next();
+ } else {
+ curSeqFileIndex = -1;
+ }
}
return tsFileResource;
}
@@ -1341,6 +1371,11 @@ public class SeriesScanUtil {
public void setCurSeqFileIndex(QueryDataSource dataSource) {
curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
}
+
+ /** Iterates the given sequence-file indexes in reverse list order. */
+ @Override
+ public void setSeqFileIndexList(List<Integer> seqFileIndexList) {
+ seqFileIndexIterator = Lists.reverse(seqFileIndexList).iterator();
+ }
}
class AscTimeOrderUtils implements TimeOrderUtils {
@@ -1418,7 +1453,13 @@ public class SeriesScanUtil {
seriesPath.getDevice(), getGlobalTimeFilter(), true, false)) {
break;
}
- curSeqFileIndex++;
+ if (seqFileIndexIterator == null) {
+ curSeqFileIndex++;
+ } else if (seqFileIndexIterator.hasNext()) {
+ curSeqFileIndex = seqFileIndexIterator.next();
+ } else {
+ return false;
+ }
}
return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
}
@@ -1441,7 +1482,13 @@ public class SeriesScanUtil {
public TsFileResource getNextSeqFileResource(boolean isDelete) {
TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
if (isDelete) {
- curSeqFileIndex++;
+ if (seqFileIndexIterator == null) {
+ curSeqFileIndex++;
+ } else if (seqFileIndexIterator.hasNext()) {
+ curSeqFileIndex = seqFileIndexIterator.next();
+ } else {
+ curSeqFileIndex = dataSource.getSeqResourcesSize();
+ }
}
return tsFileResource;
}
@@ -1459,5 +1506,10 @@ public class SeriesScanUtil {
public void setCurSeqFileIndex(QueryDataSource dataSource) {
curSeqFileIndex = 0;
}
+
+ /** Iterates the given sequence-file indexes in list order. */
+ @Override
+ public void setSeqFileIndexList(List<Integer> seqFileIndexList) {
+ seqFileIndexIterator = seqFileIndexList.iterator();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
index a071b4b83c..47a6c06f48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/MemorySourceHandle.java
@@ -110,4 +110,7 @@ public class MemorySourceHandle implements ISourceHandle {
@Override
public void setMaxBytesCanReserve(long maxBytesCanReserve) {}
+
+ /** Intentionally a no-op for this handle. */
+ @Override
+ public void allowRunning() {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..6734c7545f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -62,8 +62,8 @@ public class LocalExecutionPlanContext {
private int degreeOfParallelism =
IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism();
// this is shared with all subContexts
- private AtomicInteger nextPipelineId;
- private List<PipelineDriverFactory> pipelineDriverFactories;
+ private final AtomicInteger nextPipelineId;
+ private final List<PipelineDriverFactory> pipelineDriverFactories;
private List<ExchangeOperator> exchangeOperatorList = new ArrayList<>();
private int exchangeSumNum = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index eb53954cfa..08e6c6b5f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -122,6 +122,7 @@ import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperat
import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanTraverseOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
import org.apache.iotdb.db.mpp.execution.operator.window.SeriesWindowParameter;
@@ -136,6 +137,7 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.expression.visitor.ColumnTransformerVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.DevicesCountNode;
@@ -184,6 +186,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
@@ -270,14 +273,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
@Override
public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext context) {
PartialPath seriesPath = node.getSeriesPath();
-
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- SeriesScanOperator.class.getSimpleName());
+ ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
+ context.getDriverContext().setInputDriver(true);
Filter timeFilter = node.getTimeFilter();
Filter valueFilter = node.getValueFilter();
@@ -286,26 +283,36 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
seriesScanOptionsBuilder.withGlobalTimeFilter(timeFilter.copy());
}
if (valueFilter != null) {
- seriesScanOptionsBuilder.withGlobalTimeFilter(valueFilter.copy());
+ seriesScanOptionsBuilder.withQueryFilter(valueFilter.copy());
}
seriesScanOptionsBuilder.withAllSensors(
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()));
seriesScanOptionsBuilder.withLimit(node.getLimit());
seriesScanOptionsBuilder.withOffset(node.getOffset());
- SeriesScanOperator seriesScanOperator =
- new SeriesScanOperator(
- operatorContext,
- node.getPlanNodeId(),
- seriesPath,
- node.getScanOrder(),
- seriesScanOptionsBuilder.build());
+ if (context.getDegreeOfParallelism() == 1) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ SeriesScanOperator.class.getSimpleName());
- ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator);
- ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
- context.getDriverContext().setInputDriver(true);
- context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- return seriesScanOperator;
+ SeriesScanOperator scanOperator =
+ new SeriesScanOperator(
+ operatorContext,
+ node.getPlanNodeId(),
+ seriesPath,
+ node.getScanOrder(),
+ seriesScanOptionsBuilder.build());
+
+ ((DataDriverContext) context.getDriverContext()).addSourceOperator(scanOperator);
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+ return scanOperator;
+ } else {
+ return divideSplitToPipeline(context, node, seriesScanOptionsBuilder);
+ }
}
@Override
@@ -351,6 +358,65 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return seriesScanOperator;
}
+ /**
+  * Builds one SeriesScanOperator per degree of parallelism and wraps them in a
+  * SeriesScanTraverseOperator. The first scan stays in the caller's pipeline; every other scan
+  * gets its own sub-pipeline and is reached through an exchange operator whose sink channel is
+  * created with allowRunning = false.
+  */
+ private Operator divideSplitToPipeline(
+     LocalExecutionPlanContext context,
+     SeriesScanNode node,
+     SeriesScanOptions.Builder seriesScanOptionsBuilder) {
+   // the traverse operator's context is registered before any of the scan contexts
+   OperatorContext traverseContext =
+       context
+           .getDriverContext()
+           .addOperatorContext(
+               context.getNextOperatorId(),
+               node.getPlanNodeId(),
+               SeriesScanTraverseOperator.class.getSimpleName());
+
+   final int parallelism = context.getDegreeOfParallelism();
+   List<SeriesScanOperator> scans = new ArrayList<>();
+   List<Operator> children = new ArrayList<>();
+   int splitIndex = 0;
+   while (splitIndex < parallelism) {
+     PlanNodeId splitId =
+         new PlanNodeId(String.format("%s-%d", node.getPlanNodeId(), splitIndex));
+     // the first split belongs to parentPipeline
+     boolean inParentPipeline = splitIndex == 0;
+     LocalExecutionPlanContext splitContext;
+     if (inParentPipeline) {
+       splitContext = context;
+     } else {
+       splitContext = context.createSubContext();
+     }
+     OperatorContext scanContext =
+         splitContext
+             .getDriverContext()
+             .addOperatorContext(
+                 splitContext.getNextOperatorId(),
+                 splitId,
+                 SeriesScanOperator.class.getSimpleName());
+     SeriesScanOperator scan =
+         new SeriesScanOperator(
+             scanContext,
+             splitId,
+             node.getSeriesPath(),
+             node.getScanOrder(),
+             seriesScanOptionsBuilder.build());
+     scans.add(scan);
+
+     // for the first split, splitContext is the parent context itself
+     splitContext.getTimeSliceAllocator().recordExecutionWeight(scanContext, 1);
+     if (inParentPipeline) {
+       children.add(scan);
+     } else {
+       children.add(
+           createNewPipelineForChildOperation(context, splitContext, scan, splitId, false));
+     }
+     splitIndex++;
+   }
+
+   SeriesScanTraverseOperator traverse =
+       new SeriesScanTraverseOperator(
+           traverseContext,
+           node.getSeriesPath(),
+           node.getScanOrder(),
+           children,
+           scans,
+           seriesScanOptionsBuilder);
+   ((DataDriverContext) context.getDriverContext()).addSourceOperator(traverse);
+   context.getTimeSliceAllocator().recordExecutionWeight(traverseContext, 1);
+   return traverse;
+ }
+
@Override
public Operator visitSeriesAggregationScan(
SeriesAggregationScanNode node, LocalExecutionPlanContext context) {
@@ -2488,14 +2554,24 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
private Operator createNewPipelineForChildNode(
LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, PlanNode childNode) {
Operator childOperation = childNode.accept(this, subContext);
+ return createNewPipelineForChildOperation(
+ context, subContext, childOperation, childNode.getPlanNodeId(), true);
+ }
+
+ private Operator createNewPipelineForChildOperation(
+ LocalExecutionPlanContext context,
+ LocalExecutionPlanContext subContext,
+ Operator childOperation,
+ PlanNodeId planNodeId,
+ boolean allowRunning) {
ISinkChannel localSinkChannel =
MPP_DATA_EXCHANGE_MANAGER.createLocalSinkChannelForPipeline(
// Attention, there is no parent node, use first child node instead
- subContext.getDriverContext(), childNode.getPlanNodeId().getId());
+ subContext.getDriverContext(), planNodeId.getId(), allowRunning);
subContext.setISink(localSinkChannel);
subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
- ExchangeOperator sourceOperator =
+ ExchangeOperator exchangeOperator =
new ExchangeOperator(
context
.getDriverContext()
@@ -2504,12 +2580,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
((LocalSinkChannel) localSinkChannel).getSharedTsBlockQueue(),
context.getDriverContext()),
- childNode.getPlanNodeId(),
+ planNodeId,
childOperation.calculateMaxReturnSize());
- context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
- context.addExchangeOperator(sourceOperator);
- return sourceOperator;
+ context.getTimeSliceAllocator().recordExecutionWeight(exchangeOperator.getOperatorContext(), 1);
+ context.addExchangeOperator(exchangeOperator);
+ return exchangeOperator;
}
public List<Operator> dealWithConsumeChildrenOneByOneNode(
@@ -2542,13 +2618,17 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
LocalExecutionPlanContext subContext = context.createSubContext();
// Only context.getDegreeOfParallelism() - 1 can be allocated to child
int dopForChild = context.getDegreeOfParallelism() - 1;
- subContext.setDegreeOfParallelism(dopForChild);
+ if (childNode instanceof SeriesSourceNode) {
+ subContext.setDegreeOfParallelism(1);
+ } else {
+ subContext.setDegreeOfParallelism(dopForChild);
+ }
int originPipeNum = context.getPipelineNumber();
Operator childOperation = childNode.accept(this, subContext);
ISinkChannel localSinkChannel =
MPP_DATA_EXCHANGE_MANAGER.createLocalSinkChannelForPipeline(
// Attention, there is no parent node, use first child node instead
- context.getDriverContext(), childNode.getPlanNodeId().getId());
+ context.getDriverContext(), childNode.getPlanNodeId().getId(), true);
subContext.setISink(localSinkChannel);
subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/SeriesScanOptions.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/SeriesScanOptions.java
index 7e3bcd6f29..b90f813c96 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/SeriesScanOptions.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -72,6 +72,10 @@ public class SeriesScanOptions {
return globalTimeFilter;
}
+ /** Replaces the global time filter of these options. */
+ public void setGlobalTimeFilter(Filter globalTimeFilter) {
+ this.globalTimeFilter = globalTimeFilter;
+ }
+
public Filter getQueryFilter() {
return queryFilter;
}
@@ -125,6 +129,10 @@ public class SeriesScanOptions {
return this;
}
+ /** Returns the global time filter currently held by this builder. */
+ public Filter getGlobalTimeFilter() {
+ return globalTimeFilter;
+ }
+
public Builder withQueryFilter(Filter queryFilter) {
this.queryFilter = queryFilter;
return this;
@@ -140,6 +148,14 @@ public class SeriesScanOptions {
return this;
}
+ /**
+  * Passes the global time filter through updateFilterUsingTTL with the given TTL, and does the
+  * same for the query filter when one is set.
+  *
+  * @return this builder
+  */
+ public Builder withTTL(long dataTTL) {
+   globalTimeFilter = updateFilterUsingTTL(globalTimeFilter, dataTTL);
+   if (queryFilter == null) {
+     return this;
+   }
+   queryFilter = updateFilterUsingTTL(queryFilter, dataTTL);
+   return this;
+ }
+
public void withAllSensors(Set<String> allSensors) {
this.allSensors = allSensors;
}