You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:04 UTC
[iotdb] 03/12: add IntoNode & DeviceViewNode
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9ce4eacfcbf806a44f93830df73e3f377043efea
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Oct 17 22:20:17 2022 +0800
add IntoNode & DeviceViewNode
---
.../planner/distribution/ExchangeNodeAdder.java | 6 +-
.../plan/planner/distribution/SourceRewriter.java | 11 +--
.../mpp/plan/planner/plan/node/PlanNodeType.java | 4 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 10 +++
.../metedata/read/SchemaQueryOrderByHeatNode.java | 4 +-
.../planner/plan/node/process/AggregationNode.java | 23 ++---
.../planner/plan/node/process/DeviceMergeNode.java | 32 +------
.../{LimitNode.java => DeviceViewIntoNode.java} | 97 +++++++++-------------
.../planner/plan/node/process/DeviceViewNode.java | 19 +----
.../planner/plan/node/process/ExchangeNode.java | 42 +---------
.../plan/planner/plan/node/process/FillNode.java | 46 ++--------
.../plan/node/process/GroupByLevelNode.java | 29 +++----
.../planner/plan/node/process/GroupByTagNode.java | 17 +---
.../process/{OffsetNode.java => IntoNode.java} | 91 +++++++++-----------
.../plan/planner/plan/node/process/LimitNode.java | 37 ++-------
...tiChildNode.java => MultiChildProcessNode.java} | 23 ++++-
.../plan/planner/plan/node/process/OffsetNode.java | 36 ++------
.../planner/plan/node/process/ProjectNode.java | 40 +++------
...iChildNode.java => SingleChildProcessNode.java} | 50 ++++++++---
.../node/process/SlidingWindowAggregationNode.java | 36 ++------
.../plan/planner/plan/node/process/SortNode.java | 29 ++-----
.../planner/plan/node/process/TimeJoinNode.java | 17 +---
.../planner/plan/node/process/TransformNode.java | 29 +------
.../node/process/last/LastQueryCollectNode.java | 4 +-
.../plan/node/process/last/LastQueryMergeNode.java | 4 +-
.../plan/node/process/last/LastQueryNode.java | 4 +-
26 files changed, 237 insertions(+), 503 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 37c487b4ab..60c0a6a9b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -241,8 +241,8 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return processMultiChildNode(node, context);
}
- private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) {
- MultiChildNode newNode = (MultiChildNode) node.clone();
+ private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupContext context) {
+ MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
List<PlanNode> visitedChildren = new ArrayList<>();
node.getChildren()
.forEach(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 8df3a16804..e6bc4891f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
@@ -254,7 +254,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
private PlanNode processRawSeriesScan(
- SeriesSourceNode node, DistributionPlanContext context, MultiChildNode parent) {
+ SeriesSourceNode node, DistributionPlanContext context, MultiChildProcessNode parent) {
List<SeriesSourceNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context);
if (sourceNodes.size() == 1) {
return sourceNodes.get(0);
@@ -407,8 +407,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return processRawMultiChildNode(node, context);
}
- private PlanNode processRawMultiChildNode(MultiChildNode node, DistributionPlanContext context) {
- MultiChildNode root = (MultiChildNode) node.clone();
+ private PlanNode processRawMultiChildNode(
+ MultiChildProcessNode node, DistributionPlanContext context) {
+ MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
// Step 1: Get all source nodes. For the node which is not source, add it as the child of
// current TimeJoinNode
List<SourceNode> sources = new ArrayList<>();
@@ -468,7 +469,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
} else {
// We clone a TimeJoinNode from root to make the params to be consistent.
// But we need to assign a new ID to it
- MultiChildNode parentOfGroup = (MultiChildNode) root.clone();
+ MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) root.clone();
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
seriesScanNodes.forEach(parentOfGroup::addChild);
root.addChild(parentOfGroup);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index dd102898d5..8b7804bfd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -140,7 +140,9 @@ public enum PlanNodeType {
LOAD_TSFILE((short) 55),
CONSTRUCT_SCHEMA_BLACK_LIST_NODE((short) 56),
ROLLBACK_SCHEMA_BLACK_LIST_NODE((short) 57),
- GROUP_BY_TAG((short) 58);
+ GROUP_BY_TAG((short) 58),
+ INTO((short) 59),
+ DEVICE_VIEW_INTO((short) 60);
public static final int BYTES = Short.BYTES;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8b4155cb53..cad20d65de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -44,12 +44,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCre
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
@@ -301,4 +303,12 @@ public abstract class PlanVisitor<R, C> {
public R visitActivateTemplate(ActivateTemplateNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitInto(IntoNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
index 68cdf909cd..0c3d78ce6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
@@ -23,14 +23,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-public class SchemaQueryOrderByHeatNode extends MultiChildNode {
+public class SchemaQueryOrderByHeatNode extends MultiChildProcessNode {
public SchemaQueryOrderByHeatNode(PlanNodeId id) {
super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 837be95747..4448ca0c5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
* input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
* final series aggregated result represented by TsBlock.
*/
-public class AggregationNode extends MultiChildNode {
+public class AggregationNode extends MultiChildProcessNode {
// The list of aggregate functions, each AggregateDescriptor will be output as one or two column
// of
@@ -74,8 +74,10 @@ public class AggregationNode extends MultiChildNode {
List<AggregationDescriptor> aggregationDescriptorList,
@Nullable GroupByTimeParameter groupByTimeParameter,
Ordering scanOrder) {
- this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
- this.children = children;
+ super(id, children);
+ this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.scanOrder = scanOrder;
}
public List<AggregationDescriptor> getAggregationDescriptorList() {
@@ -91,21 +93,6 @@ public class AggregationNode extends MultiChildNode {
return scanOrder;
}
- @Override
- public List<PlanNode> getChildren() {
- return children;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.children.add(child);
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
- }
-
@Override
public PlanNode clone() {
return new AggregationNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index 141530d3f4..d3ec660e7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -34,7 +34,7 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-public class DeviceMergeNode extends MultiChildNode {
+public class DeviceMergeNode extends MultiChildProcessNode {
// The result output order, which could sort by device and time.
// The size of this list is 2 and the first SortItem in this list has higher priority.
@@ -43,16 +43,6 @@ public class DeviceMergeNode extends MultiChildNode {
// the list of selected devices
private final List<String> devices;
- public DeviceMergeNode(
- PlanNodeId id,
- List<PlanNode> children,
- OrderByParameter mergeOrderParameter,
- List<String> devices) {
- super(id, children);
- this.mergeOrderParameter = mergeOrderParameter;
- this.devices = devices;
- }
-
public DeviceMergeNode(
PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> devices) {
super(id);
@@ -68,21 +58,6 @@ public class DeviceMergeNode extends MultiChildNode {
return devices;
}
- @Override
- public List<PlanNode> getChildren() {
- return children;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.children.add(child);
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
- }
-
@Override
public PlanNode clone() {
return new DeviceMergeNode(getPlanNodeId(), getMergeOrderParameter(), getDevices());
@@ -147,13 +122,12 @@ public class DeviceMergeNode extends MultiChildNode {
}
DeviceMergeNode that = (DeviceMergeNode) o;
return Objects.equals(mergeOrderParameter, that.mergeOrderParameter)
- && Objects.equals(devices, that.devices)
- && Objects.equals(children, that.children);
+ && Objects.equals(devices, that.devices);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), mergeOrderParameter, devices, children);
+ return Objects.hash(super.hashCode(), mergeOrderParameter, devices);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
index 9d3badac0e..987cbc18d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,102 +16,74 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
-/** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode {
+public class DeviceViewIntoNode extends SingleChildProcessNode {
- private final int limit;
+ private final DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor;
- private PlanNode child;
-
- public LimitNode(PlanNodeId id, int limit) {
+ public DeviceViewIntoNode(
+ PlanNodeId id, DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) {
super(id);
- this.limit = limit;
- }
-
- public LimitNode(PlanNodeId id, PlanNode child, int limit) {
- this(id, limit);
- this.child = child;
- }
-
- public int getLimit() {
- return limit;
- }
-
- public PlanNode getChild() {
- return child;
- }
-
- public void setChild(PlanNode child) {
- this.child = child;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of(child);
+ this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
}
- @Override
- public void addChild(PlanNode child) {
- this.child = child;
+ public DeviceViewIntoNode(
+ PlanNodeId id, PlanNode child, DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) {
+ super(id, child);
+ this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
}
@Override
public PlanNode clone() {
- return new LimitNode(getPlanNodeId(), this.limit);
- }
-
- @Override
- public int allowedChildCount() {
- return ONE_CHILD;
+ return new DeviceViewIntoNode(getPlanNodeId(), this.deviceViewIntoPathDescriptor);
}
@Override
public List<String> getOutputColumnNames() {
- return child.getOutputColumnNames();
- }
-
- @Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitLimit(this, context);
+ return ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.LIMIT.serialize(byteBuffer);
- ReadWriteIOUtils.write(limit, byteBuffer);
+ PlanNodeType.DEVICE_VIEW_INTO.serialize(byteBuffer); // tag with this node's own type
+ this.deviceViewIntoPathDescriptor.serialize(byteBuffer);
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws IOException {
- PlanNodeType.LIMIT.serialize(stream);
- ReadWriteIOUtils.write(limit, stream);
+ PlanNodeType.DEVICE_VIEW_INTO.serialize(stream); // tag with this node's own type
+ this.deviceViewIntoPathDescriptor.serialize(stream);
}
- public static LimitNode deserialize(ByteBuffer byteBuffer) {
- int limit = ReadWriteIOUtils.readInt(byteBuffer);
+ public static DeviceViewIntoNode deserialize(ByteBuffer byteBuffer) {
+ DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor =
+ DeviceViewIntoPathDescriptor.deserialize(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new LimitNode(planNodeId, limit);
+ return new DeviceViewIntoNode(planNodeId, deviceViewIntoPathDescriptor);
}
@Override
- public String toString() {
- return "LimitNode-" + this.getPlanNodeId();
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitDeviceViewInto(this, context);
}
@Override
@@ -125,12 +97,17 @@ public class LimitNode extends ProcessNode {
if (!super.equals(o)) {
return false;
}
- LimitNode that = (LimitNode) o;
- return limit == that.limit && child.equals(that.child);
+ DeviceViewIntoNode intoNode = (DeviceViewIntoNode) o;
+ return deviceViewIntoPathDescriptor.equals(intoNode.deviceViewIntoPathDescriptor);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), limit, child);
+ return Objects.hash(super.hashCode(), deviceViewIntoPathDescriptor);
+ }
+
+ @Override
+ public String toString() {
+ return "DeviceViewIntoNode-" + getPlanNodeId();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index 034db109de..8517904d21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -42,7 +42,7 @@ import java.util.Objects;
* same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will
* contain n+1 columns where the new column is Device column.
*/
-public class DeviceViewNode extends MultiChildNode {
+public class DeviceViewNode extends MultiChildProcessNode {
// The result output order, which could sort by device and time.
// The size of this list is 2 and the first SortItem in this list has higher priority.
@@ -95,21 +95,6 @@ public class DeviceViewNode extends MultiChildNode {
return deviceToMeasurementIndexesMap;
}
- @Override
- public List<PlanNode> getChildren() {
- return children;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.children.add(child);
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
- }
-
@Override
public PlanNode clone() {
return new DeviceViewNode(
@@ -224,7 +209,6 @@ public class DeviceViewNode extends MultiChildNode {
DeviceViewNode that = (DeviceViewNode) o;
return mergeOrderParameter.equals(that.mergeOrderParameter)
&& devices.equals(that.devices)
- && children.equals(that.children)
&& outputColumnNames.equals(that.outputColumnNames)
&& deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
}
@@ -235,7 +219,6 @@ public class DeviceViewNode extends MultiChildNode {
super.hashCode(),
mergeOrderParameter,
devices,
- children,
outputColumnNames,
deviceToMeasurementIndexesMap);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index 780a0e629f..0a4b64f3de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -28,8 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import com.google.common.collect.ImmutableList;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -37,8 +35,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-public class ExchangeNode extends PlanNode {
- private PlanNode child;
+public class ExchangeNode extends SingleChildProcessNode {
+
// The remoteSourceNode is used to record the remote source info for current ExchangeNode
// It is not the child of current ExchangeNode
private FragmentSinkNode remoteSourceNode;
@@ -56,24 +54,11 @@ public class ExchangeNode extends PlanNode {
super(id);
}
- @Override
- public List<PlanNode> getChildren() {
- if (this.child == null) {
- return ImmutableList.of();
- }
- return ImmutableList.of(child);
- }
-
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitExchange(this, context);
}
- @Override
- public void addChild(PlanNode child) {
- this.child = child;
- }
-
@Override
public PlanNode clone() {
ExchangeNode node = new ExchangeNode(getPlanNodeId());
@@ -85,11 +70,6 @@ public class ExchangeNode extends PlanNode {
return node;
}
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
- }
-
@Override
public List<String> getOutputColumnNames() {
return outputColumnNames;
@@ -150,14 +130,6 @@ public class ExchangeNode extends PlanNode {
}
}
- public PlanNode getChild() {
- return child;
- }
-
- public void setChild(PlanNode child) {
- this.child = child;
- }
-
@Override
public String toString() {
return String.format(
@@ -182,10 +154,6 @@ public class ExchangeNode extends PlanNode {
this.setOutputColumnNames(remoteSourceNode.getOutputColumnNames());
}
- public void cleanChildren() {
- this.child = null;
- }
-
public TEndPoint getUpstreamEndpoint() {
return upstreamEndpoint;
}
@@ -210,15 +178,13 @@ public class ExchangeNode extends PlanNode {
return false;
}
ExchangeNode that = (ExchangeNode) o;
- return Objects.equals(child, that.child)
- && Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
+ return Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
&& Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
&& Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
}
@Override
public int hashCode() {
- return Objects.hash(
- super.hashCode(), child, upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
+ return Objects.hash(super.hashCode(), upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
index 5acb18a9de..d177d98d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import com.google.common.collect.ImmutableList;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -35,48 +33,24 @@ import java.util.List;
import java.util.Objects;
/** FillNode is used to fill the empty field in one row. */
-public class FillNode extends ProcessNode {
+public class FillNode extends SingleChildProcessNode {
// descriptions of how null values are filled
- private FillDescriptor fillDescriptor;
-
- private Ordering scanOrder;
-
- private PlanNode child;
+ private final FillDescriptor fillDescriptor;
- public FillNode(PlanNodeId id) {
- super(id);
- }
+ private final Ordering scanOrder;
public FillNode(PlanNodeId id, FillDescriptor fillDescriptor, Ordering scanOrder) {
- this(id);
+ super(id);
this.fillDescriptor = fillDescriptor;
this.scanOrder = scanOrder;
}
public FillNode(
PlanNodeId id, PlanNode child, FillDescriptor fillDescriptor, Ordering scanOrder) {
- this(id, fillDescriptor, scanOrder);
- this.child = child;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of(child);
- }
-
- public PlanNode getChild() {
- return child;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.child = child;
- }
-
- @Override
- public int allowedChildCount() {
- return ONE_CHILD;
+ super(id, child);
+ this.fillDescriptor = fillDescriptor;
+ this.scanOrder = scanOrder;
}
@Override
@@ -127,14 +101,12 @@ public class FillNode extends ProcessNode {
return false;
}
FillNode that = (FillNode) o;
- return Objects.equals(fillDescriptor, that.fillDescriptor)
- && Objects.equals(child, that.child)
- && scanOrder == that.scanOrder;
+ return Objects.equals(fillDescriptor, that.fillDescriptor) && scanOrder == that.scanOrder;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), fillDescriptor, child, scanOrder);
+ return Objects.hash(super.hashCode(), fillDescriptor, scanOrder);
}
public FillDescriptor getFillDescriptor() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index 0fa39ff184..da7b8dc9c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -52,7 +52,7 @@ import java.util.stream.Collectors;
* <p>If the group by level parameter is [0, 2], then these two columns will not belong to one
* bucket. And the total buckets are `root.*.d1.s1` and `root.*.d2.s1`
*/
-public class GroupByLevelNode extends MultiChildNode {
+public class GroupByLevelNode extends MultiChildProcessNode {
// The list of aggregate descriptors
// each GroupByLevelDescriptor will be output as one or two column of result TsBlock
@@ -87,21 +87,6 @@ public class GroupByLevelNode extends MultiChildNode {
this.scanOrder = scanOrder;
}
- @Override
- public List<PlanNode> getChildren() {
- return children;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.children.add(child);
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
- }
-
@Override
public PlanNode clone() {
return new GroupByLevelNode(
@@ -191,9 +176,15 @@ public class GroupByLevelNode extends MultiChildNode {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
GroupByLevelNode that = (GroupByLevelNode) o;
return Objects.equals(groupByLevelDescriptors, that.groupByLevelDescriptors)
&& Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index e0ef548508..c2e558bdc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -43,7 +43,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;
-public class GroupByTagNode extends MultiChildNode {
+public class GroupByTagNode extends MultiChildProcessNode {
private final List<String> tagKeys;
private final Map<List<String>, List<CrossSeriesAggregationDescriptor>>
@@ -87,16 +87,6 @@ public class GroupByTagNode extends MultiChildNode {
this.outputColumnNames = Validate.notNull(outputColumnNames);
}
- @Override
- public List<PlanNode> getChildren() {
- return children;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.children.add(child);
- }
-
@Override
public PlanNode clone() {
// TODO: better do deep copy
@@ -109,11 +99,6 @@ public class GroupByTagNode extends MultiChildNode {
this.outputColumnNames);
}
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
- }
-
@Override
public List<String> getOutputColumnNames() {
List<String> ret = new ArrayList<>(tagKeys);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
index 0407f6d946..b68e862a7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
@@ -16,96 +16,71 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
-/**
- * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
- * upstream nodes
- */
-public class OffsetNode extends ProcessNode {
-
- private final int offset;
+public class IntoNode extends SingleChildProcessNode {
- private PlanNode child;
+ private final IntoPathDescriptor intoPathDescriptor;
- public OffsetNode(PlanNodeId id, int offset) {
+ public IntoNode(PlanNodeId id, IntoPathDescriptor intoPathDescriptor) {
super(id);
- this.offset = offset;
+ this.intoPathDescriptor = intoPathDescriptor;
}
- public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
- this(id, offset);
- this.child = child;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of(child);
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.child = child;
- }
-
- @Override
- public int allowedChildCount() {
- return ONE_CHILD;
+ public IntoNode(PlanNodeId id, PlanNode child, IntoPathDescriptor intoPathDescriptor) {
+ super(id, child);
+ this.intoPathDescriptor = intoPathDescriptor;
}
@Override
public PlanNode clone() {
- return new OffsetNode(getPlanNodeId(), offset);
+ return new IntoNode(getPlanNodeId(), this.intoPathDescriptor);
}
@Override
public List<String> getOutputColumnNames() {
- return child.getOutputColumnNames();
- }
-
- @Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitOffset(this, context);
+ return ColumnHeaderConstant.selectIntoColumnHeaders.stream()
+ .map(ColumnHeader::getColumnName)
+ .collect(Collectors.toList());
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.OFFSET.serialize(byteBuffer);
- ReadWriteIOUtils.write(offset, byteBuffer);
+ PlanNodeType.INTO.serialize(byteBuffer);
+ this.intoPathDescriptor.serialize(byteBuffer);
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws IOException {
- PlanNodeType.OFFSET.serialize(stream);
- ReadWriteIOUtils.write(offset, stream);
+ PlanNodeType.INTO.serialize(stream);
+ this.intoPathDescriptor.serialize(stream);
}
- public static OffsetNode deserialize(ByteBuffer byteBuffer) {
- int offset = ReadWriteIOUtils.readInt(byteBuffer);
+ public static IntoNode deserialize(ByteBuffer byteBuffer) {
+ IntoPathDescriptor intoPathDescriptor = IntoPathDescriptor.deserialize(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new OffsetNode(planNodeId, offset);
+ return new IntoNode(planNodeId, intoPathDescriptor);
}
- public PlanNode getChild() {
- return child;
- }
-
- public int getOffset() {
- return offset;
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitInto(this, context);
}
@Override
@@ -116,12 +91,20 @@ public class OffsetNode extends ProcessNode {
if (o == null || getClass() != o.getClass()) {
return false;
}
- OffsetNode that = (OffsetNode) o;
- return offset == that.offset && child.equals(that.child);
+ if (!super.equals(o)) {
+ return false;
+ }
+ IntoNode intoNode = (IntoNode) o;
+ return intoPathDescriptor.equals(intoNode.intoPathDescriptor);
}
@Override
public int hashCode() {
- return Objects.hash(child, offset);
+ return Objects.hash(super.hashCode(), intoPathDescriptor);
+ }
+
+ @Override
+ public String toString() {
+ return "IntoNode-" + getPlanNodeId();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
index 9d3badac0e..ae03e13f70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import com.google.common.collect.ImmutableList;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -33,54 +31,29 @@ import java.util.List;
import java.util.Objects;
/** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode {
+public class LimitNode extends SingleChildProcessNode {
private final int limit;
- private PlanNode child;
-
public LimitNode(PlanNodeId id, int limit) {
super(id);
this.limit = limit;
}
public LimitNode(PlanNodeId id, PlanNode child, int limit) {
- this(id, limit);
- this.child = child;
+ super(id, child);
+ this.limit = limit;
}
public int getLimit() {
return limit;
}
- public PlanNode getChild() {
- return child;
- }
-
- public void setChild(PlanNode child) {
- this.child = child;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of(child);
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.child = child;
- }
-
@Override
public PlanNode clone() {
return new LimitNode(getPlanNodeId(), this.limit);
}
- @Override
- public int allowedChildCount() {
- return ONE_CHILD;
- }
-
@Override
public List<String> getOutputColumnNames() {
return child.getOutputColumnNames();
@@ -126,11 +99,11 @@ public class LimitNode extends ProcessNode {
return false;
}
LimitNode that = (LimitNode) o;
- return limit == that.limit && child.equals(that.child);
+ return limit == that.limit;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), limit, child);
+ return Objects.hash(super.hashCode(), limit);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java
similarity index 77%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java
index 9e699f370c..e23a86d0c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java
@@ -26,16 +26,16 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-public abstract class MultiChildNode extends ProcessNode {
+public abstract class MultiChildProcessNode extends ProcessNode {
protected List<PlanNode> children;
- public MultiChildNode(PlanNodeId id, List<PlanNode> children) {
+ public MultiChildProcessNode(PlanNodeId id, List<PlanNode> children) {
super(id);
this.children = children;
}
- public MultiChildNode(PlanNodeId id) {
+ public MultiChildProcessNode(PlanNodeId id) {
super(id);
this.children = new ArrayList<>();
}
@@ -44,6 +44,21 @@ public abstract class MultiChildNode extends ProcessNode {
this.children = children;
}
+ @Override
+ public List<PlanNode> getChildren() {
+ return children;
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ this.children.add(child);
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return CHILD_COUNT_NO_LIMIT;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -55,7 +70,7 @@ public abstract class MultiChildNode extends ProcessNode {
if (!super.equals(o)) {
return false;
}
- MultiChildNode that = (MultiChildNode) o;
+ MultiChildProcessNode that = (MultiChildProcessNode) o;
return children.equals(that.children);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
index 0407f6d946..64b912303b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import com.google.common.collect.ImmutableList;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -36,35 +34,18 @@ import java.util.Objects;
* OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
* upstream nodes
*/
-public class OffsetNode extends ProcessNode {
+public class OffsetNode extends SingleChildProcessNode {
private final int offset;
- private PlanNode child;
-
public OffsetNode(PlanNodeId id, int offset) {
super(id);
this.offset = offset;
}
public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
- this(id, offset);
- this.child = child;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of(child);
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.child = child;
- }
-
- @Override
- public int allowedChildCount() {
- return ONE_CHILD;
+ super(id, child);
+ this.offset = offset;
}
@Override
@@ -100,10 +81,6 @@ public class OffsetNode extends ProcessNode {
return new OffsetNode(planNodeId, offset);
}
- public PlanNode getChild() {
- return child;
- }
-
public int getOffset() {
return offset;
}
@@ -116,12 +93,15 @@ public class OffsetNode extends ProcessNode {
if (o == null || getClass() != o.getClass()) {
return false;
}
+ if (!super.equals(o)) {
+ return false;
+ }
OffsetNode that = (OffsetNode) o;
- return offset == that.offset && child.equals(that.child);
+ return offset == that.offset;
}
@Override
public int hashCode() {
- return Objects.hash(child, offset);
+ return Objects.hash(super.hashCode(), offset);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java
index 3a9b5a7268..d5e71b6257 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import com.google.common.collect.ImmutableList;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -34,38 +32,20 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-public class ProjectNode extends ProcessNode {
+public class ProjectNode extends SingleChildProcessNode {
private final List<String> outputColumnNames;
- private PlanNode child;
-
public ProjectNode(PlanNodeId id, List<String> outputColumnNames) {
super(id);
this.outputColumnNames = outputColumnNames;
}
public ProjectNode(PlanNodeId id, PlanNode child, List<String> outputColumnNames) {
- super(id);
- this.child = child;
+ super(id, child);
this.outputColumnNames = outputColumnNames;
}
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of(child);
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.child = child;
- }
-
- @Override
- public int allowedChildCount() {
- return ONE_CHILD;
- }
-
@Override
public PlanNode clone() {
return new ProjectNode(getPlanNodeId(), getOutputColumnNames());
@@ -112,15 +92,21 @@ public class ProjectNode extends ProcessNode {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
ProjectNode that = (ProjectNode) o;
- return outputColumnNames.equals(that.outputColumnNames) && child.equals(that.child);
+ return outputColumnNames.equals(that.outputColumnNames);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), outputColumnNames, child);
+ return Objects.hash(super.hashCode(), outputColumnNames);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java
index 9e699f370c..bd183cf3ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java
@@ -22,26 +22,52 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import java.util.ArrayList;
+import com.google.common.collect.ImmutableList;
+
import java.util.List;
import java.util.Objects;
-public abstract class MultiChildNode extends ProcessNode {
+public abstract class SingleChildProcessNode extends ProcessNode {
- protected List<PlanNode> children;
+ protected PlanNode child;
- public MultiChildNode(PlanNodeId id, List<PlanNode> children) {
+ public SingleChildProcessNode(PlanNodeId id) {
super(id);
- this.children = children;
}
- public MultiChildNode(PlanNodeId id) {
+ public SingleChildProcessNode(PlanNodeId id, PlanNode child) {
super(id);
- this.children = new ArrayList<>();
+ this.child = child;
+ }
+
+ public PlanNode getChild() {
+ return child;
+ }
+
+ public void setChild(PlanNode child) {
+ this.child = child;
+ }
+
+ public void cleanChildren() {
+ this.child = null;
}
- public void setChildren(List<PlanNode> children) {
- this.children = children;
+ @Override
+ public List<PlanNode> getChildren() {
+ if (this.child == null) {
+ return ImmutableList.of();
+ }
+ return ImmutableList.of(child);
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ this.child = child;
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return ONE_CHILD;
}
@Override
@@ -55,12 +81,12 @@ public abstract class MultiChildNode extends ProcessNode {
if (!super.equals(o)) {
return false;
}
- MultiChildNode that = (MultiChildNode) o;
- return children.equals(that.children);
+ SingleChildProcessNode that = (SingleChildProcessNode) o;
+ return Objects.equals(child, that.child);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), children);
+ return Objects.hash(super.hashCode(), child);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
index a2f4935791..49d470a780 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
@@ -28,8 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import com.google.common.collect.ImmutableList;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -38,7 +36,7 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-public class SlidingWindowAggregationNode extends ProcessNode {
+public class SlidingWindowAggregationNode extends SingleChildProcessNode {
// The list of aggregate functions, each AggregateDescriptor will be output as one column of
// result TsBlock
@@ -49,8 +47,6 @@ public class SlidingWindowAggregationNode extends ProcessNode {
protected Ordering scanOrder;
- private PlanNode child;
-
public SlidingWindowAggregationNode(
PlanNodeId id,
List<AggregationDescriptor> aggregationDescriptorList,
@@ -68,8 +64,10 @@ public class SlidingWindowAggregationNode extends ProcessNode {
List<AggregationDescriptor> aggregationDescriptorList,
GroupByTimeParameter groupByTimeParameter,
Ordering scanOrder) {
- this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
- this.child = child;
+ super(id, child);
+ this.aggregationDescriptorList = aggregationDescriptorList;
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.scanOrder = scanOrder;
}
public List<AggregationDescriptor> getAggregationDescriptorList() {
@@ -88,25 +86,6 @@ public class SlidingWindowAggregationNode extends ProcessNode {
return scanOrder;
}
- public PlanNode getChild() {
- return child;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of(child);
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.child = child;
- }
-
- @Override
- public int allowedChildCount() {
- return ONE_CHILD;
- }
-
@Override
public PlanNode clone() {
return new SlidingWindowAggregationNode(
@@ -189,13 +168,12 @@ public class SlidingWindowAggregationNode extends ProcessNode {
}
SlidingWindowAggregationNode that = (SlidingWindowAggregationNode) o;
return Objects.equals(aggregationDescriptorList, that.aggregationDescriptorList)
- && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
- && Objects.equals(child, that.child);
+ && Objects.equals(groupByTimeParameter, that.groupByTimeParameter);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), aggregationDescriptorList, groupByTimeParameter, child);
+ return Objects.hash(super.hashCode(), aggregationDescriptorList, groupByTimeParameter);
}
public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
index 283987eb97..e033fd163d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import com.google.common.collect.ImmutableList;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -37,9 +35,7 @@ import java.util.Objects;
* In general, the parameter in sortNode should be pushed down to the upstream operators. In our
* optimized logical query plan, the sortNode should not appear.
*/
-public class SortNode extends ProcessNode {
-
- private PlanNode child;
+public class SortNode extends SingleChildProcessNode {
private final Ordering sortOrder;
@@ -49,29 +45,14 @@ public class SortNode extends ProcessNode {
}
public SortNode(PlanNodeId id, PlanNode child, Ordering sortOrder) {
- this(id, sortOrder);
- this.child = child;
+ super(id, child);
+ this.sortOrder = sortOrder;
}
public Ordering getSortOrder() {
return sortOrder;
}
- @Override
- public List<PlanNode> getChildren() {
- return ImmutableList.of(child);
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.child = child;
- }
-
- @Override
- public int allowedChildCount() {
- return ONE_CHILD;
- }
-
@Override
public PlanNode clone() {
return new SortNode(getPlanNodeId(), sortOrder);
@@ -117,11 +98,11 @@ public class SortNode extends ProcessNode {
return false;
}
SortNode sortNode = (SortNode) o;
- return child.equals(sortNode.child) && sortOrder == sortNode.sortOrder;
+ return sortOrder == sortNode.sortOrder;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), child, sortOrder);
+ return Objects.hash(super.hashCode(), sortOrder);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index 5685e6d95b..2598cd4e28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
* timestamp column. It will join two or more TsBlock by Timestamp column. The output result of
* TimeJoinOperator is sorted by timestamp
*/
-public class TimeJoinNode extends MultiChildNode {
+public class TimeJoinNode extends MultiChildProcessNode {
// This parameter indicates the order when executing multiway merge sort.
private final Ordering mergeOrder;
@@ -57,21 +57,6 @@ public class TimeJoinNode extends MultiChildNode {
return mergeOrder;
}
- @Override
- public List<PlanNode> getChildren() {
- return children;
- }
-
- @Override
- public void addChild(PlanNode child) {
- this.children.add(child);
- }
-
- @Override
- public int allowedChildCount() {
- return CHILD_COUNT_NO_LIMIT;
- }
-
@Override
public PlanNode clone() {
return new TimeJoinNode(getPlanNodeId(), getMergeOrder());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
index f6233e2e9d..112e53a01b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import com.google.common.collect.ImmutableList;
-
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -38,9 +36,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
-public class TransformNode extends ProcessNode {
-
- protected PlanNode childPlanNode;
+public class TransformNode extends SingleChildProcessNode {
protected final Expression[] outputExpressions;
protected final boolean keepNull;
@@ -52,13 +48,12 @@ public class TransformNode extends ProcessNode {
public TransformNode(
PlanNodeId id,
- PlanNode childPlanNode,
+ PlanNode child,
Expression[] outputExpressions,
boolean keepNull,
ZoneId zoneId,
Ordering scanOrder) {
- super(id);
- this.childPlanNode = childPlanNode;
+ super(id, child);
this.outputExpressions = outputExpressions;
this.keepNull = keepNull;
this.zoneId = zoneId;
@@ -78,21 +73,6 @@ public class TransformNode extends ProcessNode {
this.scanOrder = scanOrder;
}
- @Override
- public final List<PlanNode> getChildren() {
- return ImmutableList.of(childPlanNode);
- }
-
- @Override
- public final void addChild(PlanNode childPlanNode) {
- this.childPlanNode = childPlanNode;
- }
-
- @Override
- public final int allowedChildCount() {
- return ONE_CHILD;
- }
-
@Override
public final List<String> getOutputColumnNames() {
if (outputColumnNames == null) {
@@ -185,7 +165,6 @@ public class TransformNode extends ProcessNode {
}
TransformNode that = (TransformNode) o;
return keepNull == that.keepNull
- && childPlanNode.equals(that.childPlanNode)
&& Arrays.equals(outputExpressions, that.outputExpressions)
&& zoneId.equals(that.zoneId)
&& scanOrder == that.scanOrder;
@@ -193,7 +172,7 @@ public class TransformNode extends ProcessNode {
@Override
public int hashCode() {
- int result = Objects.hash(super.hashCode(), childPlanNode, keepNull, zoneId, scanOrder);
+ int result = Objects.hash(super.hashCode(), keepNull, zoneId, scanOrder);
result = 31 * result + Arrays.hashCode(outputExpressions);
return result;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java
index a81a5fb455..5f38f64a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -32,7 +32,7 @@ import java.util.Objects;
import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-public class LastQueryCollectNode extends MultiChildNode {
+public class LastQueryCollectNode extends MultiChildProcessNode {
public LastQueryCollectNode(PlanNodeId id) {
super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java
index 482fc0110d..accb4dcbd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import java.io.DataOutputStream;
@@ -33,7 +33,7 @@ import java.util.Objects;
import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-public class LastQueryMergeNode extends MultiChildNode {
+public class LastQueryMergeNode extends MultiChildProcessNode {
// The result output order, which could sort by sensor and time.
// The size of this list is 2 and the first SortItem in this list has higher priority.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
index 23b40ca829..b8e1cf422f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
@@ -38,7 +38,7 @@ import java.util.Objects;
import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-public class LastQueryNode extends MultiChildNode {
+public class LastQueryNode extends MultiChildProcessNode {
private final Filter timeFilter;