You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 09:56:15 UTC
[iotdb] branch xingtanzjr/query_execution updated: link ResultNode with QueryExecution
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
new 066d8d7776 link ResultNode with QueryExecution
066d8d7776 is described below
commit 066d8d77769c04cf8bc12b153d00f694fe120a72
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 17:30:35 2022 +0800
link ResultNode with QueryExecution
---
.../resources/conf/iotdb-engine.properties | 3 ++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 ++
.../iotdb/db/mpp/common/MPPQueryContext.java | 18 +++------
.../iotdb/db/mpp/common/ResultNodeContext.java | 44 +++++++++++++++++++---
.../apache/iotdb/db/mpp/execution/Coordinator.java | 9 +++--
.../iotdb/db/mpp/execution/QueryExecution.java | 19 ++++++++++
.../db/mpp/sql/planner/DistributionPlanner.java | 9 ++++-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 6 ++-
.../iotdb/db/mpp/sql/plan/QueryPlannerTest.java | 3 +-
10 files changed, 102 insertions(+), 24 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index d6598a5d14..46f41c8a7b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -27,6 +27,9 @@ rpc_address=0.0.0.0
# Datatype: int
rpc_port=6667
+# Datatype: int
+mpp_port=7777
+
# Datatype: String
# used for communication between cluster nodes.
# if this parameter is commented, then the IP that binded by the hostname will be used.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 580d8f7552..3fa169ada8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -104,6 +104,9 @@ public class IoTDBConfig {
/** Port which the JDBC server listens to. */
private int rpcPort = 6667;
+ /** Port which is used for node communication in MPP. */
+ private int mppPort = 7777;
+
/** Port which the influxdb protocol server listens to. */
private int influxDBRpcPort = 8086;
@@ -2586,4 +2589,12 @@ public class IoTDBConfig {
public void setJoinClusterTimeOutMs(long joinClusterTimeOutMs) {
this.joinClusterTimeOutMs = joinClusterTimeOutMs;
}
+
+ public int getMppPort() {
+ return mppPort;
+ }
+
+ public void setMppPort(int mppPort) {
+ this.mppPort = mppPort;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e384b5c2b1..4daa79ecd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -161,6 +161,10 @@ public class IoTDBDescriptor {
Integer.parseInt(
properties.getProperty("rpc_port", Integer.toString(conf.getRpcPort()))));
+ conf.setMppPort(
+ Integer.parseInt(
+ properties.getProperty("mpp_port", Integer.toString(conf.getRpcPort()))));
+
conf.setEnableInfluxDBRpcService(
Boolean.parseBoolean(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 578cd8ed1e..def89dba39 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.common;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
/**
* This class is used to record the context of a query including QueryId, query statement, session
@@ -33,21 +32,20 @@ public class MPPQueryContext {
private QueryType queryType;
private Endpoint hostEndpoint;
- private FragmentInstanceId virtualFragmentInstanceId;
- private PlanNodeId virtualResultNodeId;
+ private ResultNodeContext resultNodeContext;
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
}
- public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
+ public MPPQueryContext(
+ String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
this.sql = sql;
this.queryId = queryId;
this.session = session;
this.queryType = type;
this.hostEndpoint = hostEndpoint;
- this.virtualResultNodeId = queryId.genPlanNodeId();
- this.virtualFragmentInstanceId = queryId.genPlanFragmentId().genFragmentInstanceId();
+ this.resultNodeContext = new ResultNodeContext(queryId);
}
public QueryId getQueryId() {
@@ -62,11 +60,7 @@ public class MPPQueryContext {
return hostEndpoint;
}
- public FragmentInstanceId getVirtualFragmentInstanceId() {
- return virtualFragmentInstanceId;
- }
-
- public PlanNodeId getVirtualResultNodeId() {
- return virtualResultNodeId;
+ public ResultNodeContext getResultNodeContext() {
+ return resultNodeContext;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
index e40a4bac44..4c56646285 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
@@ -19,15 +19,49 @@
package org.apache.iotdb.db.mpp.common;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
public class ResultNodeContext {
- private QueryId queryId;
- private FragmentInstanceId virtualResultInstanceId;
- private PlanNodeId virtualResultNodeId;
+
+ private final FragmentInstanceId virtualFragmentInstanceId;
+ private final PlanNodeId virtualResultNodeId;
+
+ private Endpoint upStreamEndpoint;
+ private FragmentInstanceId upStreamFragmentInstanceId;
+ private PlanNodeId upStreamPlanNodeId;
public ResultNodeContext(QueryId queryId) {
- this.queryId = queryId;
- // this.virtualResultInstanceId = new FragmentInstanceId(new PlanFragmentId(queryId, ))
+ this.virtualResultNodeId = queryId.genPlanNodeId();
+ this.virtualFragmentInstanceId = queryId.genPlanFragmentId().genFragmentInstanceId();
+ }
+
+ public void setUpStream(
+ Endpoint upStreamEndpoint,
+ FragmentInstanceId upStreamFragmentInstanceId,
+ PlanNodeId upStreamPlanNodeId) {
+ this.upStreamEndpoint = upStreamEndpoint;
+ this.upStreamFragmentInstanceId = upStreamFragmentInstanceId;
+ this.upStreamPlanNodeId = upStreamPlanNodeId;
+ }
+
+ public FragmentInstanceId getVirtualFragmentInstanceId() {
+ return virtualFragmentInstanceId;
+ }
+
+ public PlanNodeId getVirtualResultNodeId() {
+ return virtualResultNodeId;
+ }
+
+ public Endpoint getUpStreamEndpoint() {
+ return upStreamEndpoint;
+ }
+
+ public FragmentInstanceId getUpStreamFragmentInstanceId() {
+ return upStreamFragmentInstanceId;
+ }
+
+ public PlanNodeId getUpStreamPlanNodeId() {
+ return upStreamPlanNodeId;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index ff5baca47f..0c2bb8138a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -62,7 +63,8 @@ public class Coordinator {
Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
QueryExecution execution =
- createQueryExecution(statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
+ createQueryExecution(
+ statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
queryExecutionMap.put(queryId, execution);
execution.start();
@@ -83,8 +85,9 @@ public class Coordinator {
// Get the hostname of current coordinator
private Endpoint getHostEndpoint() {
- // TODO: (xingtanzjr) how to get the hostname ?
- return new Endpoint();
+ return new Endpoint(
+ IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+ IoTDBDescriptor.getInstance().getConfig().getMppPort());
}
public static Coordinator getInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0e2bc6c240..c268092bf8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
@@ -78,6 +79,9 @@ public class QueryExecution {
this.analysis = analyze(statement, context);
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
+ //TODO: (xingtanzjr) Initialize the result handle after the DataBlockManager is merged.
+// resultHandle = xxxx
+
// We add the abort logic inside the QueryExecution.
// So that the other components can only focus on the state change.
stateMachine.addStateChangeListener(
@@ -148,6 +152,21 @@ public class QueryExecution {
* implemented with DataStreamManager)
*/
public ByteBuffer getBatchResult() {
+ SettableFuture<Boolean> hasData = SettableFuture.create();
+ ListenableFuture<Void> blocked = resultHandle.isBlocked();
+ if (blocked.isDone()) {
+ hasData.set(true);
+ } else {
+ blocked.addListener(() -> {
+ hasData.set(true);
+ }, executor);
+ }
+ try {
+ hasData.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+// return resultHandle.receive();
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 7be76b4673..3b863a469f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -96,9 +96,16 @@ public class DistributionPlanner {
if (rootInstance == null) {
return;
}
+
FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
- sinkNode.setDownStream(context.getHostEndpoint(), context.getVirtualFragmentInstanceId(), context.getVirtualResultNodeId());
+ sinkNode.setDownStream(
+ context.getHostEndpoint(),
+ context.getResultNodeContext().getVirtualFragmentInstanceId(),
+ context.getResultNodeContext().getVirtualResultNodeId());
sinkNode.setChild(rootInstance.getFragment().getRoot());
+ context
+ .getResultNodeContext()
+ .setUpStream(rootInstance.getHostEndpoint(), rootInstance.getId(), sinkNode.getId());
rootInstance.getFragment().setRoot(sinkNode);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 3668adb4dc..fb82050a84 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -121,7 +121,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -148,7 +149,8 @@ public class DistributionPlannerTest {
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
DistributedQueryPlan plan = planner.planFragments();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 48b31a2994..d4e599723b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -49,7 +49,8 @@ public class QueryPlannerTest {
QueryExecution queryExecution =
new QueryExecution(
stmt,
- new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ, new Endpoint()),
+ new MPPQueryContext(
+ querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ, new Endpoint()),
IoTDBThreadPoolFactory.newSingleThreadExecutor("Test-Query"),
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Test-Query-Scheduled"));
queryExecution.doLogicalPlan();