You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 09:56:15 UTC

[iotdb] branch xingtanzjr/query_execution updated: link ResultNode with QueryExecution

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
     new 066d8d7776 link ResultNode with QueryExecution
066d8d7776 is described below

commit 066d8d77769c04cf8bc12b153d00f694fe120a72
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 17:30:35 2022 +0800

    link ResultNode with QueryExecution
---
 .../resources/conf/iotdb-engine.properties         |  3 ++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  4 ++
 .../iotdb/db/mpp/common/MPPQueryContext.java       | 18 +++------
 .../iotdb/db/mpp/common/ResultNodeContext.java     | 44 +++++++++++++++++++---
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  9 +++--
 .../iotdb/db/mpp/execution/QueryExecution.java     | 19 ++++++++++
 .../db/mpp/sql/planner/DistributionPlanner.java    |  9 ++++-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |  6 ++-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  3 +-
 10 files changed, 102 insertions(+), 24 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index d6598a5d14..46f41c8a7b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -27,6 +27,9 @@ rpc_address=0.0.0.0
 # Datatype: int
 rpc_port=6667
 
+# Datatype: int
+mpp_port=7777
+
 # Datatype: String
 # used for communication between cluster nodes.
 # if this parameter is commented, then the IP that binded by the hostname will be used.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 580d8f7552..3fa169ada8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -104,6 +104,9 @@ public class IoTDBConfig {
   /** Port which the JDBC server listens to. */
   private int rpcPort = 6667;
 
+  /** Port which is used for node communication in MPP. */
+  private int mppPort = 7777;
+
   /** Port which the influxdb protocol server listens to. */
   private int influxDBRpcPort = 8086;
 
