You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/11 12:12:00 UTC

[iotdb] 01/03: tmp saved

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_rpc
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 74eca8903ced87a037b650e24513953abd5c2d46
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 11 18:08:37 2022 +0800

    tmp saved
---
 .../iotdb/commons/consensus/ConsensusGroupId.java  | 27 ++++++++++++++++------
 .../iotdb/commons/consensus/DataRegionId.java      |  5 ++++
 .../iotdb/commons/consensus/PartitionRegionId.java |  5 ++++
 .../iotdb/commons/consensus/SchemaRegionId.java    |  5 ++++
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  7 ++++++
 .../db/mpp/sql/planner/DistributionPlanner.java    |  2 +-
 .../plan/SimpleFragmentParallelPlanner.java        |  5 +++-
 .../iotdb/db/service/InternalServiceImpl.java      | 26 ++++++++++++++++++++-
 thrift/src/main/thrift/mpp.thrift                  |  7 ++++++
 9 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index abc7747e09..13e38df6c0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -33,6 +33,8 @@ public interface ConsensusGroupId {
   // return specific id
   int getId();
 
+  void setId(int id);
+
   // return specific type
   GroupType getType();
 
@@ -43,22 +45,33 @@ public interface ConsensusGroupId {
         throw new IOException("unrecognized id type " + index);
       }
       GroupType type = GroupType.values()[index];
-      ConsensusGroupId id;
+      ConsensusGroupId groupId = createEmpty(type);
+      groupId.deserializeImpl(buffer);
+      return groupId;
+    }
+
+    public static ConsensusGroupId createEmpty(GroupType type) {
+      ConsensusGroupId groupId;
       switch (type) {
         case DataRegion:
-          id = new DataRegionId();
+          groupId = new DataRegionId();
           break;
         case SchemaRegion:
-          id = new SchemaRegionId();
+          groupId = new SchemaRegionId();
           break;
         case PartitionRegion:
-          id = new PartitionRegionId();
+          groupId = new PartitionRegionId();
           break;
         default:
-          throw new IOException("unrecognized id type " + type);
+          throw new IllegalArgumentException("unrecognized id type " + type);
       }
-      id.deserializeImpl(buffer);
-      return id;
+      return groupId;
+    }
+
+    public static ConsensusGroupId create(int id, GroupType type) {
+      ConsensusGroupId groupId = createEmpty(type);
+      groupId.setId(id);
+      return groupId;
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index aa570cea7a..3085541766 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -49,6 +49,11 @@ public class DataRegionId implements ConsensusGroupId {
     return id;
   }
 
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
   @Override
   public GroupType getType() {
     return GroupType.DataRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
index 4eb38a15db..86f5fe963c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
@@ -49,6 +49,11 @@ public class PartitionRegionId implements ConsensusGroupId {
     return id;
   }
 
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
   @Override
   public GroupType getType() {
     return GroupType.PartitionRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
index 0b3d9ad551..a42ddf3ad0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
@@ -49,6 +49,11 @@ public class SchemaRegionId implements ConsensusGroupId {
     return id;
   }
 
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
   @Override
   public GroupType getType() {
     return GroupType.SchemaRegion;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 369466c158..6564274996 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -59,4 +59,11 @@ public class FragmentInstanceId {
   public TFragmentInstanceId toThrift() {
     return new TFragmentInstanceId(queryId.getId(), String.valueOf(fragmentId.getId()), instanceId);
   }
+
+  public static FragmentInstanceId fromThrift(TFragmentInstanceId tFragmentInstanceId) {
+    return new FragmentInstanceId(
+        new PlanFragmentId(
+            tFragmentInstanceId.queryId, Integer.parseInt(tFragmentInstanceId.fragmentId)),
+        tFragmentInstanceId.instanceId);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 182f85fbe5..baba0c6ae0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -79,7 +79,7 @@ public class DistributionPlanner {
   // Convert fragment to detailed instance
   // And for parallel-able fragment, clone it into several instances with different params.
   public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
-    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
+    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan, analysis);
     return parallelPlaner.parallelPlan();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 4bdd01e9de..f91e4a7f32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -38,6 +39,7 @@ import java.util.Map;
 public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
 
   private SubPlan subPlan;
+  private Analysis analysis;
 
   // Record all the FragmentInstances belonged to same PlanFragment
   Map<PlanFragmentId, FragmentInstance> instanceMap;
@@ -45,8 +47,9 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   Map<PlanNodeId, PlanFragmentId> planNodeMap;
   List<FragmentInstance> fragmentInstanceList;
 
-  public SimpleFragmentParallelPlanner(SubPlan subPlan) {
+  public SimpleFragmentParallelPlanner(SubPlan subPlan, Analysis analysis) {
     this.subPlan = subPlan;
+    this.analysis = analysis;
     this.instanceMap = new HashMap<>();
     this.planNodeMap = new HashMap<>();
     this.fragmentInstanceList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index acac7210cf..60cc4d8f18 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -19,6 +19,14 @@
 
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.GroupType;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
@@ -42,13 +50,29 @@ public class InternalServiceImpl implements InternalService.Iface {
   @Override
   public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req)
       throws TException {
+    ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(req.fragmentInstance.body);
+    QueryType type = QueryType.valueOf(req.queryType);
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.create(
+            req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
+    switch (type) {
+      case READ:
+        ConsensusImpl.getInstance().read(groupId, request);
+        break;
+      case WRITE:
+        ConsensusImpl.getInstance().write(groupId, request);
+        break;
+    }
     return null;
   }
 
   @Override
   public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req)
       throws TException {
-    return null;
+    FragmentInstanceInfo info =
+        FragmentInstanceManager.getInstance()
+            .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
+    return new TFragmentInstanceStateResp(info.getState().toString());
   }
 
   @Override
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index b1cae785d6..d0ed78f9d5 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -61,8 +61,15 @@ struct TFragmentInstance {
   1: required binary body
 }
 
+struct TConsensusGroupId {
+  1: required i32 id
+  2: required string type
+}
+
 struct TSendFragmentInstanceReq {
   1: required TFragmentInstance fragmentInstance
+  2: required TConsensusGroupId consensusGroupId
+  3: required string queryType
 }
 
 struct TSendFragmentInstanceResp {