You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/29 04:11:04 UTC
[iotdb] branch master updated: Add outputColumnNames in ExchangeNode (#5730)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new be72769252 Add outputColumnNames in ExchangeNode (#5730)
be72769252 is described below
commit be72769252038fc4bd31e0e7b678ba2f8d087e5e
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Fri Apr 29 12:10:59 2022 +0800
Add outputColumnNames in ExchangeNode (#5730)
---
.../plan/node/metedata/read/SchemaFetchNode.java | 4 +++-
.../planner/plan/node/process/ExchangeNode.java | 28 ++++++++++++++++------
.../planner/plan/node/sink/FragmentSinkNode.java | 2 +-
3 files changed, 25 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
index abb73d66b3..afe13299a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import com.google.common.collect.ImmutableList;
+
import java.nio.ByteBuffer;
import java.util.List;
@@ -48,7 +50,7 @@ public class SchemaFetchNode extends SchemaScanNode {
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return ImmutableList.of();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index fadb9aed82..de749b6188 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -47,6 +48,8 @@ public class ExchangeNode extends PlanNode {
private FragmentInstanceId upstreamInstanceId;
private PlanNodeId upstreamPlanNodeId;
+ private List<String> outputColumnNames;
+
public ExchangeNode(PlanNodeId id) {
super(id);
}
@@ -87,7 +90,11 @@ public class ExchangeNode extends PlanNode {
@Override
public List<String> getOutputColumnNames() {
- return child.getOutputColumnNames();
+ return outputColumnNames;
+ }
+
+ public void setOutputColumnNames(List<String> outputColumnNames) {
+ this.outputColumnNames = outputColumnNames;
}
public void setUpstream(TEndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
@@ -102,9 +109,16 @@ public class ExchangeNode extends PlanNode {
ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
FragmentInstanceId fragmentInstanceId = FragmentInstanceId.deserialize(byteBuffer);
PlanNodeId upstreamPlanNodeId = PlanNodeId.deserialize(byteBuffer);
+ int outputColumnNamesSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<String> outputColumnNames = new ArrayList<>(outputColumnNamesSize);
+ while (outputColumnNamesSize > 0) {
+ outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
+ outputColumnNamesSize--;
+ }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
+ exchangeNode.setOutputColumnNames(outputColumnNames);
return exchangeNode;
}
@@ -115,6 +129,11 @@ public class ExchangeNode extends PlanNode {
ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer);
upstreamInstanceId.serialize(byteBuffer);
upstreamPlanNodeId.serialize(byteBuffer);
+ List<String> outputColumnNames = remoteSourceNode.getOutputColumnNames();
+ ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
+ for (String outputColumnName : outputColumnNames) {
+ ReadWriteIOUtils.write(outputColumnName, byteBuffer);
+ }
}
public PlanNode getChild() {
@@ -185,11 +204,6 @@ public class ExchangeNode extends PlanNode {
@Override
public int hashCode() {
return Objects.hash(
- super.hashCode(),
- child,
- remoteSourceNode,
- upstreamEndpoint,
- upstreamInstanceId,
- upstreamPlanNodeId);
+ super.hashCode(), child, upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 86dccdf638..14871c8536 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -85,7 +85,7 @@ public class FragmentSinkNode extends SinkNode {
@Override
public List<String> getOutputColumnNames() {
- return null;
+ return child.getOutputColumnNames();
}
public static FragmentSinkNode deserialize(ByteBuffer byteBuffer) {