You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/07 08:15:15 UTC

[iotdb] branch xingtanzjr/decouple_circular_dep created (now 13c5952206)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/decouple_circular_dep
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 13c5952206 decouple the circular dependency between FragmentSinkNode and ExchangeNode

This branch includes the following new commits:

     new 13c5952206 decouple the circular dependency between FragmentSinkNode and ExchangeNode

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: decouple the circular dependency between FragmentSinkNode and ExchangeNode

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/decouple_circular_dep
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 13c5952206d15da17d8439bbd7b76cb8cc583168
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Apr 7 16:13:47 2022 +0800

    decouple the circular dependency between FragmentSinkNode and ExchangeNode
---
 .../iotdb/db/mpp/sql/planner/DistributionPlanner.java     |  2 +-
 .../iotdb/db/mpp/sql/planner/plan/FragmentInstance.java   |  2 +-
 .../sql/planner/plan/SimpleFragmentParallelPlanner.java   |  2 +-
 .../mpp/sql/planner/plan/node/process/ExchangeNode.java   |  2 +-
 .../mpp/sql/planner/plan/node/sink/FragmentSinkNode.java  | 15 ++++-----------
 5 files changed, 8 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 06f96f632f..adf567d961 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -299,7 +299,7 @@ public class DistributionPlanner {
         ExchangeNode exchangeNode = (ExchangeNode) root;
         FragmentSinkNode sinkNode = new FragmentSinkNode(PlanNodeIdAllocator.generateId());
         sinkNode.setChild(exchangeNode.getChild());
-        sinkNode.setDownStreamNode(exchangeNode);
+        sinkNode.setDownStreamPlanNodeId(exchangeNode.getId());
         // Record the source node info in the ExchangeNode so that we can keep the connection of
         // these nodes/fragments
         exchangeNode.setRemoteSourceNode(sinkNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index a8a465cd5b..d9c45bc895 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -85,7 +85,7 @@ public class FragmentInstance implements IConsensusRequest {
       FragmentSinkNode sink = (FragmentSinkNode) root;
       return String.format(
           "(%s, %s, %s)",
-          sink.getDownStreamEndpoint(), sink.getDownStreamInstanceId(), sink.getDownStreamNode());
+          sink.getDownStreamEndpoint(), sink.getDownStreamInstanceId(), sink.getDownStreamPlanNodeId());
     }
     return "<No downstream>";
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 8e8b26be57..fdedd49eda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -98,7 +98,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
       if (rootNode instanceof FragmentSinkNode) {
         // Set target Endpoint for FragmentSinkNode
         FragmentSinkNode sinkNode = (FragmentSinkNode) rootNode;
-        PlanNodeId downStreamNodeId = sinkNode.getDownStreamNode().getId();
+        PlanNodeId downStreamNodeId = sinkNode.getDownStreamPlanNodeId();
         FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
         sinkNode.setDownStream(
             downStreamInstance.getHostEndpoint(), downStreamInstance.getId(), downStreamNodeId);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 48c8954975..4aa54b84da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -65,7 +65,7 @@ public class ExchangeNode extends PlanNode {
     ExchangeNode node = new ExchangeNode(getId());
     if (remoteSourceNode != null) {
       FragmentSinkNode remoteSourceNodeClone = (FragmentSinkNode) remoteSourceNode.clone();
-      remoteSourceNodeClone.setDownStreamNode(node);
+      remoteSourceNodeClone.setDownStreamPlanNodeId(node.getId());
       node.setRemoteSourceNode(remoteSourceNode);
     }
     return node;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 94f2d1d3b8..da6833d245 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 
 import com.google.common.collect.ImmutableList;
 
@@ -31,7 +30,6 @@ import java.util.List;
 
 public class FragmentSinkNode extends SinkNode {
   private PlanNode child;
-  private ExchangeNode downStreamNode;
 
   private Endpoint downStreamEndpoint;
   private FragmentInstanceId downStreamInstanceId;
@@ -55,7 +53,6 @@ public class FragmentSinkNode extends SinkNode {
   public PlanNode clone() {
     FragmentSinkNode sinkNode = new FragmentSinkNode(getId());
     sinkNode.setDownStream(downStreamEndpoint, downStreamInstanceId, downStreamPlanNodeId);
-    sinkNode.setDownStreamNode(downStreamNode);
     return sinkNode;
   }
 
@@ -103,20 +100,16 @@ public class FragmentSinkNode extends SinkNode {
         getDownStreamEndpoint().getIp(), getDownStreamInstanceId(), getDownStreamPlanNodeId());
   }
 
-  public ExchangeNode getDownStreamNode() {
-    return downStreamNode;
-  }
-
-  public void setDownStreamNode(ExchangeNode downStreamNode) {
-    this.downStreamNode = downStreamNode;
-  }
-
   public void setDownStream(Endpoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
     this.downStreamEndpoint = endPoint;
     this.downStreamInstanceId = instanceId;
     this.downStreamPlanNodeId = nodeId;
   }
 
+  public void setDownStreamPlanNodeId(PlanNodeId downStreamPlanNodeId) {
+    this.downStreamPlanNodeId = downStreamPlanNodeId;
+  }
+
   public Endpoint getDownStreamEndpoint() {
     return downStreamEndpoint;
   }