You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/22 12:27:02 UTC
[iotdb] 01/01: Fix TimeJoinNode clone and serde bug
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TimeJoinNode
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 497340dea9febed7e94f500098b56b41be2f5844
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Apr 22 20:26:47 2022 +0800
Fix TimeJoinNode clone and serde bug
---
.../db/mpp/sql/planner/DistributionPlanner.java | 4 ++++
.../planner/plan/node/process/TimeJoinNode.java | 25 ++++++++++++++++------
.../planner/plan/node/source/SeriesScanNode.java | 6 +++++-
3 files changed, 27 insertions(+), 8 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 67c8c5f740..22b571ac3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -188,6 +188,7 @@ public class DistributionPlanner {
split.setRegionReplicaSet(dataRegion);
timeJoinNode.addChild(split);
}
+ timeJoinNode.initOutputColumns();
return timeJoinNode;
}
@@ -253,6 +254,7 @@ public class DistributionPlanner {
}
});
+ root.initOutputColumns();
return root;
}
@@ -373,6 +375,7 @@ public class DistributionPlanner {
// If the distributionType of all the children are same, no ExchangeNode need to be added.
if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
newNode.setChildren(visitedChildren);
+ newNode.initOutputColumns();
return newNode;
}
@@ -389,6 +392,7 @@ public class DistributionPlanner {
newNode.addChild(child);
}
});
+ newNode.initOutputColumns();
return newNode;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index b34dd68bda..87139cacc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -61,7 +61,7 @@ public class TimeJoinNode extends ProcessNode {
private List<OutputColumn> outputColumns = new ArrayList<>();
// column name and datatype of each output column
- private final List<ColumnHeader> outputColumnHeaders = new ArrayList<>();
+ private List<ColumnHeader> outputColumnHeaders = new ArrayList<>();
private List<PlanNode> children;
@@ -85,9 +85,10 @@ public class TimeJoinNode extends ProcessNode {
@Override
public PlanNode clone() {
// TODO: (xingtanzjr)
- TimeJoinNode cloneNode = new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
- cloneNode.outputColumns = this.outputColumns;
- return cloneNode;
+ TimeJoinNode node = new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
+ node.outputColumnHeaders = this.outputColumnHeaders;
+ node.outputColumns = this.outputColumns;
+ return node;
}
@Override
@@ -99,7 +100,9 @@ public class TimeJoinNode extends ProcessNode {
return outputColumns;
}
- private void initOutputColumns() {
+ public void initOutputColumns() {
+ outputColumns.clear();
+ outputColumnHeaders.clear();
for (int tsBlockIndex = 0; tsBlockIndex < children.size(); tsBlockIndex++) {
List<ColumnHeader> childColumnHeaders = children.get(tsBlockIndex).getOutputColumnHeaders();
for (int valueColumnIndex = 0;
@@ -140,7 +143,12 @@ public class TimeJoinNode extends ProcessNode {
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.TIME_JOIN.serialize(byteBuffer);
ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer);
- filterNullParameter.serialize(byteBuffer);
+ if (filterNullParameter == null) {
+ ReadWriteIOUtils.write(true, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write(false, byteBuffer);
+ filterNullParameter.serialize(byteBuffer);
+ }
ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
for (OutputColumn outputColumn : outputColumns) {
outputColumn.serialize(byteBuffer);
@@ -153,7 +161,10 @@ public class TimeJoinNode extends ProcessNode {
public static TimeJoinNode deserialize(ByteBuffer byteBuffer) {
OrderBy orderBy = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
- FilterNullParameter filterNullParameter = FilterNullParameter.deserialize(byteBuffer);
+ FilterNullParameter filterNullParameter = null;
+ if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
+ filterNullParameter = FilterNullParameter.deserialize(byteBuffer);
+ }
int outputColumnSize = ReadWriteIOUtils.readInt(byteBuffer);
List<OutputColumn> outputColumns = new ArrayList<>(outputColumnSize);
for (int i = 0; i < outputColumnSize; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index adc814e043..20251d9784 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -152,7 +152,11 @@ public class SeriesScanNode extends SourceNode {
@Override
public PlanNode clone() {
- return new SeriesScanNode(getPlanNodeId(), getSeriesPath(), this.regionReplicaSet);
+ SeriesScanNode seriesScanNode =
+ new SeriesScanNode(getPlanNodeId(), getSeriesPath(), this.regionReplicaSet);
+ seriesScanNode.allSensors = this.allSensors;
+ seriesScanNode.outputColumnHeader = this.outputColumnHeader;
+ return seriesScanNode;
}
@Override