You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/24 09:34:05 UTC

[iotdb] 01/01: add InternalService to mpp.thrift

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/add-internal-service
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e0b1e43362b7fdc584c5a846ef7097b372ddf698
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 24 17:15:47 2022 +0800

    add InternalService to mpp.thrift
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       |  6 ++-
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  2 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  1 +
 thrift/src/main/thrift/mpp.thrift                  | 62 ++++++++++++++++++++++
 4 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 2a348d3..36ec40b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.common;
 
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+
 /**
  * This class is used to record the context of a query including QueryId, query statement, session
  * info and so on
@@ -26,13 +28,15 @@ public class MPPQueryContext {
   private String sql;
   private QueryId queryId;
   private SessionInfo session;
+  private QueryType queryType;
 
   public MPPQueryContext() {}
 
-  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session) {
+  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, QueryType type) {
     this.sql = sql;
     this.queryId = queryId;
     this.session = session;
+    this.queryType = type;
   }
 
   public QueryId getQueryId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 65d92b4..b2025bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -51,7 +51,7 @@ public class Coordinator {
       Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
 
     QueryExecution execution =
-        createQueryExecution(statement, new MPPQueryContext(sql, queryId, session));
+        createQueryExecution(statement, new MPPQueryContext(sql, queryId, session, queryType));
     queryExecutionMap.put(queryId, execution);
 
     execution.start();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 0b15ef1..1bb7570 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index 7c1f8bc..c385422 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -46,6 +46,68 @@ struct EndOfDataBlockEvent {
   2: required string operatorId
 }
 
+struct TFragmentInstance {
+  1: required binary body
+}
+
+struct TSendFragmentInstanceReq {
+  1: required TFragmentInstance fragmentInstance
+}
+
+struct TSendFragmentInstanceResp {
+  1: required bool accepted
+  2: optional string message
+}
+
+struct TFetchFragmentInstanceStateReq {
+  1: required FragmentInstanceId fragmentInstanceId
+}
+
+// TODO: Consider to use a simple string
+enum TFragmentInstanceState {
+  PLANNED,
+  RUNNING,
+  FLUSHING,
+  FINISHED,
+  CANCELED,
+  ABORTED,
+  FAILED;
+}
+
+// TODO: need to supply more fields according to implementation
+struct TFragmentInstanceStateResp {
+  1: required TFragmentInstanceState state
+}
+
+struct TCancelQueryReq {
+  1: required string queryId
+}
+
+struct TCancelPlanFragmentReq {
+  1: required string planFragmentId
+}
+
+struct TCancelFragmentInstanceReq {
+  1: required FragmentInstanceId fragmentInstanceId
+}
+
+struct TCancelResp { // response type returned by the cancelQuery / cancelPlanFragment / cancelFragmentInstance RPCs
+  1: required bool cancelled
+  2: optional string message // optional text; spelled "message" to match TSendFragmentInstanceResp
+}
+
+service InternalService {
+    TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req);
+
+    TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req);
+
+    TCancelResp cancelQuery(TCancelQueryReq req);
+
+    TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req);
+
+    TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req);
+}
+
 service DataBlockService {
   GetDataBlockResponse getDataBlock(GetDataBlockReqest req);