You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/26 13:13:00 UTC
[iotdb] branch master updated: [IOTDB-4898] Push offset and limit down to ScanOperator if possible
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 415f68e36b [IOTDB-4898] Push offset and limit down to ScanOperator if possible
415f68e36b is described below
commit 415f68e36b88659b1e231be2b047483ea791337a
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Sun Feb 26 21:12:54 2023 +0800
[IOTDB-4898] Push offset and limit down to ScanOperator if possible
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 12 +-
.../traverser/TraverserWithLimitOffsetWrapper.java | 6 +-
.../impl/read/AbstractShowSchemaPlanImpl.java | 10 +-
.../impl/read/SchemaRegionReadPlanFactory.java | 6 +-
.../impl/read/ShowDevicesPlanImpl.java | 2 +-
.../impl/read/ShowTimeSeriesPlanImpl.java | 4 +-
.../plan/schemaregion/read/IShowSchemaPlan.java | 4 +-
.../apache/iotdb/db/metadata/tag/TagManager.java | 4 +-
.../operator/schema/source/DeviceSchemaSource.java | 6 +-
.../schema/source/SchemaSourceFactory.java | 6 +-
.../schema/source/TimeSeriesSchemaSource.java | 8 +-
.../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 4 +
.../plan/analyze/ColumnPaginationController.java | 10 +-
.../db/mpp/plan/analyze/ExpressionAnalyzer.java | 34 +++
.../memory/StatementMemorySourceVisitor.java | 5 +-
.../db/mpp/plan/expression/ExpressionFactory.java | 4 +
.../mpp/plan/optimization/LimitOffsetPushDown.java | 241 +++++++++++++++++
.../db/mpp/plan/optimization/PlanOptimizer.java | 3 +-
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 16 +-
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 10 +-
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 8 +-
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 2 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 2 +
.../planner/distribution/DistributionPlanner.java | 23 +-
.../plan/planner/plan/node/PlanGraphPrinter.java | 16 ++
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 51 ++--
.../node/metedata/read/DevicesSchemaScanNode.java | 8 +-
.../node/metedata/read/SchemaQueryScanNode.java | 12 +-
.../metedata/read/TimeSeriesSchemaScanNode.java | 8 +-
.../plan/planner/plan/node/process/LimitNode.java | 10 +-
.../plan/planner/plan/node/process/OffsetNode.java | 10 +-
.../plan/node/source/AlignedSeriesScanNode.java | 24 +-
.../planner/plan/node/source/SeriesScanNode.java | 20 +-
.../iotdb/db/mpp/plan/statement/Statement.java | 2 +-
.../db/mpp/plan/statement/crud/QueryStatement.java | 37 ++-
.../mpp/plan/statement/metadata/ShowStatement.java | 12 +-
.../plan/statement/sys/ShowQueriesStatement.java | 17 +-
.../plan/optimization/LimitOffsetPushDownTest.java | 299 +++++++++++++++++++++
.../db/mpp/plan/optimization/TestPlanBuilder.java | 184 +++++++++++++
.../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java | 1 -
40 files changed, 994 insertions(+), 147 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 7a8c008ef5..5f04e5406d 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -531,13 +531,17 @@ paginationClause
;
rowPaginationClause
- : limitClause offsetClause?
- | offsetClause? limitClause
+ : limitClause
+ | offsetClause
+ | offsetClause limitClause
+ | limitClause offsetClause
;
seriesPaginationClause
- : slimitClause soffsetClause?
- | soffsetClause? slimitClause
+ : slimitClause
+ | soffsetClause
+ | soffsetClause slimitClause
+ | slimitClause soffsetClause
;
limitClause
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/TraverserWithLimitOffsetWrapper.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/TraverserWithLimitOffsetWrapper.java
index d357e951e4..b6fb42b6ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/TraverserWithLimitOffsetWrapper.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/TraverserWithLimitOffsetWrapper.java
@@ -26,14 +26,14 @@ import java.util.NoSuchElementException;
public class TraverserWithLimitOffsetWrapper<R> extends Traverser<R> {
private final Traverser<R> traverser;
- private final int limit;
- private final int offset;
+ private final long limit;
+ private final long offset;
private final boolean hasLimit;
private int count = 0;
int curOffset = 0;
- public TraverserWithLimitOffsetWrapper(Traverser<R> traverser, int limit, int offset) {
+ public TraverserWithLimitOffsetWrapper(Traverser<R> traverser, long limit, long offset) {
this.traverser = traverser;
this.limit = limit;
this.offset = offset;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/AbstractShowSchemaPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/AbstractShowSchemaPlanImpl.java
index d0d05105d3..f81186c574 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/AbstractShowSchemaPlanImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/AbstractShowSchemaPlanImpl.java
@@ -27,8 +27,8 @@ import java.util.Objects;
public abstract class AbstractShowSchemaPlanImpl implements IShowSchemaPlan {
protected final PartialPath path;
- protected final int limit;
- protected final int offset;
+ protected final long limit;
+ protected final long offset;
protected final boolean isPrefixMatch;
protected AbstractShowSchemaPlanImpl(PartialPath path) {
@@ -38,7 +38,7 @@ public abstract class AbstractShowSchemaPlanImpl implements IShowSchemaPlan {
this.isPrefixMatch = false;
}
- AbstractShowSchemaPlanImpl(PartialPath path, int limit, int offset, boolean isPrefixMatch) {
+ AbstractShowSchemaPlanImpl(PartialPath path, long limit, long offset, boolean isPrefixMatch) {
this.path = path;
this.limit = limit;
this.offset = offset;
@@ -51,12 +51,12 @@ public abstract class AbstractShowSchemaPlanImpl implements IShowSchemaPlan {
}
@Override
- public int getLimit() {
+ public long getLimit() {
return limit;
}
@Override
- public int getOffset() {
+ public long getOffset() {
return offset;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/SchemaRegionReadPlanFactory.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/SchemaRegionReadPlanFactory.java
index 2afdb8b887..c024119fff 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/SchemaRegionReadPlanFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/SchemaRegionReadPlanFactory.java
@@ -41,7 +41,7 @@ public class SchemaRegionReadPlanFactory {
}
public static IShowDevicesPlan getShowDevicesPlan(
- PartialPath path, int limit, int offset, boolean isPrefixMatch) {
+ PartialPath path, long limit, long offset, boolean isPrefixMatch) {
return new ShowDevicesPlanImpl(path, limit, offset, isPrefixMatch, -1);
}
@@ -76,8 +76,8 @@ public class SchemaRegionReadPlanFactory {
boolean isContains,
String key,
String value,
- int limit,
- int offset,
+ long limit,
+ long offset,
boolean isPrefixMatch) {
return new ShowTimeSeriesPlanImpl(
path, relatedTemplate, isContains, key, value, limit, offset, isPrefixMatch);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java
index f740f8e27b..e4153519fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java
@@ -31,7 +31,7 @@ public class ShowDevicesPlanImpl extends AbstractShowSchemaPlanImpl implements I
private final int schemaTemplateId;
ShowDevicesPlanImpl(
- PartialPath path, int limit, int offset, boolean isPrefixMatch, int schemaTemplateId) {
+ PartialPath path, long limit, long offset, boolean isPrefixMatch, int schemaTemplateId) {
super(path, limit, offset, isPrefixMatch);
this.schemaTemplateId = schemaTemplateId;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowTimeSeriesPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowTimeSeriesPlanImpl.java
index 0906b83b34..be857877cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowTimeSeriesPlanImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowTimeSeriesPlanImpl.java
@@ -42,8 +42,8 @@ public class ShowTimeSeriesPlanImpl extends AbstractShowSchemaPlanImpl
boolean isContains,
String key,
String value,
- int limit,
- int offset,
+ long limit,
+ long offset,
boolean isPrefixMatch) {
super(path, limit, offset, isPrefixMatch);
this.relatedTemplate = relatedTemplate;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/read/IShowSchemaPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/read/IShowSchemaPlan.java
index fb28d91af7..d86e8f2857 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/read/IShowSchemaPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/read/IShowSchemaPlan.java
@@ -39,9 +39,9 @@ public interface IShowSchemaPlan extends ISchemaRegionPlan {
PartialPath getPath();
- int getLimit();
+ long getLimit();
- int getOffset();
+ long getOffset();
boolean isPrefixMatch();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
index ca1cc56990..206e0bb80c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
@@ -206,8 +206,8 @@ public class TagManager {
PartialPath pathPattern = plan.getPath();
int curOffset = 0;
int count = 0;
- int limit = plan.getLimit();
- int offset = plan.getOffset();
+ long limit = plan.getLimit();
+ long offset = plan.getOffset();
boolean hasLimit = limit > 0 || offset > 0;
while (curOffset < offset && allMatchedNodes.hasNext()) {
IMeasurementMNode node = allMatchedNodes.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/DeviceSchemaSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/DeviceSchemaSource.java
index b606531369..2abebdfff9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/DeviceSchemaSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/DeviceSchemaSource.java
@@ -37,13 +37,13 @@ public class DeviceSchemaSource implements ISchemaSource<IDeviceSchemaInfo> {
private final PartialPath pathPattern;
private final boolean isPrefixMatch;
- private final int limit;
- private final int offset;
+ private final long limit;
+ private final long offset;
private final boolean hasSgCol;
DeviceSchemaSource(
- PartialPath pathPattern, boolean isPrefixPath, int limit, int offset, boolean hasSgCol) {
+ PartialPath pathPattern, boolean isPrefixPath, long limit, long offset, boolean hasSgCol) {
this.pathPattern = pathPattern;
this.isPrefixMatch = isPrefixPath;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java
index c0c4305ea9..16e58d2a44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java
@@ -46,8 +46,8 @@ public class SchemaSourceFactory {
public static ISchemaSource<ITimeSeriesSchemaInfo> getTimeSeriesSchemaSource(
PartialPath pathPattern,
boolean isPrefixMatch,
- int limit,
- int offset,
+ long limit,
+ long offset,
String key,
String value,
boolean isContains,
@@ -62,7 +62,7 @@ public class SchemaSourceFactory {
}
public static ISchemaSource<IDeviceSchemaInfo> getDeviceSchemaSource(
- PartialPath pathPattern, boolean isPrefixPath, int limit, int offset, boolean hasSgCol) {
+ PartialPath pathPattern, boolean isPrefixPath, long limit, long offset, boolean hasSgCol) {
return new DeviceSchemaSource(pathPattern, isPrefixPath, limit, offset, hasSgCol);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/TimeSeriesSchemaSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/TimeSeriesSchemaSource.java
index 337aaacd0a..1ad90c5f80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/TimeSeriesSchemaSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/TimeSeriesSchemaSource.java
@@ -41,8 +41,8 @@ public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaIn
private final PartialPath pathPattern;
private final boolean isPrefixMatch;
- private final int limit;
- private final int offset;
+ private final long limit;
+ private final long offset;
private final String key;
private final String value;
@@ -53,8 +53,8 @@ public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaIn
TimeSeriesSchemaSource(
PartialPath pathPattern,
boolean isPrefixMatch,
- int limit,
- int offset,
+ long limit,
+ long offset,
String key,
String value,
boolean isContains,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 14ee67b535..bf15b2a826 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -582,4 +582,8 @@ public class Analysis {
public void setVirtualSource(boolean virtualSource) {
isVirtualSource = virtualSource;
}
+
+ public Map<NodeRef<Expression>, TSDataType> getExpressionTypes() {
+ return expressionTypes;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ColumnPaginationController.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ColumnPaginationController.java
index 082428664d..730b79f7e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ColumnPaginationController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ColumnPaginationController.java
@@ -25,17 +25,17 @@ import org.apache.iotdb.db.exception.sql.PathNumOverLimitException;
/** apply MaxQueryDeduplicatedPathNum and SLIMIT & SOFFSET */
public class ColumnPaginationController {
- private int curLimit =
- IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1;
- private int curOffset;
+ private long curLimit =
+ IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1L;
+ private long curOffset;
// records the path number that the SchemaProcessor totally returned
- private int consumed = 0;
+ private long consumed = 0;
// for ALIGN BY DEVICE / DISABLE ALIGN / GROUP BY LEVEL / LAST, controller does is disabled
private final boolean isDisabled;
- public ColumnPaginationController(int seriesLimit, int seriesOffset, boolean isDisabled) {
+ public ColumnPaginationController(long seriesLimit, long seriesOffset, boolean isDisabled) {
// for series limit, the default value is 0, which means no limit
this.curLimit = seriesLimit == 0 ? this.curLimit : Math.min(seriesLimit, this.curLimit);
// series offset for result set. The default value is 0
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
index 6f35ebfb60..2cbda77f77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
@@ -1342,4 +1342,38 @@ public class ExpressionAnalyzer {
"unsupported expression type: " + expression.getExpressionType());
}
}
+
+ public static boolean checkIsScalarExpression(Expression expression, Analysis analysis) {
+ if (expression instanceof TernaryExpression) {
+ TernaryExpression ternaryExpression = (TernaryExpression) expression;
+ return checkIsScalarExpression(ternaryExpression.getFirstExpression(), analysis)
+ && checkIsScalarExpression(ternaryExpression.getSecondExpression(), analysis)
+ && checkIsScalarExpression(ternaryExpression.getThirdExpression(), analysis);
+ } else if (expression instanceof BinaryExpression) {
+ BinaryExpression binaryExpression = (BinaryExpression) expression;
+ return checkIsScalarExpression(binaryExpression.getLeftExpression(), analysis)
+ && checkIsScalarExpression(binaryExpression.getRightExpression(), analysis);
+ } else if (expression instanceof UnaryExpression) {
+ return checkIsScalarExpression(((UnaryExpression) expression).getExpression(), analysis);
+ } else if (expression instanceof FunctionExpression) {
+ FunctionExpression functionExpression = (FunctionExpression) expression;
+ if (!functionExpression.isMappable(analysis.getExpressionTypes())
+ || BuiltinFunction.DEVICE_VIEW_SPECIAL_PROCESS_FUNCTIONS.contains(
+ functionExpression.getFunctionName())) {
+ return false;
+ }
+ List<Expression> inputExpressions = functionExpression.getExpressions();
+ for (Expression inputExpression : inputExpressions) {
+ if (!checkIsScalarExpression(inputExpression, analysis)) {
+ return false;
+ }
+ }
+ return true;
+ } else if (expression instanceof LeafOperand) {
+ return true;
+ } else {
+ throw new IllegalArgumentException(
+ "unsupported expression type: " + expression.getExpressionType());
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
index 44a131ec02..d1f15a9534 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
@@ -92,8 +92,11 @@ public class StatementMemorySourceVisitor
.plan(context.getAnalysis());
DistributionPlanner planner = new DistributionPlanner(context.getAnalysis(), logicalPlan);
PlanNode rootWithExchange = planner.addExchangeNode(planner.rewriteSource());
+ PlanNode optimizedRootWithExchange = planner.optimize(rootWithExchange);
+
List<String> lines =
- rootWithExchange.accept(new PlanGraphPrinter(), new PlanGraphPrinter.GraphContext());
+ optimizedRootWithExchange.accept(
+ new PlanGraphPrinter(), new PlanGraphPrinter.GraphContext());
TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.TEXT));
lines.forEach(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ExpressionFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ExpressionFactory.java
index 170e21db1a..7ac2d0e6bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ExpressionFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ExpressionFactory.java
@@ -47,6 +47,10 @@ public class ExpressionFactory {
return new TimeSeriesOperand(path);
}
+ public static TimeSeriesOperand timeSeries(PartialPath path) {
+ return new TimeSeriesOperand(path);
+ }
+
public static ConstantOperand constant(TSDataType dataType, String valueString) {
return new ConstantOperand(dataType, valueString);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDown.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDown.java
new file mode 100644
index 0000000000..a2b55ca175
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDown.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.optimization;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+
+/**
+ * <b>Optimization phase:</b> Distributed plan planning
+ *
+ * <p><b>Rule:</b> The LIMIT OFFSET condition can be pushed down to the SeriesScanNode, when the
+ * following conditions are met:
+ * <li>Time series query (not aggregation query).
+ * <li>The query expressions are all scalar expression.
+ * <li>Functions that need to be calculated based on before or after values are not used, such as
+ * trend functions, FILL(previous), FILL(linear).
+ * <li>Only one scan node is included in the distributed plan. That is, only one single series or a
+ * group of series under an aligned device is queried, and all queried data is in one region.
+ */
+public class LimitOffsetPushDown implements PlanOptimizer {
+
+ @Override
+ public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) {
+ if (analysis.getStatement().getType() != StatementType.QUERY) {
+ return plan;
+ }
+ QueryStatement queryStatement = (QueryStatement) analysis.getStatement();
+ if (queryStatement.isAggregationQuery()
+ || (!queryStatement.hasLimit() && !queryStatement.hasOffset())) {
+ return plan;
+ }
+ return plan.accept(new Rewriter(), new RewriterContext(analysis));
+ }
+
+ private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> {
+
+ @Override
+ public PlanNode visitPlan(PlanNode node, RewriterContext context) {
+ for (PlanNode child : node.getChildren()) {
+ context.setParent(node);
+ child.accept(this, context);
+ }
+ return node;
+ }
+
+ @Override
+ public PlanNode visitLimit(LimitNode node, RewriterContext context) {
+ PlanNode parent = context.getParent();
+
+ context.setParent(node);
+ context.setLimit(node.getLimit());
+ node.getChild().accept(this, context);
+
+ if (context.isEnablePushDown()) {
+ return concatParentWithChild(parent, node.getChild());
+ }
+ return node;
+ }
+
+ @Override
+ public PlanNode visitOffset(OffsetNode node, RewriterContext context) {
+ PlanNode parent = context.getParent();
+
+ context.setParent(node);
+ context.setOffset(node.getOffset());
+ node.getChild().accept(this, context);
+
+ if (context.isEnablePushDown()) {
+ return concatParentWithChild(parent, node.getChild());
+ }
+ return node;
+ }
+
+ private PlanNode concatParentWithChild(PlanNode parent, PlanNode child) {
+ if (parent != null) {
+ ((SingleChildProcessNode) parent).setChild(child);
+ return parent;
+ } else {
+ return child;
+ }
+ }
+
+ @Override
+ public PlanNode visitFill(FillNode node, RewriterContext context) {
+ FillPolicy fillPolicy = node.getFillDescriptor().getFillPolicy();
+ if (fillPolicy == FillPolicy.VALUE) {
+ node.getChild().accept(this, context);
+ } else {
+ context.setEnablePushDown(false);
+ }
+ return node;
+ }
+
+ @Override
+ public PlanNode visitFilter(FilterNode node, RewriterContext context) {
+ // Value filtering push-down occurs during the logical planning phase. If there is still a
+ // FilterNode here, it means that there are query filter conditions that cannot be pushed
+ // down.
+ context.setEnablePushDown(false);
+ return node;
+ }
+
+ @Override
+ public PlanNode visitTransform(TransformNode node, RewriterContext context) {
+ Expression[] outputExpressions = node.getOutputExpressions();
+ boolean enablePushDown = true;
+ for (Expression expression : outputExpressions) {
+ if (!ExpressionAnalyzer.checkIsScalarExpression(expression, context.getAnalysis())) {
+ enablePushDown = false;
+ break;
+ }
+ }
+
+ if (enablePushDown) {
+ node.getChild().accept(this, context);
+ } else {
+ context.setEnablePushDown(false);
+ }
+ return node;
+ }
+
+ @Override
+ public PlanNode visitMultiChildProcess(MultiChildProcessNode node, RewriterContext context) {
+ context.setEnablePushDown(false);
+ return node;
+ }
+
+ @Override
+ public PlanNode visitDeviceView(DeviceViewNode node, RewriterContext context) {
+ if (node.getChildren().size() == 1) {
+ node.getChildren().get(0).accept(this, context);
+ return node;
+ } else {
+ return visitMultiChildProcess(node, context);
+ }
+ }
+
+ @Override
+ public PlanNode visitSeriesScan(SeriesScanNode node, RewriterContext context) {
+ if (context.isEnablePushDown()) {
+ node.setLimit(context.getLimit());
+ node.setOffset(context.getOffset());
+ }
+ return node;
+ }
+
+ @Override
+ public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, RewriterContext context) {
+ if (context.isEnablePushDown()) {
+ node.setLimit(context.getLimit());
+ node.setOffset(context.getOffset());
+ }
+ return node;
+ }
+ }
+
+ private static class RewriterContext {
+ private long limit;
+ private long offset;
+
+ private boolean enablePushDown = true;
+
+ private PlanNode parent;
+
+ private final Analysis analysis;
+
+ public RewriterContext(Analysis analysis) {
+ this.analysis = analysis;
+ }
+
+ public long getLimit() {
+ return limit;
+ }
+
+ public void setLimit(long limit) {
+ this.limit = limit;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public boolean isEnablePushDown() {
+ return enablePushDown;
+ }
+
+ public void setEnablePushDown(boolean enablePushDown) {
+ this.enablePushDown = enablePushDown;
+ }
+
+ public PlanNode getParent() {
+ return parent;
+ }
+
+ public void setParent(PlanNode parent) {
+ this.parent = parent;
+ }
+
+ public Analysis getAnalysis() {
+ return analysis;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/PlanOptimizer.java
index 20d6d37a1d..a2b1f860ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/PlanOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/PlanOptimizer.java
@@ -19,8 +19,9 @@
package org.apache.iotdb.db.mpp.plan.optimization;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
public interface PlanOptimizer {
- PlanNode optimize(PlanNode plan, MPPQueryContext context);
+ PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 2a80908424..08791b8217 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -1336,12 +1336,12 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
}
// parse LIMIT & OFFSET
- private int parseLimitClause(IoTDBSqlParser.LimitClauseContext ctx) {
- int limit;
+ private long parseLimitClause(IoTDBSqlParser.LimitClauseContext ctx) {
+ long limit;
try {
- limit = Integer.parseInt(ctx.INTEGER_LITERAL().getText());
+ limit = Long.parseLong(ctx.INTEGER_LITERAL().getText());
} catch (NumberFormatException e) {
- throw new SemanticException("Out of range. LIMIT <N>: N should be Int32.");
+ throw new SemanticException("Out of range. LIMIT <N>: N should be Int64.");
}
if (limit <= 0) {
throw new SemanticException("LIMIT <N>: N should be greater than 0.");
@@ -1349,13 +1349,13 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return limit;
}
- private int parseOffsetClause(IoTDBSqlParser.OffsetClauseContext ctx) {
- int offset;
+ private long parseOffsetClause(IoTDBSqlParser.OffsetClauseContext ctx) {
+ long offset;
try {
- offset = Integer.parseInt(ctx.INTEGER_LITERAL().getText());
+ offset = Long.parseLong(ctx.INTEGER_LITERAL().getText());
} catch (NumberFormatException e) {
throw new SemanticException(
- "Out of range. OFFSET <OFFSETValue>: OFFSETValue should be Int32.");
+ "Out of range. OFFSET <OFFSETValue>: OFFSETValue should be Int64.");
}
if (offset < 0) {
throw new SemanticException("OFFSET <OFFSETValue>: OFFSETValue should >= 0.");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index d3893276a8..f2ba699208 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -880,7 +880,7 @@ public class LogicalPlanBuilder {
return this;
}
- public LogicalPlanBuilder planLimit(int rowLimit) {
+ public LogicalPlanBuilder planLimit(long rowLimit) {
if (rowLimit == 0) {
return this;
}
@@ -889,7 +889,7 @@ public class LogicalPlanBuilder {
return this;
}
- public LogicalPlanBuilder planOffset(int rowOffset) {
+ public LogicalPlanBuilder planOffset(long rowOffset) {
if (rowOffset == 0) {
return this;
}
@@ -963,8 +963,8 @@ public class LogicalPlanBuilder {
PartialPath pathPattern,
String key,
String value,
- int limit,
- int offset,
+ long limit,
+ long offset,
boolean orderByHeat,
boolean contains,
boolean prefixPath,
@@ -985,7 +985,7 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder planDeviceSchemaSource(
- PartialPath pathPattern, int limit, int offset, boolean prefixPath, boolean hasSgCol) {
+ PartialPath pathPattern, long limit, long offset, boolean prefixPath, boolean hasSgCol) {
this.root =
new DevicesSchemaScanNode(
context.getQueryId().genPlanNodeId(), pathPattern, limit, offset, prefixPath, hasSgCol);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 7696aab20a..a5453c348b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -466,8 +466,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
&& analysis.getSchemaPartitionInfo().getDistributionInfo().size() == 1
&& !showTimeSeriesStatement.isOrderByHeat();
- int limit = showTimeSeriesStatement.getLimit();
- int offset = showTimeSeriesStatement.getOffset();
+ long limit = showTimeSeriesStatement.getLimit();
+ long offset = showTimeSeriesStatement.getOffset();
if (showTimeSeriesStatement.isOrderByHeat()) {
limit = 0;
offset = 0;
@@ -523,8 +523,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
analysis.getSchemaPartitionInfo() != null
&& analysis.getSchemaPartitionInfo().getDistributionInfo().size() == 1;
- int limit = showDevicesStatement.getLimit();
- int offset = showDevicesStatement.getOffset();
+ long limit = showDevicesStatement.getLimit();
+ long offset = showDevicesStatement.getOffset();
if (!canPushDownOffsetLimit) {
limit = showDevicesStatement.getLimit() + showDevicesStatement.getOffset();
offset = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index b3f65eec7c..f55d695b2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -50,7 +50,7 @@ public class LogicalPlanner {
.recordPlanCost(LOGICAL_PLANNER, System.nanoTime() - startTime);
for (PlanOptimizer optimizer : optimizers) {
- rootNode = optimizer.optimize(rootNode, context);
+ rootNode = optimizer.optimize(rootNode, analysis, context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index f8ef1e525b..f56794b004 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -290,6 +290,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
}
seriesScanOptionsBuilder.withAllSensors(
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()));
+ seriesScanOptionsBuilder.withLimit(node.getLimit());
+ seriesScanOptionsBuilder.withOffset(node.getOffset());
SeriesScanOperator seriesScanOperator =
new SeriesScanOperator(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 422759a807..73fe5c5259 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.optimization.LimitOffsetPushDown;
+import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -40,7 +42,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
import org.apache.commons.lang3.Validate;
@@ -57,10 +58,15 @@ public class DistributionPlanner {
private MPPQueryContext context;
private LogicalQueryPlan logicalPlan;
+ private final List<PlanOptimizer> optimizers;
+
+ private int planFragmentIndex = 0;
+
public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
this.analysis = analysis;
this.logicalPlan = logicalPlan;
this.context = logicalPlan.getContext();
+ this.optimizers = Collections.singletonList(new LimitOffsetPushDown());
}
public PlanNode rewriteSource() {
@@ -152,6 +158,15 @@ public class DistributionPlanner {
|| orderByComponent.getSortItemList().get(0).getSortKey().equals(SortKey.DEVICE));
}
+ public PlanNode optimize(PlanNode rootWithExchange) {
+ if (analysis.getStatement() != null && analysis.getStatement().isQuery()) {
+ for (PlanOptimizer optimizer : optimizers) {
+ rootWithExchange = optimizer.optimize(rootWithExchange, analysis, context);
+ }
+ }
+ return rootWithExchange;
+ }
+
public SubPlan splitFragment(PlanNode root) {
FragmentBuilder fragmentBuilder = new FragmentBuilder(context);
return fragmentBuilder.splitToSubPlan(root);
@@ -160,13 +175,13 @@ public class DistributionPlanner {
public DistributedQueryPlan planFragments() {
PlanNode rootAfterRewrite = rewriteSource();
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
- if (analysis.getStatement() instanceof QueryStatement
- || analysis.getStatement() instanceof ShowQueriesStatement) {
+ if (analysis.getStatement() != null && analysis.getStatement().isQuery()) {
analysis
.getRespDatasetHeader()
.setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
}
- SubPlan subPlan = splitFragment(rootWithExchange);
+ PlanNode optimizedRootWithExchange = optimize(rootWithExchange);
+ SubPlan subPlan = splitFragment(optimizedRootWithExchange);
// Mark the root Fragment of root SubPlan as `root`
subPlan.getPlanFragment().setRoot(true);
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index c0a839cc6e..a9726d0c2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -96,6 +96,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
boxValue.add(String.format("SeriesScan-%s", node.getPlanNodeId().getId()));
boxValue.add(String.format("Series: %s", node.getSeriesPath()));
boxValue.add(String.format("TimeFilter: %s", node.getTimeFilter()));
+
+ long limit = node.getLimit(), offset = node.getOffset();
+ if (limit > 0) {
+ boxValue.add(String.format("Limit: %s", limit));
+ }
+ if (offset > 0) {
+ boxValue.add(String.format("Offset: %s", offset));
+ }
boxValue.add(printRegion(node.getRegionReplicaSet()));
return render(node, boxValue, context);
}
@@ -109,6 +117,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
"Series: %s%s",
node.getAlignedPath().getDevice(), node.getAlignedPath().getMeasurementList()));
boxValue.add(String.format("TimeFilter: %s", node.getTimeFilter()));
+
+ long limit = node.getLimit(), offset = node.getOffset();
+ if (limit > 0) {
+ boxValue.add(String.format("Limit: %s", limit));
+ }
+ if (offset > 0) {
+ boxValue.add(String.format("Offset: %s", offset));
+ }
boxValue.add(printRegion(node.getRegionReplicaSet()));
return render(node, boxValue, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 9356ddc742..cf3fbe02af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -60,8 +60,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.HorizontallyConcat
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleChildProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
@@ -79,6 +81,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -94,76 +97,88 @@ public abstract class PlanVisitor<R, C> {
public abstract R visitPlan(PlanNode node, C context);
- public R visitSeriesScan(SeriesScanNode node, C context) {
+ public R visitSourceNode(SourceNode node, C context) {
return visitPlan(node, context);
}
- public R visitSeriesAggregationScan(SeriesAggregationScanNode node, C context) {
+ public R visitSingleChildProcess(SingleChildProcessNode node, C context) {
return visitPlan(node, context);
}
- public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) {
+ public R visitMultiChildProcess(MultiChildProcessNode node, C context) {
return visitPlan(node, context);
}
+ public R visitSeriesScan(SeriesScanNode node, C context) {
+ return visitSourceNode(node, context);
+ }
+
+ public R visitSeriesAggregationScan(SeriesAggregationScanNode node, C context) {
+ return visitSourceNode(node, context);
+ }
+
+ public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) {
+ return visitSourceNode(node, context);
+ }
+
public R visitAlignedSeriesAggregationScan(AlignedSeriesAggregationScanNode node, C context) {
- return visitPlan(node, context);
+ return visitSourceNode(node, context);
}
public R visitDeviceView(DeviceViewNode node, C context) {
- return visitPlan(node, context);
+ return visitMultiChildProcess(node, context);
}
public R visitDeviceMerge(DeviceMergeNode node, C context) {
- return visitPlan(node, context);
+ return visitMultiChildProcess(node, context);
}
public R visitFill(FillNode node, C context) {
- return visitPlan(node, context);
+ return visitSingleChildProcess(node, context);
}
public R visitFilter(FilterNode node, C context) {
- return visitPlan(node, context);
+ return visitSingleChildProcess(node, context);
}
public R visitGroupByLevel(GroupByLevelNode node, C context) {
- return visitPlan(node, context);
+ return visitMultiChildProcess(node, context);
}
public R visitGroupByTag(GroupByTagNode node, C context) {
- return visitPlan(node, context);
+ return visitMultiChildProcess(node, context);
}
public R visitSlidingWindowAggregation(SlidingWindowAggregationNode node, C context) {
- return visitPlan(node, context);
+ return visitSingleChildProcess(node, context);
}
public R visitLimit(LimitNode node, C context) {
- return visitPlan(node, context);
+ return visitSingleChildProcess(node, context);
}
public R visitOffset(OffsetNode node, C context) {
- return visitPlan(node, context);
+ return visitSingleChildProcess(node, context);
}
public R visitAggregation(AggregationNode node, C context) {
- return visitPlan(node, context);
+ return visitMultiChildProcess(node, context);
}
public R visitSort(SortNode node, C context) {
- return visitPlan(node, context);
+ return visitSingleChildProcess(node, context);
}
public R visitProject(ProjectNode node, C context) {
- return visitPlan(node, context);
+ return visitSingleChildProcess(node, context);
}
public R visitTimeJoin(TimeJoinNode node, C context) {
- return visitPlan(node, context);
+ return visitMultiChildProcess(node, context);
}
public R visitExchange(ExchangeNode node, C context) {
- return visitPlan(node, context);
+ return visitSingleChildProcess(node, context);
}
public R visitSchemaQueryMerge(SchemaQueryMergeNode node, C context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/DevicesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
index 2dae4bd591..73307996cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
@@ -41,8 +41,8 @@ public class DevicesSchemaScanNode extends SchemaQueryScanNode {
public DevicesSchemaScanNode(
PlanNodeId id,
PartialPath path,
- int limit,
- int offset,
+ long limit,
+ long offset,
boolean isPrefixPath,
boolean hasSgCol) {
super(id, path, limit, offset, isPrefixPath);
@@ -98,8 +98,8 @@ public class DevicesSchemaScanNode extends SchemaQueryScanNode {
} catch (IllegalPathException e) {
throw new IllegalArgumentException("Cannot deserialize DevicesSchemaScanNode", e);
}
- int limit = ReadWriteIOUtils.readInt(byteBuffer);
- int offset = ReadWriteIOUtils.readInt(byteBuffer);
+ long limit = ReadWriteIOUtils.readLong(byteBuffer);
+ long offset = ReadWriteIOUtils.readLong(byteBuffer);
boolean isPrefixPath = ReadWriteIOUtils.readBool(byteBuffer);
boolean hasSgCol = ReadWriteIOUtils.readBool(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryScanNode.java
index 7d71b7f64f..69af27aca9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryScanNode.java
@@ -31,8 +31,8 @@ import java.util.List;
import java.util.Objects;
public abstract class SchemaQueryScanNode extends SourceNode {
- protected int limit;
- protected int offset;
+ protected long limit;
+ protected long offset;
protected PartialPath path;
private boolean hasLimit;
protected boolean isPrefixPath;
@@ -44,7 +44,7 @@ public abstract class SchemaQueryScanNode extends SourceNode {
}
protected SchemaQueryScanNode(
- PlanNodeId id, PartialPath partialPath, int limit, int offset, boolean isPrefixPath) {
+ PlanNodeId id, PartialPath partialPath, long limit, long offset, boolean isPrefixPath) {
super(id);
this.path = partialPath;
setLimit(limit);
@@ -89,11 +89,11 @@ public abstract class SchemaQueryScanNode extends SourceNode {
return isPrefixPath;
}
- public int getLimit() {
+ public long getLimit() {
return limit;
}
- public void setLimit(int limit) {
+ public void setLimit(long limit) {
this.limit = limit;
if (limit == 0) {
hasLimit = false;
@@ -112,7 +112,7 @@ public abstract class SchemaQueryScanNode extends SourceNode {
this.schemaRegionReplicaSet = schemaRegionReplicaSet;
}
- public int getOffset() {
+ public long getOffset() {
return offset;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
index 683abeabb8..29d080644c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
@@ -72,8 +72,8 @@ public class TimeSeriesSchemaScanNode extends SchemaQueryScanNode {
PartialPath partialPath,
String key,
String value,
- int limit,
- int offset,
+ long limit,
+ long offset,
boolean orderByHeat,
boolean isContains,
boolean isPrefixPath,
@@ -132,8 +132,8 @@ public class TimeSeriesSchemaScanNode extends SchemaQueryScanNode {
}
String key = ReadWriteIOUtils.readString(byteBuffer);
String value = ReadWriteIOUtils.readString(byteBuffer);
- int limit = ReadWriteIOUtils.readInt(byteBuffer);
- int offset = ReadWriteIOUtils.readInt(byteBuffer);
+ long limit = ReadWriteIOUtils.readLong(byteBuffer);
+ long offset = ReadWriteIOUtils.readLong(byteBuffer);
boolean oderByHeat = ReadWriteIOUtils.readBool(byteBuffer);
boolean isContains = ReadWriteIOUtils.readBool(byteBuffer);
boolean isPrefixPath = ReadWriteIOUtils.readBool(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
index ae03e13f70..df9387397c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
@@ -33,19 +33,19 @@ import java.util.Objects;
/** LimitNode is used to select top n result. It uses the default order of upstream nodes */
public class LimitNode extends SingleChildProcessNode {
- private final int limit;
+ private final long limit;
- public LimitNode(PlanNodeId id, int limit) {
+ public LimitNode(PlanNodeId id, long limit) {
super(id);
this.limit = limit;
}
- public LimitNode(PlanNodeId id, PlanNode child, int limit) {
+ public LimitNode(PlanNodeId id, PlanNode child, long limit) {
super(id, child);
this.limit = limit;
}
- public int getLimit() {
+ public long getLimit() {
return limit;
}
@@ -77,7 +77,7 @@ public class LimitNode extends SingleChildProcessNode {
}
public static LimitNode deserialize(ByteBuffer byteBuffer) {
- int limit = ReadWriteIOUtils.readInt(byteBuffer);
+ long limit = ReadWriteIOUtils.readLong(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new LimitNode(planNodeId, limit);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
index 64b912303b..59151bbb51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
@@ -36,14 +36,14 @@ import java.util.Objects;
*/
public class OffsetNode extends SingleChildProcessNode {
- private final int offset;
+ private final long offset;
- public OffsetNode(PlanNodeId id, int offset) {
+ public OffsetNode(PlanNodeId id, long offset) {
super(id);
this.offset = offset;
}
- public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
+ public OffsetNode(PlanNodeId id, PlanNode child, long offset) {
super(id, child);
this.offset = offset;
}
@@ -76,12 +76,12 @@ public class OffsetNode extends SingleChildProcessNode {
}
public static OffsetNode deserialize(ByteBuffer byteBuffer) {
- int offset = ReadWriteIOUtils.readInt(byteBuffer);
+ long offset = ReadWriteIOUtils.readLong(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new OffsetNode(planNodeId, offset);
}
- public int getOffset() {
+ public long getOffset() {
return offset;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 3adf0b0618..09387d0e5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -62,10 +62,10 @@ public class AlignedSeriesScanNode extends SeriesSourceNode {
@Nullable private Filter valueFilter;
// Limit for result set. The default value is -1, which means no limit
- private int limit;
+ private long limit;
// offset for result set. The default value is 0
- private int offset;
+ private long offset;
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
@@ -86,8 +86,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode {
Ordering scanOrder,
@Nullable Filter timeFilter,
@Nullable Filter valueFilter,
- int limit,
- int offset,
+ long limit,
+ long offset,
TRegionReplicaSet dataRegionReplicaSet) {
this(id, alignedPath, scanOrder);
this.timeFilter = timeFilter;
@@ -119,14 +119,22 @@ public class AlignedSeriesScanNode extends SeriesSourceNode {
return valueFilter;
}
- public int getLimit() {
+ public long getLimit() {
return limit;
}
- public int getOffset() {
+ public long getOffset() {
return offset;
}
+ public void setLimit(long limit) {
+ this.limit = limit;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
@Override
public void open() throws Exception {}
@@ -241,8 +249,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode {
if (isNull == 1) {
valueFilter = FilterFactory.deserialize(byteBuffer);
}
- int limit = ReadWriteIOUtils.readInt(byteBuffer);
- int offset = ReadWriteIOUtils.readInt(byteBuffer);
+ long limit = ReadWriteIOUtils.readLong(byteBuffer);
+ long offset = ReadWriteIOUtils.readLong(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new AlignedSeriesScanNode(
planNodeId, alignedPath, scanOrder, timeFilter, valueFilter, limit, offset, null);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
index 637aad52ec..6458d5af0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
@@ -66,10 +66,10 @@ public class SeriesScanNode extends SeriesSourceNode {
@Nullable private Filter valueFilter;
// Limit for result set. The default value is -1, which means no limit
- private int limit;
+ private long limit;
// offset for result set. The default value is 0
- private int offset;
+ private long offset;
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
@@ -90,8 +90,8 @@ public class SeriesScanNode extends SeriesSourceNode {
Ordering scanOrder,
@Nullable Filter timeFilter,
@Nullable Filter valueFilter,
- int limit,
- int offset,
+ long limit,
+ long offset,
TRegionReplicaSet dataRegionReplicaSet) {
this(id, seriesPath, scanOrder);
this.timeFilter = timeFilter;
@@ -117,19 +117,19 @@ public class SeriesScanNode extends SeriesSourceNode {
this.regionReplicaSet = dataRegion;
}
- public int getLimit() {
+ public long getLimit() {
return limit;
}
- public int getOffset() {
+ public long getOffset() {
return offset;
}
- public void setLimit(int limit) {
+ public void setLimit(long limit) {
this.limit = limit;
}
- public void setOffset(int offset) {
+ public void setOffset(long offset) {
this.offset = offset;
}
@@ -252,8 +252,8 @@ public class SeriesScanNode extends SeriesSourceNode {
if (isNull == 1) {
valueFilter = FilterFactory.deserialize(byteBuffer);
}
- int limit = ReadWriteIOUtils.readInt(byteBuffer);
- int offset = ReadWriteIOUtils.readInt(byteBuffer);
+ long limit = ReadWriteIOUtils.readLong(byteBuffer);
+ long offset = ReadWriteIOUtils.readLong(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new SeriesScanNode(
planNodeId, partialPath, scanOrder, timeFilter, valueFilter, limit, offset, null);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/Statement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/Statement.java
index a3730c89a0..3962bae22b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/Statement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/Statement.java
@@ -55,7 +55,7 @@ public abstract class Statement extends StatementNode {
}
public boolean isQuery() {
- return statementType == StatementType.QUERY;
+ return false;
}
public boolean isAuthenticationRequired() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index ef9550a7ec..480ab2a026 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -78,14 +78,14 @@ public class QueryStatement extends Statement {
private HavingCondition havingCondition;
// row limit for result set. The default value is 0, which means no limit
- private int rowLimit = 0;
+ private long rowLimit = 0;
// row offset for result set. The default value is 0
- private int rowOffset = 0;
+ private long rowOffset = 0;
// series limit and offset for result set. The default value is 0, which means no limit
- private int seriesLimit = 0;
+ private long seriesLimit = 0;
// series offset for result set. The default value is 0
- private int seriesOffset = 0;
+ private long seriesOffset = 0;
private FillComponent fillComponent;
@@ -116,6 +116,11 @@ public class QueryStatement extends Statement {
this.statementType = StatementType.QUERY;
}
+ @Override
+ public boolean isQuery() {
+ return true;
+ }
+
@Override
public List<PartialPath> getPaths() {
Set<PartialPath> authPaths = new HashSet<>();
@@ -168,35 +173,35 @@ public class QueryStatement extends Statement {
this.havingCondition = havingCondition;
}
- public int getRowLimit() {
+ public long getRowLimit() {
return rowLimit;
}
- public void setRowLimit(int rowLimit) {
+ public void setRowLimit(long rowLimit) {
this.rowLimit = rowLimit;
}
- public int getRowOffset() {
+ public long getRowOffset() {
return rowOffset;
}
- public void setRowOffset(int rowOffset) {
+ public void setRowOffset(long rowOffset) {
this.rowOffset = rowOffset;
}
- public int getSeriesLimit() {
+ public long getSeriesLimit() {
return seriesLimit;
}
- public void setSeriesLimit(int seriesLimit) {
+ public void setSeriesLimit(long seriesLimit) {
this.seriesLimit = seriesLimit;
}
- public int getSeriesOffset() {
+ public long getSeriesOffset() {
return seriesOffset;
}
- public void setSeriesOffset(int seriesOffset) {
+ public void setSeriesOffset(long seriesOffset) {
this.seriesOffset = seriesOffset;
}
@@ -373,6 +378,14 @@ public class QueryStatement extends Statement {
isCqQueryBody = cqQueryBody;
}
+ public boolean hasLimit() {
+ return rowLimit > 0;
+ }
+
+ public boolean hasOffset() {
+ return rowOffset > 0;
+ }
+
public void semanticCheck() {
if (isAggregationQuery()) {
if (disableAlign()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowStatement.java
index 014f294555..87c6576a6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowStatement.java
@@ -28,8 +28,8 @@ import java.util.List;
public class ShowStatement extends Statement {
- int limit = 0;
- int offset = 0;
+ long limit = 0;
+ long offset = 0;
protected boolean isPrefixPath;
@@ -43,19 +43,19 @@ public class ShowStatement extends Statement {
return Collections.emptyList();
}
- public int getLimit() {
+ public long getLimit() {
return limit;
}
- public void setLimit(int limit) {
+ public void setLimit(long limit) {
this.limit = limit;
}
- public int getOffset() {
+ public long getOffset() {
return offset;
}
- public void setOffset(int offset) {
+ public void setOffset(long offset) {
this.offset = offset;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
index 1b3439519d..c6f266d463 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
@@ -37,13 +37,18 @@ public class ShowQueriesStatement extends ShowStatement {
private OrderByComponent orderByComponent;
- private int rowLimit;
- private int rowOffset;
+ private long rowLimit;
+ private long rowOffset;
private ZoneId zoneId;
public ShowQueriesStatement() {}
+ @Override
+ public boolean isQuery() {
+ return true;
+ }
+
@Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitShowQueries(this, context);
@@ -73,19 +78,19 @@ public class ShowQueriesStatement extends ShowStatement {
return orderByComponent.getSortItemList();
}
- public void setRowLimit(int rowLimit) {
+ public void setRowLimit(long rowLimit) {
this.rowLimit = rowLimit;
}
- public int getRowLimit() {
+ public long getRowLimit() {
return rowLimit;
}
- public void setRowOffset(int rowOffset) {
+ public void setRowOffset(long rowOffset) {
this.rowOffset = rowOffset;
}
- public int getRowOffset() {
+ public long getRowOffset() {
return rowOffset;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDownTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDownTest.java
new file mode 100644
index 0000000000..a473771a8f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDownTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.optimization;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.plan.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.plan.analyze.FakeSchemaFetcherImpl;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.add;
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.function;
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.gt;
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.intValue;
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.timeSeries;
+
+public class LimitOffsetPushDownTest {
+
+ private static final Map<String, PartialPath> schemaMap = new HashMap<>();
+
+ static {
+ try {
+ schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32));
+ schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE));
+ schemaMap.put("root.sg.d2.s1", new MeasurementPath("root.sg.d2.s1", TSDataType.INT32));
+ schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.DOUBLE));
+
+ MeasurementPath aS1 = new MeasurementPath("root.sg.d2.a.s1", TSDataType.INT32);
+ MeasurementPath aS2 = new MeasurementPath("root.sg.d2.a.s2", TSDataType.DOUBLE);
+ AlignedPath alignedPath =
+ new AlignedPath(
+ "root.sg.d2.a",
+ Arrays.asList("s1", "s2"),
+ Arrays.asList(aS1.getMeasurementSchema(), aS2.getMeasurementSchema()));
+ aS1.setUnderAlignedEntity(true);
+ aS2.setUnderAlignedEntity(true);
+ schemaMap.put("root.sg.d2.a.s1", aS1);
+ schemaMap.put("root.sg.d2.a.s2", aS2);
+ schemaMap.put("root.sg.d2.a", alignedPath);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testNonAlignedPushDown() {
+ checkPushDown(
+ "select s1 from root.sg.d1 limit 100;",
+ new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1")).limit("1", 100).getRoot(),
+ new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1"), 100, 0).getRoot());
+ checkPushDown(
+ "select s1 from root.sg.d1 offset 100;",
+ new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1")).offset("1", 100).getRoot(),
+ new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1"), 0, 100).getRoot());
+ checkPushDown(
+ "select s1 from root.sg.d1 limit 100 offset 100;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .offset("1", 100)
+ .limit("2", 100)
+ .getRoot(),
+ new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100).getRoot());
+ }
+
+ @Test
+ public void testAlignedPushDown() {
+ checkPushDown(
+ "select s1, s2 from root.sg.d2.a limit 100;",
+ new TestPlanBuilder()
+ .scanAligned("0", schemaMap.get("root.sg.d2.a"))
+ .limit("1", 100)
+ .getRoot(),
+ new TestPlanBuilder().scanAligned("0", schemaMap.get("root.sg.d2.a"), 100, 0).getRoot());
+ checkPushDown(
+ "select s1, s2 from root.sg.d2.a offset 100;",
+ new TestPlanBuilder()
+ .scanAligned("0", schemaMap.get("root.sg.d2.a"))
+ .offset("1", 100)
+ .getRoot(),
+ new TestPlanBuilder().scanAligned("0", schemaMap.get("root.sg.d2.a"), 0, 100).getRoot());
+ checkPushDown(
+ "select s1, s2 from root.sg.d2.a limit 100 offset 100;",
+ new TestPlanBuilder()
+ .scanAligned("0", schemaMap.get("root.sg.d2.a"))
+ .offset("1", 100)
+ .limit("2", 100)
+ .getRoot(),
+ new TestPlanBuilder().scanAligned("0", schemaMap.get("root.sg.d2.a"), 100, 100).getRoot());
+ }
+
+ @Test
+ public void testPushDownWithTransform() {
+ List<Expression> expressions =
+ Arrays.asList(
+ add(
+ function("sin", add(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("1"))),
+ intValue("1")),
+ gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")));
+ checkPushDown(
+ "select sin(s1 + 1) + 1, s1 > 10 from root.sg.d1 limit 100 offset 100;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .transform("1", expressions)
+ .offset("2", 100)
+ .limit("3", 100)
+ .getRoot(),
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
+ .transform("1", expressions)
+ .getRoot());
+ }
+
+ @Test
+ public void testPushDownWithFill() {
+ checkPushDown(
+ "select s1 from root.sg.d1 fill(100) limit 100 offset 100;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .fill("1", "100")
+ .offset("2", 100)
+ .limit("3", 100)
+ .getRoot(),
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
+ .fill("1", "100")
+ .getRoot());
+ }
+
+ @Test
+ public void testPushDownAlignByDevice() {
+ checkPushDown(
+ "select s1 from root.sg.d1 limit 100 offset 100 align by device;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .singleDeviceView("1", "root.sg.d1", "s1")
+ .offset("2", 100)
+ .limit("3", 100)
+ .getRoot(),
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
+ .singleDeviceView("1", "root.sg.d1", "s1")
+ .getRoot());
+ }
+
+ @Test
+ public void testPushDownWithInto() {
+ checkPushDown(
+ "select s1 into root.sg.d2(s1) from root.sg.d1 limit 100 offset 100;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .offset("1", 100)
+ .limit("2", 100)
+ .into("3", schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d2.s1"))
+ .getRoot(),
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
+ .into("3", schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d2.s1"))
+ .getRoot());
+ }
+
+ @Test
+ public void testCannotPushDown() {
+ checkCannotPushDown(
+ "select s1, s2 from root.sg.d1 limit 100;",
+ new TestPlanBuilder()
+ .timeJoin(Arrays.asList(schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d1.s2")))
+ .limit("3", 100)
+ .getRoot());
+
+ checkCannotPushDown(
+ "select diff(s1 + 1) + 1 from root.sg.d1 limit 100 offset 100;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .transform(
+ "1",
+ Collections.singletonList(
+ add(
+ function(
+ "diff", add(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("1"))),
+ intValue("1"))))
+ .offset("2", 100)
+ .limit("3", 100)
+ .getRoot());
+
+ LinkedHashMap<String, String> functionAttributes = new LinkedHashMap<>();
+ functionAttributes.put("windowSize", "10");
+ checkCannotPushDown(
+ "select m4(s1,'windowSize'='10') from root.sg.d1 limit 100 offset 100;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .transform(
+ "1",
+ Collections.singletonList(
+ function("m4", functionAttributes, timeSeries(schemaMap.get("root.sg.d1.s1")))))
+ .offset("2", 100)
+ .limit("3", 100)
+ .getRoot());
+
+ checkCannotPushDown(
+ "select s1 from root.sg.d1 fill(linear) limit 100;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .fill("1", FillPolicy.LINEAR)
+ .limit("2", 100)
+ .getRoot());
+ checkCannotPushDown(
+ "select s1 from root.sg.d1 fill(previous) limit 100;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .fill("1", FillPolicy.PREVIOUS)
+ .limit("2", 100)
+ .getRoot());
+
+ checkCannotPushDown(
+ "select s1 from root.sg.d1 where s1 > 10 limit 100 offset 100;",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .filter(
+ "1",
+ Collections.singletonList(timeSeries(schemaMap.get("root.sg.d1.s1"))),
+ gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")))
+ .offset("2", 100)
+ .limit("3", 100)
+ .getRoot());
+ }
+
+ private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) {
+ Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+
+ MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+ Analyzer analyzer =
+ new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
+ Analysis analysis = analyzer.analyze(statement);
+
+ LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+ PlanNode actualPlan = planner.plan(analysis).getRootNode();
+ Assert.assertEquals(rawPlan, actualPlan);
+
+ PlanNode actualOptPlan = new LimitOffsetPushDown().optimize(actualPlan, analysis, context);
+ Assert.assertEquals(optPlan, actualOptPlan);
+ }
+
+ private void checkCannotPushDown(String sql, PlanNode rawPlan) {
+ Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+
+ MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+ Analyzer analyzer =
+ new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
+ Analysis analysis = analyzer.analyze(statement);
+
+ LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+ PlanNode actualPlan = planner.plan(analysis).getRootNode();
+
+ Assert.assertEquals(rawPlan, actualPlan);
+ Assert.assertEquals(
+ actualPlan, new LimitOffsetPushDown().optimize(actualPlan, analysis, context));
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java
new file mode 100644
index 0000000000..c563c16e89
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.optimization;
+
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
+import org.apache.iotdb.db.mpp.plan.statement.literal.LongLiteral;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.DEVICE;
+
+public class TestPlanBuilder {
+
+ private PlanNode root;
+
+ public TestPlanBuilder() {}
+
+ public PlanNode getRoot() {
+ return root;
+ }
+
+ public TestPlanBuilder scan(String id, PartialPath path) {
+ this.root = new SeriesScanNode(new PlanNodeId(id), (MeasurementPath) path);
+ return this;
+ }
+
+ public TestPlanBuilder scanAligned(String id, PartialPath path) {
+ this.root = new AlignedSeriesScanNode(new PlanNodeId(id), (AlignedPath) path);
+ return this;
+ }
+
+ public TestPlanBuilder scan(String id, PartialPath path, int limit, int offset) {
+ SeriesScanNode node = new SeriesScanNode(new PlanNodeId(id), (MeasurementPath) path);
+ node.setLimit(limit);
+ node.setOffset(offset);
+ this.root = node;
+ return this;
+ }
+
+ public TestPlanBuilder scanAligned(String id, PartialPath path, int limit, int offset) {
+ AlignedSeriesScanNode node = new AlignedSeriesScanNode(new PlanNodeId(id), (AlignedPath) path);
+ node.setLimit(limit);
+ node.setOffset(offset);
+ this.root = node;
+ return this;
+ }
+
+ public TestPlanBuilder timeJoin(List<PartialPath> paths) {
+ int planId = 0;
+
+ List<PlanNode> seriesSourceNodes = new ArrayList<>();
+ for (PartialPath path : paths) {
+ seriesSourceNodes.add(
+ new SeriesScanNode(new PlanNodeId(String.valueOf(planId)), (MeasurementPath) path));
+ planId++;
+ }
+ this.root =
+ new TimeJoinNode(new PlanNodeId(String.valueOf(planId)), Ordering.ASC, seriesSourceNodes);
+ return this;
+ }
+
+ public TestPlanBuilder transform(String id, List<Expression> expressions) {
+ this.root =
+ new TransformNode(
+ new PlanNodeId(id),
+ getRoot(),
+ expressions.toArray(new Expression[0]),
+ false,
+ ZonedDateTime.now().getOffset(),
+ Ordering.ASC);
+ return this;
+ }
+
+ public TestPlanBuilder limit(String id, long limit) {
+ this.root = new LimitNode(new PlanNodeId(id), getRoot(), limit);
+ return this;
+ }
+
+ public TestPlanBuilder offset(String id, long offset) {
+ this.root = new OffsetNode(new PlanNodeId(id), getRoot(), offset);
+ return this;
+ }
+
+ public TestPlanBuilder fill(String id, FillPolicy fillPolicy) {
+ this.root =
+ new FillNode(new PlanNodeId(id), getRoot(), new FillDescriptor(fillPolicy), Ordering.ASC);
+ return this;
+ }
+
+ public TestPlanBuilder fill(String id, String intValue) {
+ this.root =
+ new FillNode(
+ new PlanNodeId(id),
+ getRoot(),
+ new FillDescriptor(FillPolicy.VALUE, new LongLiteral(intValue)),
+ Ordering.ASC);
+ return this;
+ }
+
+ public TestPlanBuilder singleDeviceView(String id, String device, String measurement) {
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+ deviceToMeasurementIndexesMap.put(device, Collections.singletonList(1));
+ DeviceViewNode deviceViewNode =
+ new DeviceViewNode(
+ new PlanNodeId(id),
+ new OrderByParameter(
+ Arrays.asList(
+ new SortItem(SortKey.DEVICE, Ordering.ASC),
+ new SortItem(SortKey.TIME, Ordering.ASC))),
+ Arrays.asList(DEVICE, measurement),
+ deviceToMeasurementIndexesMap);
+ deviceViewNode.addChildDeviceNode(device, getRoot());
+ this.root = deviceViewNode;
+ return this;
+ }
+
+ public TestPlanBuilder filter(String id, List<Expression> expressions, Expression predicate) {
+ this.root =
+ new FilterNode(
+ new PlanNodeId(id),
+ getRoot(),
+ expressions.toArray(new Expression[0]),
+ predicate,
+ false,
+ ZonedDateTime.now().getOffset(),
+ Ordering.ASC);
+ return this;
+ }
+
+ public TestPlanBuilder into(String id, PartialPath sourcePath, PartialPath intoPath) {
+ IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor();
+ intoPathDescriptor.specifyTargetPath(sourcePath.toString(), intoPath);
+ intoPathDescriptor.specifyDeviceAlignment(intoPath.getDevice(), false);
+ intoPathDescriptor.recordSourceColumnDataType(
+ sourcePath.toString(), sourcePath.getSeriesType());
+ this.root = new IntoNode(new PlanNodeId(id), getRoot(), intoPathDescriptor);
+ return this;
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
index 022749581b..56ef93e10b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
@@ -82,7 +82,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
@Ignore
-@Deprecated
public class LogicalPlannerTest {
@Test