You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/11 12:12:02 UTC
[iotdb] 03/03: complete internal RPC
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/mpp_rpc
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 11e426f276c91ece5ba3e0d57e35ff37dfe19dc0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 11 20:11:45 2022 +0800
complete internal RPC
---
.../scheduler/SimpleFragInstanceDispatcher.java | 18 ++++++++-
.../db/mpp/sql/planner/DistributionPlanner.java | 3 +-
.../db/mpp/sql/planner/plan/FragmentInstance.java | 44 +++++++++++++---------
.../plan/SimpleFragmentParallelPlanner.java | 13 ++++++-
.../iotdb/db/service/InternalServiceImpl.java | 15 ++++++--
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 8 +++-
6 files changed, 72 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 60bdb13199..0dde122c74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.thrift.TException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -45,8 +49,18 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
InternalService.Client client =
InternalServiceClientFactory.getInternalServiceClient(
instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
- // TODO: (xingtanzjr) add request construction
- client.sendFragmentInstance(null);
+ // TODO: (xingtanzjr) consider how to handle the buffer here
+ ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+ instance.serializeRequest(buffer);
+ buffer.flip();
+ TConsensusGroupId groupId =
+ new TConsensusGroupId(
+ instance.getDataRegionId().getId().getId(),
+ instance.getDataRegionId().getId().getType().toString());
+ TSendFragmentInstanceReq req =
+ new TSendFragmentInstanceReq(
+ new TFragmentInstance(buffer), groupId, instance.getType().toString());
+ client.sendFragmentInstance(req);
}
} catch (TException e) {
// TODO: (xingtanzjr) add more details
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index baba0c6ae0..96341a2ca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -79,7 +79,8 @@ public class DistributionPlanner {
// Convert each fragment into a detailed instance.
// For a parallelizable fragment, clone it into several instances with different params.
public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
- IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan, analysis);
+ IFragmentParallelPlaner parallelPlaner =
+ new SimpleFragmentParallelPlanner(subPlan, analysis, context);
return parallelPlaner.parallelPlan();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index d46e502326..6a5e40db50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -23,11 +23,13 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -35,7 +37,7 @@ import java.util.Objects;
public class FragmentInstance implements IConsensusRequest {
private final FragmentInstanceId id;
-
+ private final QueryType type;
// The reference of PlanFragment which this instance is generated from
private final PlanFragment fragment;
// The DataRegion where the FragmentInstance should run
@@ -47,9 +49,11 @@ public class FragmentInstance implements IConsensusRequest {
// We can add more params for a specific FragmentInstance
// so that different FragmentInstances can own different data ranges.
- public FragmentInstance(PlanFragment fragment, int index) {
+ public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
this.fragment = fragment;
+ this.timeFilter = timeFilter;
this.id = generateId(fragment.getId(), index);
+ this.type = type;
}
public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
@@ -101,6 +105,10 @@ public class FragmentInstance implements IConsensusRequest {
return timeFilter;
}
+ public QueryType getType() {
+ return type;
+ }
+
public String toString() {
StringBuilder ret = new StringBuilder();
ret.append(
@@ -116,7 +124,10 @@ public class FragmentInstance implements IConsensusRequest {
FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
FragmentInstance fragmentInstance =
new FragmentInstance(
- PlanFragment.deserialize(buffer), Integer.parseInt(id.getInstanceId()));
+ PlanFragment.deserialize(buffer),
+ Integer.parseInt(id.getInstanceId()),
+ FilterFactory.deserialize(buffer),
+ QueryType.values()[ReadWriteIOUtils.readInt(buffer)]);
RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
try {
regionReplicaSet.deserializeImpl(buffer);
@@ -127,7 +138,6 @@ public class FragmentInstance implements IConsensusRequest {
endpoint.deserializeImpl(buffer);
fragmentInstance.dataRegion = regionReplicaSet;
fragmentInstance.hostEndpoint = endpoint;
- fragmentInstance.timeFilter = FilterFactory.deserialize(buffer);
return fragmentInstance;
}
@@ -136,29 +146,27 @@ public class FragmentInstance implements IConsensusRequest {
public void serializeRequest(ByteBuffer buffer) {
id.serialize(buffer);
fragment.serialize(buffer);
+ timeFilter.serialize(buffer);
+ ReadWriteIOUtils.write(type.ordinal(), buffer);
dataRegion.serializeImpl(buffer);
hostEndpoint.serializeImpl(buffer);
- timeFilter.serialize(buffer);
}
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- FragmentInstance that = (FragmentInstance) o;
- return Objects.equals(id, that.id)
- && Objects.equals(fragment, that.fragment)
- && Objects.equals(dataRegion, that.dataRegion)
- && Objects.equals(hostEndpoint, that.hostEndpoint)
- && Objects.equals(timeFilter, that.timeFilter);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FragmentInstance instance = (FragmentInstance) o;
+ return Objects.equals(id, instance.id)
+ && type == instance.type
+ && Objects.equals(fragment, instance.fragment)
+ && Objects.equals(dataRegion, instance.dataRegion)
+ && Objects.equals(hostEndpoint, instance.hostEndpoint)
+ && Objects.equals(timeFilter, instance.timeFilter);
}
@Override
public int hashCode() {
- return Objects.hash(id, fragment, dataRegion, hostEndpoint, timeFilter);
+ return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index f91e4a7f32..33ca767f0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -26,6 +27,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import java.util.ArrayList;
import java.util.HashMap;
@@ -40,6 +42,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private SubPlan subPlan;
private Analysis analysis;
+ private MPPQueryContext queryContext;
// Record all the FragmentInstances belonged to same PlanFragment
Map<PlanFragmentId, FragmentInstance> instanceMap;
@@ -47,9 +50,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
Map<PlanNodeId, PlanFragmentId> planNodeMap;
List<FragmentInstance> fragmentInstanceList;
- public SimpleFragmentParallelPlanner(SubPlan subPlan, Analysis analysis) {
+ public SimpleFragmentParallelPlanner(
+ SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
this.subPlan = subPlan;
this.analysis = analysis;
+ this.queryContext = context;
this.instanceMap = new HashMap<>();
this.planNodeMap = new HashMap<>();
this.fragmentInstanceList = new ArrayList<>();
@@ -76,7 +81,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
int instanceIdx = 0;
PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
FragmentInstance fragmentInstance =
- new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx);
+ new FragmentInstance(
+ new PlanFragment(fragment.getId(), rootCopy),
+ instanceIdx,
+ ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter(),
+ queryContext.getQueryType());
// Get the target DataRegion for the original PlanFragment; its instance will then be distributed to one
// of them.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 60cc4d8f18..a0e1d01aab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.service;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.GroupType;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.consensus.ConsensusImpl;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
@@ -38,6 +40,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -57,11 +60,15 @@ public class InternalServiceImpl implements InternalService.Iface {
req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
switch (type) {
case READ:
- ConsensusImpl.getInstance().read(groupId, request);
- break;
+ ConsensusReadResponse readResp = ConsensusImpl.getInstance().read(groupId, request);
+ FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
+ return new TSendFragmentInstanceResp(info.getState().isFailed());
case WRITE:
- ConsensusImpl.getInstance().write(groupId, request);
- break;
+ ConsensusWriteResponse writeResp = ConsensusImpl.getInstance().write(groupId, request);
+ // TODO: (xingtanzjr) need to distinguish more conditions for response status.
+ boolean accepted =
+ writeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ return new TSendFragmentInstanceResp(accepted);
}
return null;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index 93a2d49eb5..33dc8a9613 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -101,12 +102,15 @@ public class FragmentInstanceSerdeTest {
offsetNode.addChild(limitNode);
FragmentInstance fragmentInstance =
- new FragmentInstance(new PlanFragment(new PlanFragmentId("test", -1), offsetNode), -1);
+ new FragmentInstance(
+ new PlanFragment(new PlanFragmentId("test", -1), offsetNode),
+ -1,
+ new GroupByFilter(1, 2, 3, 4),
+ QueryType.READ);
RegionReplicaSet regionReplicaSet =
new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
fragmentInstance.setDataRegionId(regionReplicaSet);
fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
- fragmentInstance.setTimeFilter(new GroupByFilter(1, 2, 3, 4));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
fragmentInstance.serializeRequest(byteBuffer);