@@ -2586,4 +2589,12 @@ public class IoTDBConfig {
   public void setJoinClusterTimeOutMs(long joinClusterTimeOutMs) {
     this.joinClusterTimeOutMs = joinClusterTimeOutMs;
   }
+
+  public int getMppPort() {
+    return mppPort;
+  }
+
+  public void setMppPort(int mppPort) {
+    this.mppPort = mppPort;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e384b5c2b1..4daa79ecd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -161,6 +161,10 @@ public class IoTDBDescriptor {
           Integer.parseInt(
               properties.getProperty("rpc_port", Integer.toString(conf.getRpcPort()))));
 
+      conf.setMppPort(
+          Integer.parseInt(
+              properties.getProperty("mpp_port", Integer.toString(conf.getRpcPort()))));
+
       conf.setEnableInfluxDBRpcService(
           Boolean.parseBoolean(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 578cd8ed1e..def89dba39 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.common;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
 /**
  * This class is used to record the context of a query including QueryId, query statement, session
@@ -33,21 +32,20 @@ public class MPPQueryContext {
   private QueryType queryType;
 
   private Endpoint hostEndpoint;
-  private FragmentInstanceId virtualFragmentInstanceId;
-  private PlanNodeId virtualResultNodeId;
+  private ResultNodeContext resultNodeContext;
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
   }
 
-  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
+  public MPPQueryContext(
+      String sql, QueryId queryId, SessionInfo session, QueryType type, Endpoint hostEndpoint) {
     this.sql = sql;
     this.queryId = queryId;
     this.session = session;
     this.queryType = type;
     this.hostEndpoint = hostEndpoint;
-    this.virtualResultNodeId = queryId.genPlanNodeId();
-    this.virtualFragmentInstanceId = queryId.genPlanFragmentId().genFragmentInstanceId();
+    this.resultNodeContext = new ResultNodeContext(queryId);
   }
 
   public QueryId getQueryId() {
@@ -62,11 +60,7 @@ public class MPPQueryContext {
     return hostEndpoint;
   }
 
-  public FragmentInstanceId getVirtualFragmentInstanceId() {
-    return virtualFragmentInstanceId;
-  }
-
-  public PlanNodeId getVirtualResultNodeId() {
-    return virtualResultNodeId;
+  public ResultNodeContext getResultNodeContext() {
+    return resultNodeContext;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
index e40a4bac44..4c56646285 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
@@ -19,15 +19,49 @@
 
 package org.apache.iotdb.db.mpp.common;
 
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
 public class ResultNodeContext {
-  private QueryId queryId;
-  private FragmentInstanceId virtualResultInstanceId;
-  private PlanNodeId virtualResultNodeId;
+
+  private final FragmentInstanceId virtualFragmentInstanceId;
+  private final PlanNodeId virtualResultNodeId;
+
+  private Endpoint upStreamEndpoint;
+  private FragmentInstanceId upStreamFragmentInstanceId;
+  private PlanNodeId upStreamPlanNodeId;
 
   public ResultNodeContext(QueryId queryId) {
-    this.queryId = queryId;
-    //    this.virtualResultInstanceId = new FragmentInstanceId(new PlanFragmentId(queryId, ))
+    this.virtualResultNodeId = queryId.genPlanNodeId();
+    this.virtualFragmentInstanceId = queryId.genPlanFragmentId().genFragmentInstanceId();
+  }
+
+  public void setUpStream(
+      Endpoint upStreamEndpoint,
+      FragmentInstanceId upStreamFragmentInstanceId,
+      PlanNodeId upStreamPlanNodeId) {
+    this.upStreamEndpoint = upStreamEndpoint;
+    this.upStreamFragmentInstanceId = upStreamFragmentInstanceId;
+    this.upStreamPlanNodeId = upStreamPlanNodeId;
+  }
+
+  public FragmentInstanceId getVirtualFragmentInstanceId() {
+    return virtualFragmentInstanceId;
+  }
+
+  public PlanNodeId getVirtualResultNodeId() {
+    return virtualResultNodeId;
+  }
+
+  public Endpoint getUpStreamEndpoint() {
+    return upStreamEndpoint;
+  }
+
+  public FragmentInstanceId getUpStreamFragmentInstanceId() {
+    return upStreamFragmentInstanceId;
+  }
+
+  public PlanNodeId getUpStreamPlanNodeId() {
+    return upStreamPlanNodeId;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index ff5baca47f..0c2bb8138a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -62,7 +63,8 @@ public class Coordinator {
       Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
 
     QueryExecution execution =
-        createQueryExecution(statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
+        createQueryExecution(
+            statement, new MPPQueryContext(sql, queryId, session, queryType, getHostEndpoint()));
     queryExecutionMap.put(queryId, execution);
 
     execution.start();
@@ -83,8 +85,9 @@ public class Coordinator {
 
   // Get the hostname of current coordinator
   private Endpoint getHostEndpoint() {
-    // TODO: (xingtanzjr) how to get the hostname ?
-    return new Endpoint();
+    return new Endpoint(
+        IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+        IoTDBDescriptor.getInstance().getConfig().getMppPort());
   }
 
   public static Coordinator getInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0e2bc6c240..c268092bf8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
@@ -78,6 +79,9 @@ public class QueryExecution {
     this.analysis = analyze(statement, context);
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
 
+    //TODO: (xingtanzjr) Initialize the result handle after the DataBlockManager is merged.
+//    resultHandle = xxxx
+
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
     stateMachine.addStateChangeListener(
@@ -148,6 +152,21 @@ public class QueryExecution {
    * implemented with DataStreamManager)
    */
   public ByteBuffer getBatchResult() {
+    SettableFuture<Boolean> hasData = SettableFuture.create();
+    ListenableFuture<Void> blocked = resultHandle.isBlocked();
+    if (blocked.isDone()) {
+      hasData.set(true);
+    } else {
+      blocked.addListener(() -> {
+        hasData.set(true);
+      }, executor);
+    }
+    try {
+      hasData.get();
+    } catch (InterruptedException | ExecutionException e) {
+      e.printStackTrace();
+    }
+//    return resultHandle.receive();
     return null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 7be76b4673..3b863a469f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -96,9 +96,16 @@ public class DistributionPlanner {
     if (rootInstance == null) {
       return;
     }
+
     FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
-    sinkNode.setDownStream(context.getHostEndpoint(), context.getVirtualFragmentInstanceId(), context.getVirtualResultNodeId());
+    sinkNode.setDownStream(
+        context.getHostEndpoint(),
+        context.getResultNodeContext().getVirtualFragmentInstanceId(),
+        context.getResultNodeContext().getVirtualResultNodeId());
     sinkNode.setChild(rootInstance.getFragment().getRoot());
+    context
+        .getResultNodeContext()
+        .setUpStream(rootInstance.getHostEndpoint(), rootInstance.getId(), sinkNode.getId());
     rootInstance.getFragment().setRoot(sinkNode);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 3668adb4dc..fb82050a84 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -121,7 +121,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -148,7 +149,8 @@ public class DistributionPlannerTest {
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, QueryType.READ, new Endpoint());
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index 48b31a2994..d4e599723b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -49,7 +49,8 @@ public class QueryPlannerTest {
     QueryExecution queryExecution =
         new QueryExecution(
             stmt,
-            new MPPQueryContext(querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ, new Endpoint()),
+            new MPPQueryContext(
+                querySql, new QueryId("query1"), new SessionInfo(), QueryType.READ, new Endpoint()),
             IoTDBThreadPoolFactory.newSingleThreadExecutor("Test-Query"),
             IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Test-Query-Scheduled"));
     queryExecution.doLogicalPlan();