You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/11 12:12:00 UTC
[iotdb] 01/03: tmp saved
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/mpp_rpc
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 74eca8903ced87a037b650e24513953abd5c2d46
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 11 18:08:37 2022 +0800
tmp saved
---
.../iotdb/commons/consensus/ConsensusGroupId.java | 27 ++++++++++++++++------
.../iotdb/commons/consensus/DataRegionId.java | 5 ++++
.../iotdb/commons/consensus/PartitionRegionId.java | 5 ++++
.../iotdb/commons/consensus/SchemaRegionId.java | 5 ++++
.../iotdb/db/mpp/common/FragmentInstanceId.java | 7 ++++++
.../db/mpp/sql/planner/DistributionPlanner.java | 2 +-
.../plan/SimpleFragmentParallelPlanner.java | 5 +++-
.../iotdb/db/service/InternalServiceImpl.java | 26 ++++++++++++++++++++-
thrift/src/main/thrift/mpp.thrift | 7 ++++++
9 files changed, 79 insertions(+), 10 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index abc7747e09..13e38df6c0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -33,6 +33,8 @@ public interface ConsensusGroupId {
// return specific id
int getId();
+ void setId(int id);
+
// return specific type
GroupType getType();
@@ -43,22 +45,33 @@ public interface ConsensusGroupId {
throw new IOException("unrecognized id type " + index);
}
GroupType type = GroupType.values()[index];
- ConsensusGroupId id;
+ ConsensusGroupId groupId = createEmpty(type);
+ groupId.deserializeImpl(buffer);
+ return groupId;
+ }
+
+ public static ConsensusGroupId createEmpty(GroupType type) {
+ ConsensusGroupId groupId;
switch (type) {
case DataRegion:
- id = new DataRegionId();
+ groupId = new DataRegionId();
break;
case SchemaRegion:
- id = new SchemaRegionId();
+ groupId = new SchemaRegionId();
break;
case PartitionRegion:
- id = new PartitionRegionId();
+ groupId = new PartitionRegionId();
break;
default:
- throw new IOException("unrecognized id type " + type);
+ throw new IllegalArgumentException("unrecognized id type " + type);
}
- id.deserializeImpl(buffer);
- return id;
+ return groupId;
+ }
+
+ public static ConsensusGroupId create(int id, GroupType type) {
+ ConsensusGroupId groupId = createEmpty(type);
+ groupId.setId(id);
+ return groupId;
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index aa570cea7a..3085541766 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -49,6 +49,11 @@ public class DataRegionId implements ConsensusGroupId {
return id;
}
+ @Override
+ public void setId(int id) {
+ this.id = id;
+ }
+
@Override
public GroupType getType() {
return GroupType.DataRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
index 4eb38a15db..86f5fe963c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
@@ -49,6 +49,11 @@ public class PartitionRegionId implements ConsensusGroupId {
return id;
}
+ @Override
+ public void setId(int id) {
+ this.id = id;
+ }
+
@Override
public GroupType getType() {
return GroupType.PartitionRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
index 0b3d9ad551..a42ddf3ad0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
@@ -49,6 +49,11 @@ public class SchemaRegionId implements ConsensusGroupId {
return id;
}
+ @Override
+ public void setId(int id) {
+ this.id = id;
+ }
+
@Override
public GroupType getType() {
return GroupType.SchemaRegion;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 369466c158..6564274996 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -59,4 +59,11 @@ public class FragmentInstanceId {
public TFragmentInstanceId toThrift() {
return new TFragmentInstanceId(queryId.getId(), String.valueOf(fragmentId.getId()), instanceId);
}
+
+ public static FragmentInstanceId fromThrift(TFragmentInstanceId tFragmentInstanceId) {
+ return new FragmentInstanceId(
+ new PlanFragmentId(
+ tFragmentInstanceId.queryId, Integer.parseInt(tFragmentInstanceId.fragmentId)),
+ tFragmentInstanceId.instanceId);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 182f85fbe5..baba0c6ae0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -79,7 +79,7 @@ public class DistributionPlanner {
// Convert fragment to detailed instance
// And for parallel-able fragment, clone it into several instances with different params.
public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
- IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
+ IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan, analysis);
return parallelPlaner.parallelPlan();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 4bdd01e9de..f91e4a7f32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -38,6 +39,7 @@ import java.util.Map;
public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private SubPlan subPlan;
+ private Analysis analysis;
// Record all the FragmentInstances belonged to same PlanFragment
Map<PlanFragmentId, FragmentInstance> instanceMap;
@@ -45,8 +47,9 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
Map<PlanNodeId, PlanFragmentId> planNodeMap;
List<FragmentInstance> fragmentInstanceList;
- public SimpleFragmentParallelPlanner(SubPlan subPlan) {
+ public SimpleFragmentParallelPlanner(SubPlan subPlan, Analysis analysis) {
this.subPlan = subPlan;
+ this.analysis = analysis;
this.instanceMap = new HashMap<>();
this.planNodeMap = new HashMap<>();
this.fragmentInstanceList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index acac7210cf..60cc4d8f18 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -19,6 +19,14 @@
package org.apache.iotdb.db.service;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.GroupType;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
@@ -42,13 +50,29 @@ public class InternalServiceImpl implements InternalService.Iface {
@Override
public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req)
throws TException {
+ ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(req.fragmentInstance.body);
+ QueryType type = QueryType.valueOf(req.queryType);
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.create(
+ req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
+ switch (type) {
+ case READ:
+ ConsensusImpl.getInstance().read(groupId, request);
+ break;
+ case WRITE:
+ ConsensusImpl.getInstance().write(groupId, request);
+ break;
+ }
return null;
}
@Override
public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req)
throws TException {
- return null;
+ FragmentInstanceInfo info =
+ FragmentInstanceManager.getInstance()
+ .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
+ return new TFragmentInstanceStateResp(info.getState().toString());
}
@Override
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index b1cae785d6..d0ed78f9d5 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -61,8 +61,15 @@ struct TFragmentInstance {
1: required binary body
}
+struct TConsensusGroupId {
+ 1: required i32 id
+ 2: required string type
+}
+
struct TSendFragmentInstanceReq {
1: required TFragmentInstance fragmentInstance
+ 2: required TConsensusGroupId consensusGroupId
+ 3: required string queryType
}
struct TSendFragmentInstanceResp {