You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 09:00:04 UTC
[iotdb] branch xingtanzjr/query_execution updated: add SetSinkForRootInstance for root fragment instance
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
new 10211972e6 add SetSinkForRootInstance for root fragment instance
10211972e6 is described below
commit 10211972e6681ef8715f964b3c99808c618c52e5
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 16:59:54 2022 +0800
add SetSinkForRootInstance for root fragment instance
---
.../iotdb/db/mpp/common/MPPQueryContext.java | 23 +++++++++++++++-------
.../apache/iotdb/db/mpp/execution/Coordinator.java | 7 ++++---
.../db/mpp/sql/planner/DistributionPlanner.java | 3 ++-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 4 ++--
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 3 ++-
5 files changed, 26 insertions(+), 14 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index b58e94e91a..578cd8ed1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.common;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -30,19 +31,23 @@ public class MPPQueryContext {
private QueryId queryId;
private SessionInfo session;
private QueryType queryType;
- private String hostname;
+
+ private Endpoint hostEndpoint;
+ private FragmentInstanceId virtualFragmentInstanceId;
private PlanNodeId virtualResultNodeId;
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
}
- public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, QueryType type) {
+ public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
this.sql = sql;
this.queryId = queryId;
this.session = session;
this.queryType = type;
- this.virtualResultNodeId = getVirtualResultNodeId(queryId);
+ this.hostEndpoint = hostEndpoint;
+ this.virtualResultNodeId = queryId.genPlanNodeId();
+ this.virtualFragmentInstanceId = queryId.genPlanFragmentId().genFragmentInstanceId();
}
public QueryId getQueryId() {
@@ -53,11 +58,15 @@ public class MPPQueryContext {
return queryType;
}
- public String getHostname() {
- return hostname;
+ public Endpoint getHostEndpoint() {
+ return hostEndpoint;
+ }
+
+ public FragmentInstanceId getVirtualFragmentInstanceId() {
+ return virtualFragmentInstanceId;
}
- private PlanNodeId getVirtualResultNodeId(QueryId queryId) {
- return new PlanNodeId(String.format("%s_result_node", queryId.getId()));
+ public PlanNodeId getVirtualResultNodeId() {
+ return virtualResultNodeId;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 1cd95a0202..ff5baca47f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -61,7 +62,7 @@ public class Coordinator {
Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
QueryExecution execution =
- createQueryExecution(statement, new MPPQueryContext(sql, queryId, session, queryType));
+ createQueryExecution(statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
queryExecutionMap.put(queryId, execution);
execution.start();
@@ -81,9 +82,9 @@ public class Coordinator {
}
// Get the hostname of current coordinator
- private String getHostname() {
+ private Endpoint getHostEndpoint() {
// TODO: (xingtanzjr) how to get the hostname ?
- return "";
+ return new Endpoint();
}
public static Coordinator getInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 5b6a24bc4c..7be76b4673 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -71,6 +71,7 @@ public class DistributionPlanner {
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
SubPlan subPlan = splitFragment(rootWithExchange);
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
+ SetSinkForRootInstance(subPlan, fragmentInstances);
return new DistributedQueryPlan(
logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
}
@@ -96,9 +97,9 @@ public class DistributionPlanner {
return;
}
FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
+ sinkNode.setDownStream(context.getHostEndpoint(), context.getVirtualFragmentInstanceId(), context.getVirtualResultNodeId());
sinkNode.setChild(rootInstance.getFragment().getRoot());
rootInstance.getFragment().setRoot(sinkNode);
- // sinkNode.setDownStream();
}
private PlanFragmentId getNextFragmentId() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index ee53241aed..3668adb4dc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -121,7 +121,7 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ);
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -148,7 +148,7 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ);
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
DistributedQueryPlan plan = planner.planFragments();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index e38952fea3..48b31a2994 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.plan;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -48,7 +49,7 @@ public class QueryPlannerTest {
QueryExecution queryExecution =
new QueryExecution(
stmt,
- new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ),
+ new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ, new Endpoint()),
IoTDBThreadPoolFactory.newSingleThreadExecutor("Test-Query"),
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Test-Query-Scheduled"));
queryExecution.doLogicalPlan();