You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/22 11:10:32 UTC
[iotdb] branch master updated: Refactor attributes in PlanNode (#5616)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d939abfc5e Refactor attributes in PlanNode (#5616)
d939abfc5e is described below
commit d939abfc5e082089f8b15811eb05b1f1a9625737
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Fri Apr 22 19:10:25 2022 +0800
Refactor attributes in PlanNode (#5616)
---
.../iotdb/db/mpp/common/schematree/SchemaTree.java | 15 +++
.../operator/process/merge/SingleColumnMerger.java | 2 +-
.../source/SeriesAggregateScanOperator.java | 6 +-
.../db/mpp/sql/planner/DistributionPlanner.java | 3 -
.../{plan => }/IFragmentParallelPlaner.java | 4 +-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 2 +-
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 5 +-
.../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 10 +-
.../{plan => }/SimpleFragmentParallelPlanner.java | 5 +-
.../{plan => }/WriteFragmentParallelPlanner.java | 5 +-
.../db/mpp/sql/planner/plan/node/PlanNode.java | 8 ++
.../node/metedata/read/DevicesSchemaScanNode.java | 17 ++++
.../plan/node/metedata/read/SchemaFetchNode.java | 17 ++++
.../plan/node/metedata/read/SchemaMergeNode.java | 17 ++++
.../metedata/read/TimeSeriesSchemaScanNode.java | 17 ++++
.../node/metedata/write/AlterTimeSeriesNode.java | 17 ++++
.../plan/node/metedata/write/AuthorNode.java | 17 ++++
.../write/CreateAlignedTimeSeriesNode.java | 16 +++
.../node/metedata/write/CreateTimeSeriesNode.java | 16 +++
.../planner/plan/node/process/AggregateNode.java | 31 +++---
.../planner/plan/node/process/DeviceMergeNode.java | 47 ++++-----
.../planner/plan/node/process/ExchangeNode.java | 17 ++++
.../sql/planner/plan/node/process/FillNode.java | 9 +-
.../sql/planner/plan/node/process/FilterNode.java | 10 +-
.../planner/plan/node/process/FilterNullNode.java | 81 +++++++--------
.../plan/node/process/GroupByLevelNode.java | 85 +++++++++++-----
.../sql/planner/plan/node/process/LimitNode.java | 9 +-
.../sql/planner/plan/node/process/OffsetNode.java | 9 +-
.../sql/planner/plan/node/process/SortNode.java | 11 +--
.../planner/plan/node/process/TimeJoinNode.java | 109 ++++++++++++---------
.../planner/plan/node/sink/FragmentSinkNode.java | 17 ++++
.../plan/node/source/SeriesAggregateScanNode.java | 37 ++++---
.../planner/plan/node/source/SeriesScanNode.java | 15 +--
.../plan/node/write/InsertMultiTabletsNode.java | 17 ++++
.../sql/planner/plan/node/write/InsertRowNode.java | 16 +++
.../planner/plan/node/write/InsertRowsNode.java | 17 ++++
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 17 ++++
.../planner/plan/node/write/InsertTabletNode.java | 16 +++
.../Aggregation.java} | 21 ++--
.../planner/plan/parameter/AggregationStep.java | 73 ++++++++++++++
.../plan/parameter/FilterNullParameter.java | 88 +++++++++++++++++
.../plan/parameter/GroupByTimeParameter.java} | 81 ++++++++-------
.../plan/{ => parameter}/InputLocation.java | 35 ++++++-
.../planner/plan/{ => parameter}/OutputColumn.java | 24 ++++-
.../statement/component/GroupByTimeComponent.java | 73 --------------
.../query/expression/unary/FunctionExpression.java | 3 +
.../query/expression/unary/TimeSeriesOperand.java | 5 +-
.../iotdb/db/mpp/execution/DataDriverTest.java | 2 +-
.../iotdb/db/mpp/operator/LimitOperatorTest.java | 2 +-
.../operator/SeriesAggregateScanOperatorTest.java | 16 +--
.../db/mpp/operator/SingleColumnMergerTest.java | 2 +-
.../db/mpp/operator/TimeJoinOperatorTest.java | 2 +-
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 9 +-
.../node/process/DeviceMergeNodeSerdeTest.java | 3 -
.../plan/node/process/ExchangeNodeSerdeTest.java | 3 -
.../sql/plan/node/process/FillNodeSerdeTest.java | 4 -
.../sql/plan/node/process/FilterNodeSerdeTest.java | 4 -
.../plan/node/process/FilterNullNodeSerdeTest.java | 9 +-
.../node/process/GroupByLevelNodeSerdeTest.java | 9 +-
.../sql/plan/node/process/LimitNodeSerdeTest.java | 9 +-
.../sql/plan/node/process/OffsetNodeSerdeTest.java | 17 ++--
.../sql/plan/node/process/SortNodeSerdeTest.java | 9 +-
.../plan/node/process/TimeJoinNodeSerdeTest.java | 13 +--
.../source/SeriesAggregateScanNodeSerdeTest.java | 6 +-
64 files changed, 879 insertions(+), 412 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 0913a495a1..d8a5586362 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.mpp.common.schematree;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -31,6 +33,7 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
+import java.util.Set;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
import static org.apache.iotdb.db.mpp.common.schematree.SchemaNode.SCHEMA_ENTITY_NODE;
@@ -64,6 +67,18 @@ public class SchemaTree {
return new Pair<>(visitor.getAllResult(), visitor.getNextOffset());
}
+ /**
+ * Get all device paths matching the path pattern.
+ *
+ * @param pathPattern the pattern of the target devices.
+ * @return A HashSet instance which stores devices paths matching the given path pattern.
+ */
+ public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
+ throws MetadataException {
+ // TODO: @zyk
+ throw new NoTemplateOnMNodeException("");
+ }
+
public DeviceSchemaInfo searchDeviceSchemaInfo(
PartialPath devicePath, List<String> measurements) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
index e4e054a0ef..8e3c82cb9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator.process.merge;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 79586e3c4e..b94fadccc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
@@ -85,7 +85,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
List<AggregationType> aggregateFuncList,
Filter timeFilter,
boolean ascending,
- GroupByTimeComponent groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter) {
this.sourceId = sourceId;
this.operatorContext = context;
this.ascending = ascending;
@@ -121,7 +121,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
* Aggregation query has only one time window and the result set of it does not contain a
* timestamp, so it doesn't matter what the time range returns.
*/
- public ITimeRangeIterator initTimeRangeIterator(GroupByTimeComponent groupByTimeParameter) {
+ public ITimeRangeIterator initTimeRangeIterator(GroupByTimeParameter groupByTimeParameter) {
if (groupByTimeParameter == null) {
return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index b8a42f462b..67c8c5f740 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -25,12 +25,9 @@ import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
-import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.WriteFragmentParallelPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/IFragmentParallelPlaner.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/IFragmentParallelPlaner.java
index 36be61fa3f..512634ff9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/IFragmentParallelPlaner.java
@@ -17,7 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 673efa4490..230ffac797 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
import org.apache.iotdb.db.mpp.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.OutputColumn;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
@@ -68,6 +67,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.OutputColumn;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index cbaa275805..7d6cbee1fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
@@ -86,9 +85,7 @@ public class LogicalPlanner {
rootNode = optimizer.optimize(rootNode, context);
}
- analysis
- .getRespDatasetHeader()
- .setColumnToTsBlockIndexMap(((IOutputPlanNode) rootNode).getOutputColumnNames());
+ analysis.getRespDatasetHeader().setColumnToTsBlockIndexMap(rootNode.getOutputColumnNames());
}
return new LogicalQueryPlan(context, rootNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
index db7497ea61..eae36dc667 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
@@ -37,11 +37,11 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -270,10 +270,10 @@ public class QueryPlanBuilder {
new FilterNullNode(
context.getQueryId().genPlanNodeId(),
this.getRoot(),
- filterNullComponent.getWithoutPolicyType(),
- filterNullComponent.getWithoutNullColumns().stream()
- .map(Expression::getExpressionString)
- .collect(Collectors.toList()));
+ new FilterNullParameter(
+ filterNullComponent.getWithoutPolicyType(),
+ new ArrayList<>() // TODO: support filtering based on partial columns
+ ));
}
public void planLimit(int rowLimit) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
index f3413df263..83cffa59cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
@@ -16,12 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/WriteFragmentParallelPlanner.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/WriteFragmentParallelPlanner.java
index be34ae257e..6f28e87f21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/WriteFragmentParallelPlanner.java
@@ -17,10 +17,13 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index f019999920..6c3d522a35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.commons.lang.Validate;
@@ -77,6 +79,12 @@ public abstract class PlanNode {
public abstract int allowedChildCount();
+ public abstract List<ColumnHeader> getOutputColumnHeaders();
+
+ public abstract List<String> getOutputColumnNames();
+
+ public abstract List<TSDataType> getOutputColumnTypes();
+
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitPlan(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
index 4afa2fad9b..a66513953a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
@@ -20,9 +20,11 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
@@ -61,6 +63,21 @@ public class DevicesSchemaScanNode extends SchemaScanNode {
return new DevicesSchemaScanNode(getPlanNodeId(), path, limit, offset, isPrefixPath, hasSgCol);
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.DEVICES_SCHEMA_SCAN.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
index 54800d33a1..2afbfa4b95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import com.google.common.collect.ImmutableList;
@@ -61,6 +63,21 @@ public class SchemaFetchNode extends SchemaScanNode {
return 0;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.SCHEMA_FETCH.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
index 7e9357cb68..1914825994 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
@@ -18,11 +18,13 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ProcessNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -64,6 +66,21 @@ public class SchemaMergeNode extends ProcessNode {
return CHILD_COUNT_NO_LIMIT;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.SCHEMA_MERGE.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
index ab5337f8a8..c789cc64f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
@@ -20,9 +20,11 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
@@ -121,6 +123,21 @@ public class TimeSeriesSchemaScanNode extends SchemaScanNode {
getPlanNodeId(), path, key, value, limit, offset, orderByHeat, isContains, isPrefixPath);
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index c38d0e1452..2df87c8b44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement.AlterType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
@@ -136,6 +138,21 @@ public class AlterTimeSeriesNode extends PlanNode {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
@Override
public void serialize(ByteBuffer byteBuffer) {
byteBuffer.putShort((short) PlanNodeType.ALTER_TIME_SERIES.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
index adcd0cfc06..1f49ea735f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
@@ -22,11 +22,13 @@ import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.entity.PrivilegeType;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
@@ -164,6 +166,21 @@ public class AuthorNode extends PlanNode {
return 0;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
@Override
public void serialize(ByteBuffer byteBuffer) {
byteBuffer.putShort((short) PlanNodeType.AUTHOR.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index b6d0a56143..d90e79f8c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -159,6 +160,21 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
return visitor.visitCreateAlignedTimeSeries(this, schemaRegion);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index ddeefcef63..dcbea17148 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -170,6 +171,21 @@ public class CreateTimeSeriesNode extends WritePlanNode {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
String id;
PartialPath path = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index d5d018f0ee..ba71b30970 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -21,12 +21,12 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.Aggregation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -52,19 +52,24 @@ import java.util.stream.Collectors;
* input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
* final series aggregated result represented by TsBlock.
*/
-public class AggregateNode extends ProcessNode implements IOutputPlanNode {
+public class AggregateNode extends ProcessNode {
// The map from columns to corresponding aggregation functions on that column.
// KEY: The index of a column in the input {@link TsBlock}.
// VALUE: Aggregation functions on this column.
// (Currently, we only support one series in the aggregation function.)
- private final Map<PartialPath, Set<AggregationType>> aggregateFuncMap;
+ @Deprecated private final Map<PartialPath, Set<AggregationType>> aggregateFuncMap;
+
+ // The list of aggregation functions, each Aggregation will be output as one column of result
+ // TsBlock
+ private List<Aggregation> aggregationList;
// The parameter of `group by time`.
// Its value will be null if there is no `group by time` clause.
- private final GroupByTimeComponent groupByTimeParameter;
+ private final GroupByTimeParameter groupByTimeParameter;
- private final List<ColumnHeader> columnHeaders = new ArrayList<>();
+ // column name and datatype of each output column
+ private final List<ColumnHeader> outputColumnHeaders = new ArrayList<>();
private PlanNode child;
@@ -72,14 +77,14 @@ public class AggregateNode extends ProcessNode implements IOutputPlanNode {
PlanNodeId id,
PlanNode child,
Map<PartialPath, Set<AggregationType>> aggregateFuncMap,
- GroupByTimeComponent groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter) {
super(id);
this.child = child;
this.aggregateFuncMap = aggregateFuncMap;
this.groupByTimeParameter = groupByTimeParameter;
for (Map.Entry<PartialPath, Set<AggregationType>> entry : aggregateFuncMap.entrySet()) {
PartialPath path = entry.getKey();
- columnHeaders.addAll(
+ outputColumnHeaders.addAll(
entry.getValue().stream()
.map(
functionName ->
@@ -111,17 +116,21 @@ public class AggregateNode extends ProcessNode implements IOutputPlanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return columnHeaders;
+ return outputColumnHeaders;
}
@Override
public List<String> getOutputColumnNames() {
- return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 72bdb6cd4c..21c1eae46f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -20,13 +20,11 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -49,24 +47,19 @@ import java.util.stream.Collectors;
* same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will
* contain n+1 columns where the new column is Device column.
*/
-public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
+public class DeviceMergeNode extends ProcessNode {
// The result output order that this operator
private OrderBy mergeOrder;
- // The policy to decide whether a row should be discarded
- // The without policy is able to be push down to the DeviceMergeNode because we can know whether a
- // row contains
- // null or not.
- private FilterNullComponent filterNullComponent;
-
// The map from deviceName to corresponding query result node responsible for that device.
// DeviceNode means the node whose output TsBlock contains the data belonged to one device.
private Map<String, PlanNode> childDeviceNodeMap = new HashMap<>();
- private List<PlanNode> children;
+ // column name and datatype of each output column
+ private final List<ColumnHeader> outputColumnHeaders = new ArrayList<>();
- private final List<ColumnHeader> columnHeaders = new ArrayList<>();
+ private List<PlanNode> children;
public DeviceMergeNode(PlanNodeId id) {
super(id);
@@ -79,10 +72,6 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
this.children = new ArrayList<>();
}
- public void setFilterNullComponent(FilterNullComponent filterNullComponent) {
- this.filterNullComponent = filterNullComponent;
- }
-
@Override
public List<PlanNode> getChildren() {
return children;
@@ -110,28 +99,32 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
}
private void updateColumnHeaders(PlanNode childNode) {
- List<ColumnHeader> childColumnHeaders = ((IOutputPlanNode) childNode).getOutputColumnHeaders();
+ List<ColumnHeader> childColumnHeaders = childNode.getOutputColumnHeaders();
for (ColumnHeader columnHeader : childColumnHeaders) {
ColumnHeader tmpColumnHeader = columnHeader.replacePathWithMeasurement();
- if (!columnHeaders.contains(tmpColumnHeader)) {
- columnHeaders.add(tmpColumnHeader);
+ if (!outputColumnHeaders.contains(tmpColumnHeader)) {
+ outputColumnHeaders.add(tmpColumnHeader);
}
}
}
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return columnHeaders;
+ return outputColumnHeaders;
}
@Override
public List<String> getOutputColumnNames() {
- return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
}
public OrderBy getMergeOrder() {
@@ -147,14 +140,13 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.DEVICE_MERGE.serialize(byteBuffer);
ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer);
- filterNullComponent.serialize(byteBuffer);
ReadWriteIOUtils.write(childDeviceNodeMap.size(), byteBuffer);
for (Map.Entry<String, PlanNode> e : childDeviceNodeMap.entrySet()) {
ReadWriteIOUtils.write(e.getKey(), byteBuffer);
e.getValue().serialize(byteBuffer);
}
- ReadWriteIOUtils.write(columnHeaders.size(), byteBuffer);
- for (ColumnHeader columnHeader : columnHeaders) {
+ ReadWriteIOUtils.write(outputColumnHeaders.size(), byteBuffer);
+ for (ColumnHeader columnHeader : outputColumnHeaders) {
columnHeader.serialize(byteBuffer);
}
}
@@ -162,7 +154,6 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) {
int orderByIndex = ReadWriteIOUtils.readInt(byteBuffer);
OrderBy orderBy = OrderBy.values()[orderByIndex];
- FilterNullComponent filterNullComponent = FilterNullComponent.deserialize(byteBuffer);
Map<String, PlanNode> childDeviceNodeMap = new HashMap<>();
int childDeviceNodeMapSize = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < childDeviceNodeMapSize; i++) {
@@ -177,9 +168,8 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
DeviceMergeNode deviceMergeNode = new DeviceMergeNode(planNodeId, orderBy);
- deviceMergeNode.filterNullComponent = filterNullComponent;
deviceMergeNode.childDeviceNodeMap = childDeviceNodeMap;
- deviceMergeNode.columnHeaders.addAll(columnHeaders);
+ deviceMergeNode.outputColumnHeaders.addAll(columnHeaders);
return deviceMergeNode;
}
@@ -207,12 +197,11 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
DeviceMergeNode that = (DeviceMergeNode) o;
return mergeOrder == that.mergeOrder
- && Objects.equals(filterNullComponent, that.filterNullComponent)
&& Objects.equals(childDeviceNodeMap, that.childDeviceNodeMap);
}
@Override
public int hashCode() {
- return Objects.hash(mergeOrder, filterNullComponent, childDeviceNodeMap);
+ return Objects.hash(mergeOrder, childDeviceNodeMap);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index dc61098d7a..82998127ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
@@ -86,6 +88,21 @@ public class ExchangeNode extends PlanNode {
return CHILD_COUNT_NO_LIMIT;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
public void setUpstream(TEndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
this.upstreamEndpoint = endPoint;
this.upstreamInstanceId = instanceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index b0ccf48018..b800b3308a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -38,7 +37,7 @@ import java.util.List;
import java.util.Objects;
/** FillNode is used to fill the empty field in one row. */
-public class FillNode extends ProcessNode implements IOutputPlanNode {
+public class FillNode extends ProcessNode {
private PlanNode child;
@@ -76,17 +75,17 @@ public class FillNode extends ProcessNode implements IOutputPlanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ return child.getOutputColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return ((IOutputPlanNode) child).getOutputColumnNames();
+ return child.getOutputColumnNames();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return ((IOutputPlanNode) child).getOutputColumnTypes();
+ return child.getOutputColumnTypes();
}
public FillPolicy getFillPolicy() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
index 25a117cb6f..0d74ee1092 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -40,7 +39,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
/** The FilterNode is responsible to filter the RowRecord from TsBlock. */
-public class FilterNode extends ProcessNode implements IOutputPlanNode {
+public class FilterNode extends ProcessNode {
private PlanNode child;
@@ -58,10 +57,9 @@ public class FilterNode extends ProcessNode implements IOutputPlanNode {
this(id, predicate);
this.child = child;
this.columnHeaders =
- ((IOutputPlanNode) child)
- .getOutputColumnHeaders().stream()
- .filter(columnHeader -> outputColumnNames.contains(columnHeader.getColumnName()))
- .collect(Collectors.toList());
+ child.getOutputColumnHeaders().stream()
+ .filter(columnHeader -> outputColumnNames.contains(columnHeader.getColumnName()))
+ .collect(Collectors.toList());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
index ecde0204d4..3a2201a489 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -20,15 +20,15 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
@@ -37,29 +37,36 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-/** WithoutNode is used to discard specific rows from upstream node. */
-public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
+/** FilterNullNode is used to discard specific rows from upstream node. */
+public class FilterNullNode extends ProcessNode {
- // The policy to discard the result from upstream operator
- private final FilterNullPolicy discardPolicy;
-
- private final List<String> filterNullColumnNames;
+ FilterNullParameter filterNullParameter;
private PlanNode child;
+ public FilterNullNode(PlanNodeId id, FilterNullParameter filterNullParameter) {
+ super(id);
+ this.filterNullParameter = filterNullParameter;
+ }
+
public FilterNullNode(
- PlanNodeId id, FilterNullPolicy policy, List<String> filterNullColumnNames) {
+ PlanNodeId id, FilterNullPolicy filterNullPolicy, List<InputLocation> filterNullColumns) {
super(id);
- this.discardPolicy = policy;
- this.filterNullColumnNames = filterNullColumnNames;
+ this.filterNullParameter = new FilterNullParameter(filterNullPolicy, filterNullColumns);
}
public FilterNullNode(
PlanNodeId id,
PlanNode child,
- FilterNullPolicy discardPolicy,
- List<String> filterNullColumnNames) {
- this(id, discardPolicy, filterNullColumnNames);
+ FilterNullPolicy filterNullPolicy,
+ List<InputLocation> filterNullColumns) {
+ super(id);
+ this.child = child;
+ this.filterNullParameter = new FilterNullParameter(filterNullPolicy, filterNullColumns);
+ }
+
+ public FilterNullNode(PlanNodeId id, PlanNode child, FilterNullParameter filterNullParameter) {
+ this(id, filterNullParameter);
this.child = child;
}
@@ -75,7 +82,7 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
@Override
public PlanNode clone() {
- return new FilterNullNode(getPlanNodeId(), discardPolicy, filterNullColumnNames);
+ return new FilterNullNode(getPlanNodeId(), filterNullParameter);
}
@Override
@@ -85,25 +92,25 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ return child.getOutputColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return ((IOutputPlanNode) child).getOutputColumnNames();
+ return child.getOutputColumnNames();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return ((IOutputPlanNode) child).getOutputColumnTypes();
+ return child.getOutputColumnTypes();
}
public FilterNullPolicy getDiscardPolicy() {
- return discardPolicy;
+ return filterNullParameter.getFilterNullPolicy();
}
- public List<String> getFilterNullColumnNames() {
- return filterNullColumnNames;
+ public List<InputLocation> getFilterNullColumns() {
+ return filterNullParameter.getFilterNullColumns();
}
@Override
@@ -114,30 +121,13 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.FILTER_NULL.serialize(byteBuffer);
- ReadWriteIOUtils.write(discardPolicy.ordinal(), byteBuffer);
- if (filterNullColumnNames == null) {
- ReadWriteIOUtils.write(-1, byteBuffer);
- } else {
- ReadWriteIOUtils.write(filterNullColumnNames.size(), byteBuffer);
- for (String filterNullColumnName : filterNullColumnNames) {
- ReadWriteIOUtils.write(filterNullColumnName, byteBuffer);
- }
- }
+ filterNullParameter.serialize(byteBuffer);
}
public static FilterNullNode deserialize(ByteBuffer byteBuffer) {
- FilterNullPolicy filterNullPolicy =
- FilterNullPolicy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
- int size = ReadWriteIOUtils.readInt(byteBuffer);
- List<String> filterNullColumnNames = null;
- if (size != -1) {
- filterNullColumnNames = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- filterNullColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
- }
- }
+ FilterNullParameter filterNullParameter = FilterNullParameter.deserialize(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new FilterNullNode(planNodeId, filterNullPolicy, filterNullColumnNames);
+ return new FilterNullNode(planNodeId, filterNullParameter);
}
@TestOnly
@@ -145,7 +135,7 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
String title = String.format("[FilterNullNode (%s)]", this.getPlanNodeId());
List<String> attributes = new ArrayList<>();
attributes.add("FilterNullPolicy: " + this.getDiscardPolicy());
- attributes.add("FilterNullColumnNames: " + this.getFilterNullColumnNames());
+ attributes.add("FilterNullColumns: " + this.getFilterNullColumns());
return new Pair<>(title, attributes);
}
@@ -160,13 +150,12 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
}
FilterNullNode that = (FilterNullNode) o;
- return discardPolicy == that.discardPolicy
- && Objects.equals(child, that.child)
- && Objects.equals(filterNullColumnNames, that.filterNullColumnNames);
+ return Objects.equals(filterNullParameter, that.filterNullParameter)
+ && Objects.equals(child, that.child);
}
@Override
public int hashCode() {
- return Objects.hash(discardPolicy, child, filterNullColumnNames);
+ return Objects.hash(child, filterNullParameter);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index dc06dec753..45ec5b0aa9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -20,11 +20,12 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.OutputColumn;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -35,7 +36,6 @@ import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -55,15 +55,25 @@ import java.util.stream.Collectors;
* <p>If the group by level parameter is [0, 2], then these two columns will not belong to one
* bucket. And the total buckets are `root.*.d1.s1` and `root.*.d2.s1`
*/
-public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
+@Deprecated // TODO: delete later
+public class GroupByLevelNode extends ProcessNode {
private final int[] groupByLevels;
- private final Map<ColumnHeader, ColumnHeader> groupedPathMap;
+ @Deprecated private Map<ColumnHeader, ColumnHeader> groupedPathMap;
- private PlanNode child;
+ // The list of aggregation functions, each aggregation will be output as one column of result
+ // TsBlock
+ private List<AggregationType> aggregateFuncList = new ArrayList<>();
+
+ // indicate each output column should use which value column of which input TsBlock and the
+ // overlapped situation
+ private List<OutputColumn> outputColumns = new ArrayList<>();
- private final List<ColumnHeader> columnHeaders;
+ // column name and datatype of each output column
+ private List<ColumnHeader> outputColumnHeaders;
+
+ private PlanNode child;
public GroupByLevelNode(
PlanNodeId id,
@@ -74,7 +84,22 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
this.child = child;
this.groupByLevels = groupByLevels;
this.groupedPathMap = groupedPathMap;
- this.columnHeaders = groupedPathMap.values().stream().distinct().collect(Collectors.toList());
+ this.outputColumnHeaders =
+ groupedPathMap.values().stream().distinct().collect(Collectors.toList());
+ }
+
+ public GroupByLevelNode(
+ PlanNodeId id,
+ PlanNode child,
+ int[] groupByLevels,
+ List<AggregationType> aggregateFuncList,
+ List<OutputColumn> outputColumns) {
+ super(id);
+ this.child = child;
+ this.groupByLevels = groupByLevels;
+ this.aggregateFuncList = aggregateFuncList;
+ this.outputColumns = outputColumns;
+ // TODO: init outputColumnHeaders
}
@Override
@@ -103,17 +128,21 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return columnHeaders;
+ return outputColumnHeaders;
}
@Override
public List<String> getOutputColumnNames() {
- return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
}
@Override
@@ -125,13 +154,16 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.GROUP_BY_LEVEL.serialize(byteBuffer);
ReadWriteIOUtils.write(groupByLevels.length, byteBuffer);
- for (int i = 0; i < groupByLevels.length; i++) {
- ReadWriteIOUtils.write(groupByLevels[i], byteBuffer);
+ for (int level : groupByLevels) {
+ ReadWriteIOUtils.write(level, byteBuffer);
+ }
+ ReadWriteIOUtils.write(aggregateFuncList.size(), byteBuffer);
+ for (AggregationType aggregationType : aggregateFuncList) {
+ ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
}
- ReadWriteIOUtils.write(groupedPathMap.size(), byteBuffer);
- for (Map.Entry<ColumnHeader, ColumnHeader> e : groupedPathMap.entrySet()) {
- e.getKey().serialize(byteBuffer);
- e.getValue().serialize(byteBuffer);
+ ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+ for (OutputColumn outputColumn : outputColumns) {
+ outputColumn.serialize(byteBuffer);
}
}
@@ -141,14 +173,18 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
for (int i = 0; i < groupByLevelSize; i++) {
groupByLevels[i] = ReadWriteIOUtils.readInt(byteBuffer);
}
- int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
- Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
- for (int i = 0; i < mapSize; i++) {
- groupedPathMap.put(
- ColumnHeader.deserialize(byteBuffer), ColumnHeader.deserialize(byteBuffer));
+ int aggregateFuncListSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<AggregationType> aggregateFuncList = new ArrayList<>(aggregateFuncListSize);
+ for (int i = 0; i < aggregateFuncListSize; i++) {
+ aggregateFuncList.add(AggregationType.values()[ReadWriteIOUtils.readInt(byteBuffer)]);
+ }
+ int outputColumnsSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<OutputColumn> outputColumns = new ArrayList<>(outputColumnsSize);
+ for (int i = 0; i < outputColumnsSize; i++) {
+ outputColumns.add(OutputColumn.deserialize(byteBuffer));
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new GroupByLevelNode(planNodeId, null, groupByLevels, groupedPathMap);
+ return new GroupByLevelNode(planNodeId, null, groupByLevels, aggregateFuncList, outputColumns);
}
@TestOnly
@@ -173,12 +209,13 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
GroupByLevelNode that = (GroupByLevelNode) o;
return Objects.equals(child, that.child)
&& Arrays.equals(groupByLevels, that.groupByLevels)
- && Objects.equals(groupedPathMap, that.groupedPathMap);
+ && Objects.equals(aggregateFuncList, that.aggregateFuncList)
+ && Objects.equals(outputColumns, that.outputColumns);
}
@Override
public int hashCode() {
- int result = Objects.hash(child, groupedPathMap);
+ int result = Objects.hash(super.hashCode(), aggregateFuncList, outputColumns);
result = 31 * result + Arrays.hashCode(groupByLevels);
return result;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index ae0291edb1..fd7d9b1477 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -37,7 +36,7 @@ import java.util.List;
import java.util.Objects;
/** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode implements IOutputPlanNode {
+public class LimitNode extends ProcessNode {
// The limit count
private final int limit;
@@ -75,17 +74,17 @@ public class LimitNode extends ProcessNode implements IOutputPlanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ return child.getOutputColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return ((IOutputPlanNode) child).getOutputColumnNames();
+ return child.getOutputColumnNames();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return ((IOutputPlanNode) child).getOutputColumnTypes();
+ return child.getOutputColumnTypes();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 15544e4f64..88f26b81d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -40,7 +39,7 @@ import java.util.Objects;
* OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
* upstream nodes
*/
-public class OffsetNode extends ProcessNode implements IOutputPlanNode {
+public class OffsetNode extends ProcessNode {
// The limit count
private PlanNode child;
@@ -78,17 +77,17 @@ public class OffsetNode extends ProcessNode implements IOutputPlanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ return child.getOutputColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return ((IOutputPlanNode) child).getOutputColumnNames();
+ return child.getOutputColumnNames();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return ((IOutputPlanNode) child).getOutputColumnTypes();
+ return child.getOutputColumnTypes();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index c25262c723..2973ae400e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -41,13 +40,13 @@ import java.util.Objects;
* In general, the parameter in sortNode should be pushed down to the upstream operators. In our
* optimized logical query plan, the sortNode should not appear.
*/
-public class SortNode extends ProcessNode implements IOutputPlanNode {
+public class SortNode extends ProcessNode {
private PlanNode child;
private final List<String> orderBy;
- private OrderBy sortOrder;
+ private final OrderBy sortOrder;
public SortNode(PlanNodeId id, List<String> orderBy, OrderBy sortOrder) {
super(id);
@@ -82,17 +81,17 @@ public class SortNode extends ProcessNode implements IOutputPlanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return ((IOutputPlanNode) child).getOutputColumnHeaders();
+ return child.getOutputColumnHeaders();
}
@Override
public List<String> getOutputColumnNames() {
- return ((IOutputPlanNode) child).getOutputColumnNames();
+ return child.getOutputColumnNames();
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return ((IOutputPlanNode) child).getOutputColumnTypes();
+ return child.getOutputColumnTypes();
}
public OrderBy getSortOrder() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 455ace6a6b..b34dd68bda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -20,14 +20,13 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
-import org.apache.iotdb.db.mpp.sql.planner.plan.OutputColumn;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.OutputColumn;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -45,7 +44,7 @@ import java.util.stream.Collectors;
* TimeJoinOperator is sorted by timestamp
*/
// TODO: define the TimeJoinMergeNode for distributed plan
-public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
+public class TimeJoinNode extends ProcessNode {
// This parameter indicates the order when executing multiway merge sort.
private final OrderBy mergeOrder;
@@ -54,16 +53,17 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
// The without policy is able to be push down to the TimeJoinOperator because we can know whether
// a row contains
// null or not.
- private FilterNullPolicy filterNullPolicy = FilterNullPolicy.NO_FILTER;
-
- private List<PlanNode> children;
-
- private List<ColumnHeader> columnHeaders = new ArrayList<>();
+ private FilterNullParameter filterNullParameter;
// indicate each output column should use which value column of which input TsBlock and the
// overlapped situation
// size of outputColumns must be equal to the size of columnHeaders
- private List<OutputColumn> outputColumns;
+ private List<OutputColumn> outputColumns = new ArrayList<>();
+
+ // column name and datatype of each output column
+ private final List<ColumnHeader> outputColumnHeaders = new ArrayList<>();
+
+ private List<PlanNode> children;
public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder) {
super(id);
@@ -74,12 +74,7 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, List<PlanNode> children) {
this(id, mergeOrder);
this.children = children;
- initColumnHeaders();
- outputColumns = new ArrayList<>(columnHeaders.size());
- // TODO need to be fixed by minghui, construct outputColumns in LogicalPlanner
- for (int i = 0; i < columnHeaders.size(); i++) {
- outputColumns.add(new OutputColumn(new InputLocation(i, 0)));
- }
+ initOutputColumns();
}
@Override
@@ -91,7 +86,7 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
public PlanNode clone() {
// TODO: (xingtanzjr)
TimeJoinNode cloneNode = new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
- cloneNode.columnHeaders = this.columnHeaders;
+ cloneNode.outputColumns = this.outputColumns;
return cloneNode;
}
@@ -100,25 +95,40 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
return CHILD_COUNT_NO_LIMIT;
}
- private void initColumnHeaders() {
- for (PlanNode child : children) {
- columnHeaders.addAll(((IOutputPlanNode) child).getOutputColumnHeaders());
+ public List<OutputColumn> getOutputColumns() {
+ return outputColumns;
+ }
+
+ private void initOutputColumns() {
+ for (int tsBlockIndex = 0; tsBlockIndex < children.size(); tsBlockIndex++) {
+ List<ColumnHeader> childColumnHeaders = children.get(tsBlockIndex).getOutputColumnHeaders();
+ for (int valueColumnIndex = 0;
+ valueColumnIndex < childColumnHeaders.size();
+ valueColumnIndex++) {
+ InputLocation inputLocation = new InputLocation(tsBlockIndex, valueColumnIndex);
+ outputColumns.add(new OutputColumn(inputLocation));
+ }
+ outputColumnHeaders.addAll(childColumnHeaders);
}
}
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return columnHeaders;
+ return outputColumnHeaders;
}
@Override
public List<String> getOutputColumnNames() {
- return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
}
@Override
@@ -130,30 +140,35 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.TIME_JOIN.serialize(byteBuffer);
ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer);
- ReadWriteIOUtils.write(filterNullPolicy.ordinal(), byteBuffer);
- ReadWriteIOUtils.write(columnHeaders.size(), byteBuffer);
- for (ColumnHeader columnHeader : columnHeaders) {
+ filterNullParameter.serialize(byteBuffer);
+ ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+ for (OutputColumn outputColumn : outputColumns) {
+ outputColumn.serialize(byteBuffer);
+ }
+ ReadWriteIOUtils.write(outputColumnHeaders.size(), byteBuffer);
+ for (ColumnHeader columnHeader : outputColumnHeaders) {
columnHeader.serialize(byteBuffer);
}
}
- public List<OutputColumn> getOutputColumns() {
- return outputColumns;
- }
-
public static TimeJoinNode deserialize(ByteBuffer byteBuffer) {
OrderBy orderBy = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
- FilterNullPolicy filterNullPolicy =
- FilterNullPolicy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
- int columnHeaderSize = ReadWriteIOUtils.readInt(byteBuffer);
- List<ColumnHeader> columnHeaders = new ArrayList<>();
- for (int i = 0; i < columnHeaderSize; i++) {
- columnHeaders.add(ColumnHeader.deserialize(byteBuffer));
+ FilterNullParameter filterNullParameter = FilterNullParameter.deserialize(byteBuffer);
+ int outputColumnSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<OutputColumn> outputColumns = new ArrayList<>(outputColumnSize);
+ for (int i = 0; i < outputColumnSize; i++) {
+ outputColumns.add(OutputColumn.deserialize(byteBuffer));
+ }
+ int outputColumnHeadersSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<ColumnHeader> outputColumnHeaders = new ArrayList<>(outputColumnHeadersSize);
+ for (int i = 0; i < outputColumnHeadersSize; i++) {
+ outputColumnHeaders.add(ColumnHeader.deserialize(byteBuffer));
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
TimeJoinNode timeJoinNode = new TimeJoinNode(planNodeId, orderBy);
- timeJoinNode.columnHeaders.addAll(columnHeaders);
- timeJoinNode.filterNullPolicy = filterNullPolicy;
+ timeJoinNode.outputColumns.addAll(outputColumns);
+ timeJoinNode.outputColumnHeaders.addAll(outputColumnHeaders);
+ timeJoinNode.setFilterNullParameter(filterNullParameter);
return timeJoinNode;
}
@@ -171,12 +186,12 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
return mergeOrder;
}
- public FilterNullPolicy getFilterNullPolicy() {
- return filterNullPolicy;
+ public FilterNullParameter getFilterNullParameter() {
+ return filterNullParameter;
}
- public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
- this.filterNullPolicy = filterNullPolicy;
+ public void setFilterNullParameter(FilterNullParameter filterNullParameter) {
+ this.filterNullParameter = filterNullParameter;
}
public String toString() {
@@ -190,7 +205,9 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
attributes.add("MergeOrder: " + (this.getMergeOrder() == null ? "null" : this.getMergeOrder()));
attributes.add(
"FilterNullPolicy: "
- + (this.getFilterNullPolicy() == null ? "null" : this.getFilterNullPolicy()));
+ + (this.getFilterNullParameter() == null
+ ? "null"
+ : this.getFilterNullParameter().getFilterNullPolicy()));
return new Pair<>(title, attributes);
}
@@ -206,12 +223,12 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
TimeJoinNode that = (TimeJoinNode) o;
return mergeOrder == that.mergeOrder
- && filterNullPolicy == that.filterNullPolicy
+ && Objects.equals(filterNullParameter, that.filterNullParameter)
&& Objects.equals(children, that.children);
}
@Override
public int hashCode() {
- return Objects.hash(mergeOrder, filterNullPolicy, children);
+ return Objects.hash(mergeOrder, filterNullParameter, children);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index c32b2b2062..48ce66b0ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -20,10 +20,12 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
@@ -83,6 +85,21 @@ public class FragmentSinkNode extends SinkNode {
return ONE_CHILD;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
public static FragmentSinkNode deserialize(ByteBuffer byteBuffer) {
TEndPoint downStreamEndpoint =
new TEndPoint(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 88bb0742bb..dc58e42413 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -24,12 +24,12 @@ import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.Aggregation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -67,28 +67,31 @@ import java.util.stream.Collectors;
* represent the whole aggregation result of this series. And the timestamp will be 0, which is
* meaningless.
*/
-public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNode {
+public class SeriesAggregateScanNode extends SourceNode {
// The series path and aggregation functions on this series.
// (Currently, we only support one series in the aggregation function)
private final PartialPath seriesPath;
- private final List<AggregationType> aggregateFuncList;
+ @Deprecated private final List<AggregationType> aggregateFuncList;
+ private List<Aggregation> aggregationList;
// all the sensors in seriesPath's device of current query
- private Set<String> allSensors;
+ private final Set<String> allSensors;
// The order to traverse the data.
// Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
// The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
private final OrderBy scanOrder;
+ // time filter for current series, could be null if doesn't exist
private final Filter timeFilter;
// The parameter of `group by time`
// Its value will be null if there is no `group by time` clause,
- private final GroupByTimeComponent groupByTimeParameter;
+ private final GroupByTimeParameter groupByTimeParameter;
- private List<ColumnHeader> columnHeaders;
+ // contain output column headers of this node
+ private final List<ColumnHeader> outputColumnHeaders;
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
@@ -100,7 +103,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
List<AggregationType> aggregateFuncList,
OrderBy scanOrder,
Filter timeFilter,
- GroupByTimeComponent groupByTimeParameter) {
+ GroupByTimeParameter groupByTimeParameter) {
super(id);
this.seriesPath = seriesPath;
this.allSensors = allSensors;
@@ -108,7 +111,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
this.scanOrder = scanOrder;
this.timeFilter = timeFilter;
this.groupByTimeParameter = groupByTimeParameter;
- this.columnHeaders =
+ this.outputColumnHeaders =
aggregateFuncList.stream()
.map(
functionType ->
@@ -129,7 +132,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
return timeFilter;
}
- public GroupByTimeComponent getGroupByTimeParameter() {
+ public GroupByTimeParameter getGroupByTimeParameter() {
return groupByTimeParameter;
}
@@ -153,17 +156,21 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return columnHeaders;
+ return outputColumnHeaders;
}
@Override
public List<String> getOutputColumnNames() {
- return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+ return outputColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
}
@Override
@@ -227,7 +234,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
Filter timeFilter = FilterFactory.deserialize(byteBuffer);
- GroupByTimeComponent groupByTimeComponent = GroupByTimeComponent.deserialize(byteBuffer);
+ GroupByTimeParameter groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
TRegionReplicaSet regionReplicaSet = ThriftCommonsSerDeUtils.readTRegionReplicaSet(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
SeriesAggregateScanNode seriesAggregateScanNode =
@@ -238,7 +245,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
aggregateFuncList,
scanOrder,
timeFilter,
- groupByTimeComponent);
+ groupByTimeParameter);
seriesAggregateScanNode.regionReplicaSet = regionReplicaSet;
return seriesAggregateScanNode;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 76327e002f..adc814e043 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -52,7 +51,7 @@ import java.util.Set;
*
* <p>Children type: no child is allowed for SeriesScanNode
*/
-public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
+public class SeriesScanNode extends SourceNode {
// The path of the target series which will be scanned.
private final PartialPath seriesPath;
@@ -77,7 +76,8 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
// offset for result set. The default value is 0
private int offset;
- private ColumnHeader columnHeader;
+ // contain output column header of this node
+ private ColumnHeader outputColumnHeader;
// The id of DataRegion where the node will run
private TRegionReplicaSet regionReplicaSet;
@@ -93,7 +93,8 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
this.seriesPath = seriesPath;
this.allSensors = allSensors;
this.scanOrder = scanOrder;
- this.columnHeader = new ColumnHeader(seriesPath.getFullPath(), seriesPath.getSeriesType());
+ this.outputColumnHeader =
+ new ColumnHeader(seriesPath.getFullPath(), seriesPath.getSeriesType());
}
public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, TRegionReplicaSet regionReplicaSet) {
@@ -161,17 +162,17 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
@Override
public List<ColumnHeader> getOutputColumnHeaders() {
- return ImmutableList.of(columnHeader);
+ return ImmutableList.of(outputColumnHeader);
}
@Override
public List<String> getOutputColumnNames() {
- return ImmutableList.of(columnHeader.getColumnName());
+ return ImmutableList.of(outputColumnHeader.getColumnName());
}
@Override
public List<TSDataType> getOutputColumnTypes() {
- return ImmutableList.of(columnHeader.getColumnType());
+ return ImmutableList.of(outputColumnHeader.getColumnType());
}
public Set<String> getAllSensors() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index ee66e98396..2c15674023 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -160,6 +162,21 @@ public class InsertMultiTabletsNode extends InsertNode {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
public static InsertMultiTabletsNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index eb25abadfe..32ed159e1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -102,6 +103,21 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
@Override
public int serializedSize() {
int size = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 62fad82b81..a2e2ebe4c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -22,11 +22,13 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -123,6 +125,21 @@ public class InsertRowsNode extends InsertNode {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index f83912efa9..40b6d7f4b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -22,11 +22,13 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -104,6 +106,21 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index e03cfcaab1..15ed741d5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -153,6 +154,21 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
return NO_CHILD_ALLOWED;
}
+ @Override
+ public List<ColumnHeader> getOutputColumnHeaders() {
+ return null;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+
+ @Override
+ public List<TSDataType> getOutputColumnTypes() {
+ return null;
+ }
+
@Override
public int serializedSize() {
return serializedSize(0, rowCount);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/Aggregation.java
similarity index 60%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/Aggregation.java
index 661066a314..62d08ab319 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/Aggregation.java
@@ -17,18 +17,27 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.List;
-public interface IOutputPlanNode {
+public class Aggregation {
- List<ColumnHeader> getOutputColumnHeaders();
+ // aggregation function name
+ private AggregationType aggregationType;
- List<String> getOutputColumnNames();
+ // indicate the input and output type
+ private AggregationStep step;
- List<TSDataType> getOutputColumnTypes();
+ // indicate this aggregation should use which value column of which input TsBlock
+ private List<InputLocation> inputLocations;
+
+ // datatype of each input value column
+ private List<TSDataType> inputDateTypes;
+
+ // datatype of output value column
+ private TSDataType outputDateType;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationStep.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationStep.java
new file mode 100644
index 0000000000..bf9b78546d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationStep.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
+
+/**
+ * This attribute indicates the input and output type of the {@code Aggregator}.
+ *
+ * <p>There are three types of input/output:
+ *
+ * <ul>
+ * <li>Raw: raw data, as input only
+ * <li>Partial: intermediate aggregation result
+ * <li>Final: final aggregation result, as output only
+ * </ul>
+ */
+public enum AggregationStep {
+
+ // input Raw, output Partial
+ PARTIAL(true, true),
+ // input Partial, output Final
+ FINAL(false, false),
+ // input Partial, output Partial
+ INTERMEDIATE(false, true),
+ // input Raw, output Final
+ SINGLE(true, false);
+
+ private final boolean inputRaw;
+ private final boolean outputPartial;
+
+ AggregationStep(boolean inputRaw, boolean outputPartial) {
+ this.inputRaw = inputRaw;
+ this.outputPartial = outputPartial;
+ }
+
+ public boolean isInputRaw() {
+ return inputRaw;
+ }
+
+ public boolean isOutputPartial() {
+ return outputPartial;
+ }
+
+ public static AggregationStep partialOutput(AggregationStep step) {
+ if (step.isInputRaw()) {
+ return AggregationStep.PARTIAL;
+ }
+ return AggregationStep.INTERMEDIATE;
+ }
+
+ public static AggregationStep partialInput(AggregationStep step) {
+ if (step.isOutputPartial()) {
+ return AggregationStep.INTERMEDIATE;
+ }
+ return AggregationStep.FINAL;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/FilterNullParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/FilterNullParameter.java
new file mode 100644
index 0000000000..a66a7cdd90
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/FilterNullParameter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
+
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class FilterNullParameter {
+
+ // The policy to discard the result from upstream npde
+ private final FilterNullPolicy filterNullPolicy;
+
+ // indicate columns used to filter null
+ private final List<InputLocation> filterNullColumns;
+
+ public FilterNullParameter(
+ FilterNullPolicy filterNullPolicy, List<InputLocation> filterNullColumns) {
+ this.filterNullPolicy = filterNullPolicy;
+ this.filterNullColumns = filterNullColumns;
+ }
+
+ public FilterNullPolicy getFilterNullPolicy() {
+ return filterNullPolicy;
+ }
+
+ public List<InputLocation> getFilterNullColumns() {
+ return filterNullColumns;
+ }
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(filterNullPolicy.ordinal(), byteBuffer);
+ ReadWriteIOUtils.write(filterNullColumns.size(), byteBuffer);
+ for (InputLocation filterNullColumn : filterNullColumns) {
+ filterNullColumn.serialize(byteBuffer);
+ }
+ }
+
+ public static FilterNullParameter deserialize(ByteBuffer byteBuffer) {
+ FilterNullPolicy filterNullPolicy =
+ FilterNullPolicy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<InputLocation> filterNullColumns = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ filterNullColumns.add(InputLocation.deserialize(byteBuffer));
+ }
+ return new FilterNullParameter(filterNullPolicy, filterNullColumns);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FilterNullParameter that = (FilterNullParameter) o;
+ return filterNullPolicy == that.filterNullPolicy
+ && Objects.equals(filterNullColumns, that.filterNullColumns);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(filterNullPolicy, filterNullColumns);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/GroupByTimeParameter.java
similarity index 75%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/GroupByTimeParameter.java
index 026297a331..9fa368a67f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/GroupByTimeParameter.java
@@ -17,16 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.statement.component;
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
-import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.Objects;
-/** This class maintains information of {@code GROUP BY} clause. */
-public class GroupByTimeComponent extends StatementNode {
+/** The parameter of `GROUP BY TIME` */
+public class GroupByTimeParameter {
// [startTime, endTime)
private long startTime;
@@ -45,14 +44,14 @@ public class GroupByTimeComponent extends StatementNode {
// if it is left close and right open interval
private boolean leftCRightO;
- public GroupByTimeComponent() {}
+ public GroupByTimeParameter() {}
- public GroupByTimeComponent(
+ public GroupByTimeParameter(
long startTime, long endTime, long interval, long slidingStep, boolean leftCRightO) {
this(startTime, endTime, interval, slidingStep, false, false, leftCRightO);
}
- public GroupByTimeComponent(
+ public GroupByTimeParameter(
long startTime,
long endTime,
long interval,
@@ -69,22 +68,6 @@ public class GroupByTimeComponent extends StatementNode {
this.leftCRightO = leftCRightO;
}
- public boolean isLeftCRightO() {
- return leftCRightO;
- }
-
- public void setLeftCRightO(boolean leftCRightO) {
- this.leftCRightO = leftCRightO;
- }
-
- public long getInterval() {
- return interval;
- }
-
- public void setInterval(long interval) {
- this.interval = interval;
- }
-
public long getStartTime() {
return startTime;
}
@@ -101,6 +84,14 @@ public class GroupByTimeComponent extends StatementNode {
this.endTime = endTime;
}
+ public long getInterval() {
+ return interval;
+ }
+
+ public void setInterval(long interval) {
+ this.interval = interval;
+ }
+
public long getSlidingStep() {
return slidingStep;
}
@@ -109,20 +100,28 @@ public class GroupByTimeComponent extends StatementNode {
this.slidingStep = slidingStep;
}
+ public boolean isIntervalByMonth() {
+ return isIntervalByMonth;
+ }
+
+ public void setIntervalByMonth(boolean intervalByMonth) {
+ isIntervalByMonth = intervalByMonth;
+ }
+
public boolean isSlidingStepByMonth() {
return isSlidingStepByMonth;
}
- public void setSlidingStepByMonth(boolean isSlidingStepByMonth) {
- this.isSlidingStepByMonth = isSlidingStepByMonth;
+ public void setSlidingStepByMonth(boolean slidingStepByMonth) {
+ isSlidingStepByMonth = slidingStepByMonth;
}
- public boolean isIntervalByMonth() {
- return isIntervalByMonth;
+ public boolean isLeftCRightO() {
+ return leftCRightO;
}
- public void setIntervalByMonth(boolean isIntervalByMonth) {
- this.isIntervalByMonth = isIntervalByMonth;
+ public void setLeftCRightO(boolean leftCRightO) {
+ this.leftCRightO = leftCRightO;
}
public void serialize(ByteBuffer buffer) {
@@ -135,23 +134,23 @@ public class GroupByTimeComponent extends StatementNode {
ReadWriteIOUtils.write(leftCRightO, buffer);
}
- public static GroupByTimeComponent deserialize(ByteBuffer buffer) {
- GroupByTimeComponent groupByTimeComponent = new GroupByTimeComponent();
- groupByTimeComponent.setStartTime(ReadWriteIOUtils.readLong(buffer));
- groupByTimeComponent.setEndTime(ReadWriteIOUtils.readLong(buffer));
- groupByTimeComponent.setInterval(ReadWriteIOUtils.readLong(buffer));
- groupByTimeComponent.setSlidingStep(ReadWriteIOUtils.readLong(buffer));
- groupByTimeComponent.setIntervalByMonth(ReadWriteIOUtils.readBool(buffer));
- groupByTimeComponent.setSlidingStepByMonth(ReadWriteIOUtils.readBool(buffer));
- groupByTimeComponent.setLeftCRightO(ReadWriteIOUtils.readBool(buffer));
- return groupByTimeComponent;
+ public static GroupByTimeParameter deserialize(ByteBuffer buffer) {
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter();
+ groupByTimeParameter.setStartTime(ReadWriteIOUtils.readLong(buffer));
+ groupByTimeParameter.setEndTime(ReadWriteIOUtils.readLong(buffer));
+ groupByTimeParameter.setInterval(ReadWriteIOUtils.readLong(buffer));
+ groupByTimeParameter.setSlidingStep(ReadWriteIOUtils.readLong(buffer));
+ groupByTimeParameter.setIntervalByMonth(ReadWriteIOUtils.readBool(buffer));
+ groupByTimeParameter.setSlidingStepByMonth(ReadWriteIOUtils.readBool(buffer));
+ groupByTimeParameter.setLeftCRightO(ReadWriteIOUtils.readBool(buffer));
+ return groupByTimeParameter;
}
public boolean equals(Object obj) {
- if (!(obj instanceof GroupByTimeComponent)) {
+ if (!(obj instanceof GroupByTimeParameter)) {
return false;
}
- GroupByTimeComponent other = (GroupByTimeComponent) obj;
+ GroupByTimeParameter other = (GroupByTimeParameter) obj;
return this.startTime == other.startTime
&& this.endTime == other.endTime
&& this.interval == other.interval
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/InputLocation.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/InputLocation.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
index 705d8cbace..fda0f4a957 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/InputLocation.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
@@ -16,7 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
public class InputLocation {
// which input tsblock
@@ -36,4 +41,32 @@ public class InputLocation {
public int getValueColumnIndex() {
return valueColumnIndex;
}
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(tsBlockIndex, byteBuffer);
+ ReadWriteIOUtils.write(valueColumnIndex, byteBuffer);
+ }
+
+ public static InputLocation deserialize(ByteBuffer byteBuffer) {
+ int tsBlockIndex = ReadWriteIOUtils.readInt(byteBuffer);
+ int valueColumnIndex = ReadWriteIOUtils.readInt(byteBuffer);
+ return new InputLocation(tsBlockIndex, valueColumnIndex);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InputLocation that = (InputLocation) o;
+ return tsBlockIndex == that.tsBlockIndex && valueColumnIndex == that.valueColumnIndex;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tsBlockIndex, valueColumnIndex);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/OutputColumn.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/OutputColumn.java
similarity index 72%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/OutputColumn.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/OutputColumn.java
index 168027ef4f..f5e46777b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/OutputColumn.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/OutputColumn.java
@@ -16,10 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
+import java.nio.ByteBuffer;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
@@ -66,4 +69,23 @@ public class OutputColumn {
checkArgument(index < sourceLocations.size(), "index is not valid");
return sourceLocations.get(index);
}
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(sourceLocations.size(), byteBuffer);
+ for (InputLocation sourceLocation : sourceLocations) {
+ sourceLocation.serialize(byteBuffer);
+ }
+ ReadWriteIOUtils.write(overlapped, byteBuffer);
+ }
+
+ public static OutputColumn deserialize(ByteBuffer byteBuffer) {
+ int sourceLocationSize = ReadWriteIOUtils.readInt(byteBuffer);
+ ImmutableList.Builder<InputLocation> sourceLocations = ImmutableList.builder();
+ while (sourceLocationSize > 0) {
+ sourceLocations.add(InputLocation.deserialize(byteBuffer));
+ sourceLocationSize--;
+ }
+ boolean overlapped = ReadWriteIOUtils.readBool(byteBuffer);
+ return new OutputColumn(sourceLocations.build(), overlapped);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
index 026297a331..dbb2daa415 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
@@ -20,10 +20,6 @@
package org.apache.iotdb.db.mpp.sql.statement.component;
import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.nio.ByteBuffer;
-import java.util.Objects;
/** This class maintains information of {@code GROUP BY} clause. */
public class GroupByTimeComponent extends StatementNode {
@@ -47,28 +43,6 @@ public class GroupByTimeComponent extends StatementNode {
public GroupByTimeComponent() {}
- public GroupByTimeComponent(
- long startTime, long endTime, long interval, long slidingStep, boolean leftCRightO) {
- this(startTime, endTime, interval, slidingStep, false, false, leftCRightO);
- }
-
- public GroupByTimeComponent(
- long startTime,
- long endTime,
- long interval,
- long slidingStep,
- boolean isIntervalByMonth,
- boolean isSlidingStepByMonth,
- boolean leftCRightO) {
- this.startTime = startTime;
- this.endTime = endTime;
- this.interval = interval;
- this.slidingStep = slidingStep;
- this.isIntervalByMonth = isIntervalByMonth;
- this.isSlidingStepByMonth = isSlidingStepByMonth;
- this.leftCRightO = leftCRightO;
- }
-
public boolean isLeftCRightO() {
return leftCRightO;
}
@@ -124,51 +98,4 @@ public class GroupByTimeComponent extends StatementNode {
public void setIntervalByMonth(boolean isIntervalByMonth) {
this.isIntervalByMonth = isIntervalByMonth;
}
-
- public void serialize(ByteBuffer buffer) {
- ReadWriteIOUtils.write(startTime, buffer);
- ReadWriteIOUtils.write(endTime, buffer);
- ReadWriteIOUtils.write(interval, buffer);
- ReadWriteIOUtils.write(slidingStep, buffer);
- ReadWriteIOUtils.write(isIntervalByMonth, buffer);
- ReadWriteIOUtils.write(isSlidingStepByMonth, buffer);
- ReadWriteIOUtils.write(leftCRightO, buffer);
- }
-
- public static GroupByTimeComponent deserialize(ByteBuffer buffer) {
- GroupByTimeComponent groupByTimeComponent = new GroupByTimeComponent();
- groupByTimeComponent.setStartTime(ReadWriteIOUtils.readLong(buffer));
- groupByTimeComponent.setEndTime(ReadWriteIOUtils.readLong(buffer));
- groupByTimeComponent.setInterval(ReadWriteIOUtils.readLong(buffer));
- groupByTimeComponent.setSlidingStep(ReadWriteIOUtils.readLong(buffer));
- groupByTimeComponent.setIntervalByMonth(ReadWriteIOUtils.readBool(buffer));
- groupByTimeComponent.setSlidingStepByMonth(ReadWriteIOUtils.readBool(buffer));
- groupByTimeComponent.setLeftCRightO(ReadWriteIOUtils.readBool(buffer));
- return groupByTimeComponent;
- }
-
- public boolean equals(Object obj) {
- if (!(obj instanceof GroupByTimeComponent)) {
- return false;
- }
- GroupByTimeComponent other = (GroupByTimeComponent) obj;
- return this.startTime == other.startTime
- && this.endTime == other.endTime
- && this.interval == other.interval
- && this.slidingStep == other.slidingStep
- && this.isSlidingStepByMonth == other.isSlidingStepByMonth
- && this.isIntervalByMonth == other.isIntervalByMonth
- && this.leftCRightO == other.leftCRightO;
- }
-
- public int hashCode() {
- return Objects.hash(
- startTime,
- endTime,
- interval,
- slidingStep,
- isIntervalByMonth,
- isSlidingStepByMonth,
- leftCRightO);
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 8712dd5641..35da48e4b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -80,6 +81,8 @@ public class FunctionExpression extends Expression {
*/
private List<Expression> expressions;
+ private List<InputLocation> inputLocations;
+
private List<PartialPath> paths;
private String parametersString;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 4e9cfb5e11..2585107554 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
@@ -49,7 +50,9 @@ import java.util.Set;
public class TimeSeriesOperand extends Expression {
- protected PartialPath path;
+ private PartialPath path;
+
+ private InputLocation inputLocation;
public TimeSeriesOperand(PartialPath path) {
this.path = path;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 9b93d163ce..7596bde6d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -35,8 +35,8 @@ import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index 282c2895b5..b8591c8fdb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index d2045e0d16..09d56b7211 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -214,7 +214,7 @@ public class SeriesAggregateScanOperatorTest {
@Test
public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
int[] result = new int[] {100, 100, 100, 100};
- GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
initSeriesAggregateScanOperator(
Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
@@ -232,7 +232,7 @@ public class SeriesAggregateScanOperatorTest {
public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
int[] result = new int[] {0, 80, 100, 80};
Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
- GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
initSeriesAggregateScanOperator(
Collections.singletonList(AggregationType.COUNT),
@@ -263,7 +263,7 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.LAST_VALUE);
aggregationTypes.add(AggregationType.MAX_VALUE);
aggregationTypes.add(AggregationType.MIN_VALUE);
- GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
int count = 0;
@@ -282,7 +282,7 @@ public class SeriesAggregateScanOperatorTest {
@Test
public void testGroupBySlidingTimeWindow() throws IllegalPathException {
int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
- GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 50, true);
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
initSeriesAggregateScanOperator(
Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
@@ -300,7 +300,7 @@ public class SeriesAggregateScanOperatorTest {
public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
- GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 149, 50, 30, true);
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
initSeriesAggregateScanOperator(
Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
@@ -329,7 +329,7 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.LAST_VALUE);
aggregationTypes.add(AggregationType.MAX_VALUE);
aggregationTypes.add(AggregationType.MIN_VALUE);
- GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 149, 50, 30, true);
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
int count = 0;
@@ -349,7 +349,7 @@ public class SeriesAggregateScanOperatorTest {
List<AggregationType> aggregateFuncList,
Filter timeFilter,
boolean ascending,
- GroupByTimeComponent groupByTimeParameter)
+ GroupByTimeParameter groupByTimeParameter)
throws IllegalPathException {
MeasurementPath measurementPath =
new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
index 706ba38a93..9cd3b4bd86 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.operator;
import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index a4b8c3bcc3..276555839e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index 5bf598cf14..ddc4945f87 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -120,7 +121,10 @@ public class FragmentInstanceSerdeTest {
FilterNullNode filterNullNode =
new FilterNullNode(
- new PlanNodeId("TestFilterNullNode"), null, FilterNullPolicy.ALL_NULL, null);
+ new PlanNodeId("TestFilterNullNode"),
+ null,
+ FilterNullPolicy.ALL_NULL,
+ new ArrayList<>());
IExpression expression =
BinaryExpression.and(
new SingleSeriesExpression(
@@ -136,7 +140,8 @@ public class FragmentInstanceSerdeTest {
TimeJoinNode timeJoinNode =
new TimeJoinNode(new PlanNodeId("TimeJoinNode"), OrderBy.TIMESTAMP_DESC);
- timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
+ timeJoinNode.setFilterNullParameter(
+ new FilterNullParameter(FilterNullPolicy.CONTAINS_NULL, new ArrayList<>()));
SeriesScanNode seriesScanNode1 =
new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
seriesScanNode1.setRegionReplicaSet(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java
index dc8646a993..fcf3221a9f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -67,8 +66,6 @@ public class DeviceMergeNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
deviceMergeNode.addChildDeviceNode("device", aggregateNode);
aggregateFuncMap = new HashMap<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java
index 76563e29b9..ae25b60799 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -73,8 +72,6 @@ public class ExchangeNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
deviceMergeNode.addChildDeviceNode("device", aggregateNode);
aggregateFuncMap = new HashMap<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java
index df3a0f3d95..55a4ebc2fd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -56,9 +55,6 @@ public class FillNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
Set<AggregationType> aggregationTypes = new HashSet<>();
aggregationTypes.add(AggregationType.MAX_TIME);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java
index b33fe9088b..686f8ce8bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -67,9 +66,6 @@ public class FilterNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
Set<AggregationType> aggregationTypes = new HashSet<>();
aggregationTypes.add(AggregationType.MAX_TIME);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java
index f34d1ae761..ae8650840d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -69,9 +68,6 @@ public class FilterNullNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
Set<AggregationType> aggregationTypes = new HashSet<>();
aggregationTypes.add(AggregationType.MAX_TIME);
@@ -104,7 +100,10 @@ public class FilterNullNodeSerdeTest {
FilterNullNode filterNullNode =
new FilterNullNode(
- new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+ new PlanNodeId("TestFilterNullNode"),
+ filterNode,
+ FilterNullPolicy.ALL_NULL,
+ new ArrayList<>());
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
filterNullNode.serialize(byteBuffer);
byteBuffer.flip();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
index 8f4dc9a71a..8a5a61e82f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -71,9 +70,6 @@ public class GroupByLevelNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
Set<AggregationType> aggregationTypes = new HashSet<>();
aggregationTypes.add(AggregationType.MAX_TIME);
@@ -106,7 +102,10 @@ public class GroupByLevelNodeSerdeTest {
FilterNullNode filterNullNode =
new FilterNullNode(
- new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+ new PlanNodeId("TestFilterNullNode"),
+ filterNode,
+ FilterNullPolicy.ALL_NULL,
+ new ArrayList<>());
Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
groupedPathMap.put(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
index 2e6e84f4b1..3e1066dd86 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -72,9 +71,6 @@ public class LimitNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
Set<AggregationType> aggregationTypes = new HashSet<>();
aggregationTypes.add(AggregationType.MAX_TIME);
@@ -107,7 +103,10 @@ public class LimitNodeSerdeTest {
FilterNullNode filterNullNode =
new FilterNullNode(
- new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+ new PlanNodeId("TestFilterNullNode"),
+ filterNode,
+ FilterNullPolicy.ALL_NULL,
+ new ArrayList<>());
Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
groupedPathMap.put(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
index 5cfa8dc015..b15e8705af 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
@@ -41,8 +41,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -51,6 +51,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.operator.Regexp;
+import com.google.common.collect.ImmutableList;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -78,9 +79,6 @@ public class OffsetNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
Set<AggregationType> aggregationTypes = new HashSet<>();
aggregationTypes.add(AggregationType.MAX_TIME);
@@ -113,7 +111,10 @@ public class OffsetNodeSerdeTest {
FilterNullNode filterNullNode =
new FilterNullNode(
- new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+ new PlanNodeId("TestFilterNullNode"),
+ filterNode,
+ FilterNullPolicy.ALL_NULL,
+ new ArrayList<>());
Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
groupedPathMap.put(
@@ -141,7 +142,8 @@ public class OffsetNodeSerdeTest {
OffsetNode offsetNode = new OffsetNode(new PlanNodeId("OffsetNode"), 100);
LimitNode limitNode = new LimitNode(new PlanNodeId("LimitNode"), 100);
FilterNullNode filterNullNode =
- new FilterNullNode(new PlanNodeId("FilterNullNode"), FilterNullPolicy.ALL_NULL, null);
+ new FilterNullNode(
+ new PlanNodeId("FilterNullNode"), FilterNullPolicy.ALL_NULL, ImmutableList.of());
QueryFilter queryFilter = new QueryFilter(FilterType.KW_AND);
BasicFunctionFilter leftQueryFilter =
new BasicFunctionFilter(FilterType.GREATERTHAN, new MeasurementPath("root.sg.d1.s2"), "10");
@@ -159,7 +161,8 @@ public class OffsetNodeSerdeTest {
TimeJoinNode timeJoinNode =
new TimeJoinNode(new PlanNodeId("TimeJoinNode"), OrderBy.TIMESTAMP_DESC);
- timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
+ timeJoinNode.setFilterNullParameter(
+ new FilterNullParameter(FilterNullPolicy.CONTAINS_NULL, new ArrayList<>()));
SeriesScanNode seriesScanNode1 =
new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
seriesScanNode1.setRegionReplicaSet(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java
index 98f733b41d..9f914493bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -76,9 +75,6 @@ public class SortNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
Set<AggregationType> aggregationTypes = new HashSet<>();
aggregationTypes.add(AggregationType.MAX_TIME);
@@ -111,7 +107,10 @@ public class SortNodeSerdeTest {
FilterNullNode filterNullNode =
new FilterNullNode(
- new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+ new PlanNodeId("TestFilterNullNode"),
+ filterNode,
+ FilterNullPolicy.ALL_NULL,
+ new ArrayList<>());
Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
groupedPathMap.put(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java
index c98ddb2ef2..5f752c1061 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java
@@ -39,8 +39,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -76,9 +76,6 @@ public class TimeJoinNodeSerdeTest {
DeviceMergeNode deviceMergeNode =
new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
- FilterNullComponent filterNullComponent = new FilterNullComponent();
- deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
Set<AggregationType> aggregationTypes = new HashSet<>();
aggregationTypes.add(AggregationType.MAX_TIME);
@@ -112,7 +109,10 @@ public class TimeJoinNodeSerdeTest {
FilterNullNode filterNullNode =
new FilterNullNode(
- new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+ new PlanNodeId("TestFilterNullNode"),
+ filterNode,
+ FilterNullPolicy.ALL_NULL,
+ new ArrayList<>());
Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
groupedPathMap.put(
@@ -136,7 +136,8 @@ public class TimeJoinNodeSerdeTest {
TimeJoinNode timeJoinNode =
new TimeJoinNode(new PlanNodeId("TestTimeJoinNode"), OrderBy.TIMESTAMP_ASC);
- timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
+ timeJoinNode.setFilterNullParameter(
+ new FilterNullParameter(FilterNullPolicy.CONTAINS_NULL, new ArrayList<>()));
timeJoinNode.addChild(sortNode);
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
timeJoinNode.serialize(byteBuffer);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
index 258968d6da..7271a366fa 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -53,8 +53,8 @@ public class SeriesAggregateScanNodeSerdeTest {
st.add("s2");
List<AggregationType> aggregateFuncList = new ArrayList<>();
aggregateFuncList.add(AggregationType.MAX_TIME);
- GroupByTimeComponent groupByTimeComponent =
- new GroupByTimeComponent(1, 100, 1, 1, true, true, true);
+ GroupByTimeParameter groupByTimeComponent =
+ new GroupByTimeParameter(1, 100, 1, 1, true, true, true);
SeriesAggregateScanNode seriesAggregateScanNode =
new SeriesAggregateScanNode(
new PlanNodeId("TestSeriesAggregateScanNode"),