You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/28 11:11:46 UTC

[iotdb] branch xingtanzjr/polish_node updated (37e268c -> 59c9d25)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


 discard 37e268c  fix the bug in distribution planner
     new 59c9d25  fix the bug in distribution planner

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (37e268c)
            \
             N -- N -- N   refs/heads/xingtanzjr/polish_node (59c9d25)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

[iotdb] 01/01: fix the bug in distribution planner

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/polish_node
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 59c9d25fcc7794942838ed2835f0c32a76bf9bc4
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Mar 28 17:44:29 2022 +0800

    fix the bug in distribution planner
---
 .../sql/planner/plan/SimpleFragmentParallelPlanner.java   |  3 ++-
 .../mpp/sql/planner/plan/node/process/ExchangeNode.java   |  4 +++-
 .../mpp/sql/planner/plan/node/sink/FragmentSinkNode.java  | 15 +++++++++++++--
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 3603310..1be8d95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -72,7 +72,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // one by one
     int instanceIdx = 0;
     PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
-    FragmentInstance fragmentInstance = new FragmentInstance(fragment, instanceIdx);
+    FragmentInstance fragmentInstance =
+        new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx);
 
     // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
     // of them.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 80bbf41..0df6d36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -60,7 +60,9 @@ public class ExchangeNode extends PlanNode {
   public PlanNode clone() {
     ExchangeNode node = new ExchangeNode(getId());
     if (remoteSourceNode != null) {
-      node.setRemoteSourceNode((FragmentSinkNode) remoteSourceNode.clone());
+      FragmentSinkNode remoteSourceNodeClone = (FragmentSinkNode) remoteSourceNode.clone();
+      remoteSourceNodeClone.setDownStreamNode(node);
+      node.setRemoteSourceNode(remoteSourceNode);
     }
     return node;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 3ab12c7..aeb129f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.Validate;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -51,12 +52,22 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public PlanNode clone() {
-    return null;
+    FragmentSinkNode sinkNode = new FragmentSinkNode(getId());
+    sinkNode.setDownStream(downStreamEndpoint, downStreamInstanceId, downStreamPlanNodeId);
+    sinkNode.setDownStreamNode(downStreamNode);
+    return sinkNode;
   }
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+    Validate.isTrue(
+        children == null || children.size() == 1,
+        "Children size of FragmentSinkNode should be 0 or 1");
+    FragmentSinkNode sinkNode = (FragmentSinkNode) clone();
+    if (children != null) {
+      sinkNode.setChild(children.get(0));
+    }
+    return sinkNode;
   }
 
   @Override