You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 09:00:04 UTC

[iotdb] branch xingtanzjr/query_execution updated: add SetSinkForRootInstance for root fragment instance

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
     new 10211972e6 add SetSinkForRootInstance for root fragment instance
10211972e6 is described below

commit 10211972e6681ef8715f964b3c99808c618c52e5
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 16:59:54 2022 +0800

    add SetSinkForRootInstance for root fragment instance
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       | 23 +++++++++++++++-------
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  7 ++++---
 .../db/mpp/sql/planner/DistributionPlanner.java    |  3 ++-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |  4 ++--
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  3 ++-
 5 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index b58e94e91a..578cd8ed1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.common;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
@@ -30,19 +31,23 @@ public class MPPQueryContext {
   private QueryId queryId;
   private SessionInfo session;
   private QueryType queryType;
-  private String hostname;
+
+  private Endpoint hostEndpoint;
+  private FragmentInstanceId virtualFragmentInstanceId;
   private PlanNodeId virtualResultNodeId;
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
   }
 
-  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, QueryType type) {
+  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
     this.sql = sql;
     this.queryId = queryId;
     this.session = session;
     this.queryType = type;
-    this.virtualResultNodeId = getVirtualResultNodeId(queryId);
+    this.hostEndpoint = hostEndpoint;
+    this.virtualResultNodeId = queryId.genPlanNodeId();
+    this.virtualFragmentInstanceId = queryId.genPlanFragmentId().genFragmentInstanceId();
   }
 
   public QueryId getQueryId() {
@@ -53,11 +58,15 @@ public class MPPQueryContext {
     return queryType;
   }
 
-  public String getHostname() {
-    return hostname;
+  public Endpoint getHostEndpoint() {
+    return hostEndpoint;
+  }
+
+  public FragmentInstanceId getVirtualFragmentInstanceId() {
+    return virtualFragmentInstanceId;
   }
 
-  private PlanNodeId getVirtualResultNodeId(QueryId queryId) {
-    return new PlanNodeId(String.format("%s_result_node", queryId.getId()));
+  public PlanNodeId getVirtualResultNodeId() {
+    return virtualResultNodeId;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 1cd95a0202..ff5baca47f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -61,7 +62,7 @@ public class Coordinator {
       Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
 
     QueryExecution execution =
-        createQueryExecution(statement, new MPPQueryContext(sql, queryId, session, queryType));
+        createQueryExecution(statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
     queryExecutionMap.put(queryId, execution);
 
     execution.start();
@@ -81,9 +82,9 @@ public class Coordinator {
   }
 
   // Get the hostname of current coordinator
-  private String getHostname() {
+  private Endpoint getHostEndpoint() {
     // TODO: (xingtanzjr) how to get the hostname ?
-    return "";
+    return new Endpoint();
   }
 
   public static Coordinator getInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 5b6a24bc4c..7be76b4673 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -71,6 +71,7 @@ public class DistributionPlanner {
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
     SubPlan subPlan = splitFragment(rootWithExchange);
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
+    SetSinkForRootInstance(subPlan, fragmentInstances);
     return new DistributedQueryPlan(
         logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
   }
@@ -96,9 +97,9 @@ public class DistributionPlanner {
       return;
     }
     FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
+    sinkNode.setDownStream(context.getHostEndpoint(), context.getVirtualFragmentInstanceId(), context.getVirtualResultNodeId());
     sinkNode.setChild(rootInstance.getFragment().getRoot());
     rootInstance.getFragment().setRoot(sinkNode);
-    //    sinkNode.setDownStream();
   }
 
   private PlanFragmentId getNextFragmentId() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index ee53241aed..3668adb4dc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -121,7 +121,7 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ);
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -148,7 +148,7 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ);
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index e38952fea3..48b31a2994 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.plan;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -48,7 +49,7 @@ public class QueryPlannerTest {
     QueryExecution queryExecution =
         new QueryExecution(
             stmt,
-            new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ),
+            new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ, new Endpoint()),
             IoTDBThreadPoolFactory.newSingleThreadExecutor("Test-Query"),
             IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Test-Query-Scheduled"));
     queryExecution.doLogicalPlan();