You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/29 04:11:04 UTC

[iotdb] branch master updated: Add outputColumnNames in ExchangeNode (#5730)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new be72769252 Add outputColumnNames in ExchangeNode (#5730)
be72769252 is described below

commit be72769252038fc4bd31e0e7b678ba2f8d087e5e
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Fri Apr 29 12:10:59 2022 +0800

    Add outputColumnNames in ExchangeNode (#5730)
---
 .../plan/node/metedata/read/SchemaFetchNode.java   |  4 +++-
 .../planner/plan/node/process/ExchangeNode.java    | 28 ++++++++++++++++------
 .../planner/plan/node/sink/FragmentSinkNode.java   |  2 +-
 3 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
index abb73d66b3..afe13299a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 
+import com.google.common.collect.ImmutableList;
+
 import java.nio.ByteBuffer;
 import java.util.List;
 
@@ -48,7 +50,7 @@ public class SchemaFetchNode extends SchemaScanNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    return ImmutableList.of();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index fadb9aed82..de749b6188 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -47,6 +48,8 @@ public class ExchangeNode extends PlanNode {
   private FragmentInstanceId upstreamInstanceId;
   private PlanNodeId upstreamPlanNodeId;
 
+  private List<String> outputColumnNames;
+
   public ExchangeNode(PlanNodeId id) {
     super(id);
   }
@@ -87,7 +90,11 @@ public class ExchangeNode extends PlanNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
+    return outputColumnNames;
+  }
+
+  public void setOutputColumnNames(List<String> outputColumnNames) {
+    this.outputColumnNames = outputColumnNames;
   }
 
   public void setUpstream(TEndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
@@ -102,9 +109,16 @@ public class ExchangeNode extends PlanNode {
             ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
     FragmentInstanceId fragmentInstanceId = FragmentInstanceId.deserialize(byteBuffer);
     PlanNodeId upstreamPlanNodeId = PlanNodeId.deserialize(byteBuffer);
+    int outputColumnNamesSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> outputColumnNames = new ArrayList<>(outputColumnNamesSize);
+    while (outputColumnNamesSize > 0) {
+      outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
+      outputColumnNamesSize--;
+    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
     exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
+    exchangeNode.setOutputColumnNames(outputColumnNames);
     return exchangeNode;
   }
 
@@ -115,6 +129,11 @@ public class ExchangeNode extends PlanNode {
     ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer);
     upstreamInstanceId.serialize(byteBuffer);
     upstreamPlanNodeId.serialize(byteBuffer);
+    List<String> outputColumnNames = remoteSourceNode.getOutputColumnNames();
+    ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
+    for (String outputColumnName : outputColumnNames) {
+      ReadWriteIOUtils.write(outputColumnName, byteBuffer);
+    }
   }
 
   public PlanNode getChild() {
@@ -185,11 +204,6 @@ public class ExchangeNode extends PlanNode {
   @Override
   public int hashCode() {
     return Objects.hash(
-        super.hashCode(),
-        child,
-        remoteSourceNode,
-        upstreamEndpoint,
-        upstreamInstanceId,
-        upstreamPlanNodeId);
+        super.hashCode(), child, upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 86dccdf638..14871c8536 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -85,7 +85,7 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    return child.getOutputColumnNames();
   }
 
   public static FragmentSinkNode deserialize(ByteBuffer byteBuffer) {