You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/12 04:34:52 UTC
[iotdb] branch master updated: [IoTDB-2666] Implement the RPC of Fragment dispatch and status track (#5478)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5d5f67f3dd [IoTDB-2666] Implement the RPC of Fragment dispatch and status track (#5478)
5d5f67f3dd is described below
commit 5d5f67f3dd860e1d641863289023fd140baa6418
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Tue Apr 12 12:34:48 2022 +0800
[IoTDB-2666] Implement the RPC of Fragment dispatch and status track (#5478)
---
.../iotdb/commons/consensus/ConsensusGroupId.java | 27 ++++++++---
.../iotdb/commons/consensus/DataRegionId.java | 5 ++
.../iotdb/commons/consensus/PartitionRegionId.java | 5 ++
.../iotdb/commons/consensus/SchemaRegionId.java | 5 ++
.../apache/iotdb/commons/ConsensusGroupIdTest.java | 53 +++++++++++++++++++++
.../iotdb/db/mpp/common/FragmentInstanceId.java | 8 +++-
.../scheduler/AbstractFragInsStateTracker.java | 2 +-
.../scheduler/SimpleFragInstanceDispatcher.java | 18 ++++++-
.../db/mpp/schedule/FragmentInstanceScheduler.java | 2 +-
.../db/mpp/sql/planner/DistributionPlanner.java | 3 +-
.../db/mpp/sql/planner/plan/FragmentInstance.java | 48 ++++++++++++-------
.../plan/SimpleFragmentParallelPlanner.java | 21 ++++++++-
.../iotdb/db/service/InternalServiceImpl.java | 33 ++++++++++++-
.../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 15 +++---
.../iotdb/db/mpp/buffer/SourceHandleTest.java | 20 ++++----
.../db/mpp/sql/plan/FragmentInstanceIdTest.java | 38 +++++++++++++++
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 55 +++++++++++++++++-----
thrift/src/main/thrift/mpp.thrift | 9 +++-
18 files changed, 298 insertions(+), 69 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index abc7747e09..13e38df6c0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -33,6 +33,8 @@ public interface ConsensusGroupId {
// return specific id
int getId();
+ void setId(int id);
+
// return specific type
GroupType getType();
@@ -43,22 +45,33 @@ public interface ConsensusGroupId {
throw new IOException("unrecognized id type " + index);
}
GroupType type = GroupType.values()[index];
- ConsensusGroupId id;
+ ConsensusGroupId groupId = createEmpty(type);
+ groupId.deserializeImpl(buffer);
+ return groupId;
+ }
+
+ public static ConsensusGroupId createEmpty(GroupType type) {
+ ConsensusGroupId groupId;
switch (type) {
case DataRegion:
- id = new DataRegionId();
+ groupId = new DataRegionId();
break;
case SchemaRegion:
- id = new SchemaRegionId();
+ groupId = new SchemaRegionId();
break;
case PartitionRegion:
- id = new PartitionRegionId();
+ groupId = new PartitionRegionId();
break;
default:
- throw new IOException("unrecognized id type " + type);
+ throw new IllegalArgumentException("unrecognized id type " + type);
}
- id.deserializeImpl(buffer);
- return id;
+ return groupId;
+ }
+
+ public static ConsensusGroupId create(int id, GroupType type) {
+ ConsensusGroupId groupId = createEmpty(type);
+ groupId.setId(id);
+ return groupId;
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index aa570cea7a..3085541766 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -49,6 +49,11 @@ public class DataRegionId implements ConsensusGroupId {
return id;
}
+ @Override
+ public void setId(int id) {
+ this.id = id;
+ }
+
@Override
public GroupType getType() {
return GroupType.DataRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
index 4eb38a15db..86f5fe963c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
@@ -49,6 +49,11 @@ public class PartitionRegionId implements ConsensusGroupId {
return id;
}
+ @Override
+ public void setId(int id) {
+ this.id = id;
+ }
+
@Override
public GroupType getType() {
return GroupType.PartitionRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
index 0b3d9ad551..a42ddf3ad0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
@@ -49,6 +49,11 @@ public class SchemaRegionId implements ConsensusGroupId {
return id;
}
+ @Override
+ public void setId(int id) {
+ this.id = id;
+ }
+
@Override
public GroupType getType() {
return GroupType.SchemaRegion;
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
new file mode 100644
index 0000000000..e890449d2d
--- /dev/null
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.GroupType;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ConsensusGroupIdTest {
+ @Test
+ public void TestCreate() throws IOException {
+ ConsensusGroupId dataRegionId = ConsensusGroupId.Factory.create(1, GroupType.DataRegion);
+ Assert.assertTrue(dataRegionId instanceof DataRegionId);
+ Assert.assertEquals(1, dataRegionId.getId());
+ Assert.assertEquals(GroupType.DataRegion, dataRegionId.getType());
+
+ ConsensusGroupId schemaRegionId = ConsensusGroupId.Factory.create(2, GroupType.SchemaRegion);
+ Assert.assertTrue(schemaRegionId instanceof SchemaRegionId);
+ Assert.assertEquals(2, schemaRegionId.getId());
+ Assert.assertEquals(GroupType.SchemaRegion, schemaRegionId.getType());
+
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ schemaRegionId.serializeImpl(buffer);
+ buffer.flip();
+
+ ConsensusGroupId schemaRegionIdClone = ConsensusGroupId.Factory.create(buffer);
+ Assert.assertEquals(schemaRegionId, schemaRegionIdClone);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index b22f480589..8c0336adbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -71,7 +71,13 @@ public class FragmentInstanceId {
}
public TFragmentInstanceId toThrift() {
- return new TFragmentInstanceId(queryId.getId(), String.valueOf(fragmentId.getId()), instanceId);
+ return new TFragmentInstanceId(queryId.getId(), fragmentId.getId(), instanceId);
+ }
+
+ public static FragmentInstanceId fromThrift(TFragmentInstanceId tFragmentInstanceId) {
+ return new FragmentInstanceId(
+ new PlanFragmentId(tFragmentInstanceId.queryId, tFragmentInstanceId.fragmentId),
+ tFragmentInstanceId.instanceId);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index 6f64a27a1a..b87579561c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -67,7 +67,7 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
private TFragmentInstanceId getTId(FragmentInstance instance) {
return new TFragmentInstanceId(
instance.getId().getQueryId().getId(),
- String.valueOf(instance.getId().getFragmentId().getId()),
+ instance.getId().getFragmentId().getId(),
instance.getId().getInstanceId());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 60bdb13199..0dde122c74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.thrift.TException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -45,8 +49,18 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
InternalService.Client client =
InternalServiceClientFactory.getInternalServiceClient(
instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
- // TODO: (xingtanzjr) add request construction
- client.sendFragmentInstance(null);
+ // TODO: (xingtanzjr) consider how to handle the buffer here
+ ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+ instance.serializeRequest(buffer);
+ buffer.flip();
+ TConsensusGroupId groupId =
+ new TConsensusGroupId(
+ instance.getDataRegionId().getId().getId(),
+ instance.getDataRegionId().getId().getType().toString());
+ TSendFragmentInstanceReq req =
+ new TSendFragmentInstanceReq(
+ new TFragmentInstance(buffer), groupId, instance.getType().toString());
+ client.sendFragmentInstance(req);
}
} catch (TException e) {
// TODO: (xingtanzjr) add more details
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index 2ddd981249..efaa56b89e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -196,7 +196,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
blockManager.forceDeregisterFragmentInstance(
new TFragmentInstanceId(
task.getId().getQueryId().getId(),
- String.valueOf(task.getId().getFragmentId().getId()),
+ task.getId().getFragmentId().getId(),
task.getId().getInstanceId()));
}
readyQueue.remove(task.getId());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 182f85fbe5..96341a2ca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -79,7 +79,8 @@ public class DistributionPlanner {
// Convert fragment to detailed instance
// And for parallel-able fragment, clone it into several instances with different params.
public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
- IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
+ IFragmentParallelPlaner parallelPlaner =
+ new SimpleFragmentParallelPlanner(subPlan, analysis, context);
return parallelPlaner.parallelPlan();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index d46e502326..03fd9f3e0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -23,11 +23,13 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -35,7 +37,7 @@ import java.util.Objects;
public class FragmentInstance implements IConsensusRequest {
private final FragmentInstanceId id;
-
+ private final QueryType type;
// The reference of PlanFragment which this instance is generated from
private final PlanFragment fragment;
// The DataRegion where the FragmentInstance should run
@@ -47,9 +49,11 @@ public class FragmentInstance implements IConsensusRequest {
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
- public FragmentInstance(PlanFragment fragment, int index) {
+ public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
this.fragment = fragment;
+ this.timeFilter = timeFilter;
this.id = generateId(fragment.getId(), index);
+ this.type = type;
}
public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
@@ -101,6 +105,10 @@ public class FragmentInstance implements IConsensusRequest {
return timeFilter;
}
+ public QueryType getType() {
+ return type;
+ }
+
public String toString() {
StringBuilder ret = new StringBuilder();
ret.append(
@@ -114,9 +122,13 @@ public class FragmentInstance implements IConsensusRequest {
public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
+ PlanFragment planFragment = PlanFragment.deserialize(buffer);
+ boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
+ Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
+ QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
FragmentInstance fragmentInstance =
new FragmentInstance(
- PlanFragment.deserialize(buffer), Integer.parseInt(id.getInstanceId()));
+ planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
try {
regionReplicaSet.deserializeImpl(buffer);
@@ -127,7 +139,6 @@ public class FragmentInstance implements IConsensusRequest {
endpoint.deserializeImpl(buffer);
fragmentInstance.dataRegion = regionReplicaSet;
fragmentInstance.hostEndpoint = endpoint;
- fragmentInstance.timeFilter = FilterFactory.deserialize(buffer);
return fragmentInstance;
}
@@ -136,29 +147,30 @@ public class FragmentInstance implements IConsensusRequest {
public void serializeRequest(ByteBuffer buffer) {
id.serialize(buffer);
fragment.serialize(buffer);
+ ReadWriteIOUtils.write(timeFilter != null, buffer);
+ if (timeFilter != null) {
+ timeFilter.serialize(buffer);
+ }
+ ReadWriteIOUtils.write(type.ordinal(), buffer);
dataRegion.serializeImpl(buffer);
hostEndpoint.serializeImpl(buffer);
- timeFilter.serialize(buffer);
}
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- FragmentInstance that = (FragmentInstance) o;
- return Objects.equals(id, that.id)
- && Objects.equals(fragment, that.fragment)
- && Objects.equals(dataRegion, that.dataRegion)
- && Objects.equals(hostEndpoint, that.hostEndpoint)
- && Objects.equals(timeFilter, that.timeFilter);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FragmentInstance instance = (FragmentInstance) o;
+ return Objects.equals(id, instance.id)
+ && type == instance.type
+ && Objects.equals(fragment, instance.fragment)
+ && Objects.equals(dataRegion, instance.dataRegion)
+ && Objects.equals(hostEndpoint, instance.hostEndpoint)
+ && Objects.equals(timeFilter, instance.timeFilter);
}
@Override
public int hashCode() {
- return Objects.hash(id, fragment, dataRegion, hostEndpoint, timeFilter);
+ return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 4bdd01e9de..c2014c55d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -19,12 +19,16 @@
package org.apache.iotdb.db.mpp.sql.planner.plan;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,6 +42,8 @@ import java.util.Map;
public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private SubPlan subPlan;
+ private Analysis analysis;
+ private MPPQueryContext queryContext;
// Record all the FragmentInstances belonged to same PlanFragment
Map<PlanFragmentId, FragmentInstance> instanceMap;
@@ -45,8 +51,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
Map<PlanNodeId, PlanFragmentId> planNodeMap;
List<FragmentInstance> fragmentInstanceList;
- public SimpleFragmentParallelPlanner(SubPlan subPlan) {
+ public SimpleFragmentParallelPlanner(
+ SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
this.subPlan = subPlan;
+ this.analysis = analysis;
+ this.queryContext = context;
this.instanceMap = new HashMap<>();
this.planNodeMap = new HashMap<>();
this.fragmentInstanceList = new ArrayList<>();
@@ -72,8 +81,16 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
// one by one
int instanceIdx = 0;
PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
+ Filter timeFilter =
+ analysis.getQueryFilter() == null
+ ? null
+ : ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter();
FragmentInstance fragmentInstance =
- new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx);
+ new FragmentInstance(
+ new PlanFragment(fragment.getId(), rootCopy),
+ instanceIdx,
+ timeFilter,
+ queryContext.getQueryType());
// Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
// of them.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index acac7210cf..a0e1d01aab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -19,6 +19,16 @@
package org.apache.iotdb.db.service;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.GroupType;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
@@ -30,6 +40,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -42,13 +53,33 @@ public class InternalServiceImpl implements InternalService.Iface {
@Override
public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req)
throws TException {
+ ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(req.fragmentInstance.body);
+ QueryType type = QueryType.valueOf(req.queryType);
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.create(
+ req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
+ switch (type) {
+ case READ:
+ ConsensusReadResponse readResp = ConsensusImpl.getInstance().read(groupId, request);
+ FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
+ return new TSendFragmentInstanceResp(info.getState().isFailed());
+ case WRITE:
+ ConsensusWriteResponse writeResp = ConsensusImpl.getInstance().write(groupId, request);
+ // TODO: (xingtanzjr) need to distinguish more conditions for response status.
+ boolean accepted =
+ writeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ return new TSendFragmentInstanceResp(accepted);
+ }
return null;
}
@Override
public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req)
throws TException {
- return null;
+ FragmentInstanceInfo info =
+ FragmentInstanceManager.getInstance()
+ .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
+ return new TFragmentInstanceStateResp(info.getState().toString());
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index 64229ebe30..65ce8c8fd4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -46,10 +46,9 @@ public class SinkHandleTest {
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final String remoteHostname = "remote";
- final TFragmentInstanceId remoteFragmentInstanceId =
- new TFragmentInstanceId(queryId, "f0", "0");
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
final String remotePlanNodeId = "exchange_0";
- final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f1", "0");
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
// Construct a mock LocalMemoryManager that returns unblocked futures.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -176,10 +175,9 @@ public class SinkHandleTest {
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final String remoteHostname = "remote";
- final TFragmentInstanceId remoteFragmentInstanceId =
- new TFragmentInstanceId(queryId, "f0", "0");
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
final String remotePlanNodeId = "exchange_0";
- final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f1", "0");
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
// Construct a mock LocalMemoryManager that returns blocked futures.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -359,10 +357,9 @@ public class SinkHandleTest {
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final String remoteHostname = "remote";
- final TFragmentInstanceId remoteFragmentInstanceId =
- new TFragmentInstanceId(queryId, "f0", "0");
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
final String remotePlanNodeId = "exchange_0";
- final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f1", "0");
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
// Construct a mock LocalMemoryManager that returns blocked futures.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 55c40e8015..1aaa5abd2f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -50,10 +50,9 @@ public class SourceHandleTest {
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final String remoteHostname = "remote";
- final TFragmentInstanceId remoteFragmentInstanceId =
- new TFragmentInstanceId(queryId, "f1", "0");
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
- final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f0", "0");
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
// Construct a mock LocalMemoryManager that do not block any reservation.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -170,10 +169,9 @@ public class SourceHandleTest {
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final String remoteHostname = "remote";
- final TFragmentInstanceId remoteFragmentInstanceId =
- new TFragmentInstanceId(queryId, "f1", "0");
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
- final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f0", "0");
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
// Construct a mock LocalMemoryManager with capacity 3 * mockTsBlockSize.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -316,10 +314,9 @@ public class SourceHandleTest {
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final String remoteHostname = "remote";
- final TFragmentInstanceId remoteFragmentInstanceId =
- new TFragmentInstanceId(queryId, "f1", "0");
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
- final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f0", "0");
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
// Construct a mock LocalMemoryManager that returns unblocked futures.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -508,10 +505,9 @@ public class SourceHandleTest {
final long mockTsBlockSize = 1024L * 1024L;
final int numOfMockTsBlock = 10;
final String remoteHostname = "remote";
- final TFragmentInstanceId remoteFragmentInstanceId =
- new TFragmentInstanceId(queryId, "f1", "0");
+ final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
final String localPlanNodeId = "exchange_0";
- final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f0", "0");
+ final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
// Construct a mock LocalMemoryManager that returns unblocked futures.
LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceIdTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceIdTest.java
new file mode 100644
index 0000000000..e3b02b8f03
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceIdTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FragmentInstanceIdTest {
+ @Test
+ public void TestFromThrift() {
+ String queryId = "test_query";
+ TFragmentInstanceId tId = new TFragmentInstanceId(queryId, 1, "0");
+ FragmentInstanceId id = FragmentInstanceId.fromThrift(tId);
+ Assert.assertEquals(queryId, id.getQueryId().getId());
+ Assert.assertEquals(1, id.getFragmentId().getId());
+ Assert.assertEquals("0", id.getInstanceId());
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index 93a2d49eb5..dea1e9b031 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -24,8 +24,10 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
@@ -52,6 +54,45 @@ public class FragmentInstanceSerdeTest {
@Test
public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+ FragmentInstance fragmentInstance =
+ new FragmentInstance(
+ new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
+ -1,
+ new GroupByFilter(1, 2, 3, 4),
+ QueryType.READ);
+ RegionReplicaSet regionReplicaSet =
+ new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
+ fragmentInstance.setDataRegionId(regionReplicaSet);
+ fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ fragmentInstance.serializeRequest(byteBuffer);
+ byteBuffer.flip();
+ FragmentInstance deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
+ assertEquals(deserializeFragmentInstance, fragmentInstance);
+ }
+
+ @Test
+ public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
+ FragmentInstance fragmentInstance =
+ new FragmentInstance(
+ new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
+ -1,
+ null,
+ QueryType.READ);
+ RegionReplicaSet regionReplicaSet =
+ new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
+ fragmentInstance.setDataRegionId(regionReplicaSet);
+ fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ fragmentInstance.serializeRequest(byteBuffer);
+ byteBuffer.flip();
+ FragmentInstance deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
+ assertEquals(deserializeFragmentInstance, fragmentInstance);
+ }
+
+ private PlanNode constructPlanNodeTree() throws IllegalPathException {
// create node
OffsetNode offsetNode = new OffsetNode(new PlanNodeId("OffsetNode"), 100);
LimitNode limitNode = new LimitNode(new PlanNodeId("LimitNode"), 100);
@@ -100,18 +141,6 @@ public class FragmentInstanceSerdeTest {
limitNode.addChild(filterNullNode);
offsetNode.addChild(limitNode);
- FragmentInstance fragmentInstance =
- new FragmentInstance(new PlanFragment(new PlanFragmentId("test", -1), offsetNode), -1);
- RegionReplicaSet regionReplicaSet =
- new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
- fragmentInstance.setDataRegionId(regionReplicaSet);
- fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
- fragmentInstance.setTimeFilter(new GroupByFilter(1, 2, 3, 4));
-
- ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
- fragmentInstance.serializeRequest(byteBuffer);
- byteBuffer.flip();
- FragmentInstance deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
- assertEquals(deserializeFragmentInstance, fragmentInstance);
+ return offsetNode;
}
}
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index b1cae785d6..e3be3e324b 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -22,7 +22,7 @@ namespace java org.apache.iotdb.mpp.rpc.thrift
struct TFragmentInstanceId {
1: required string queryId
- 2: required string fragmentId
+ 2: required i32 fragmentId
3: required string instanceId
}
@@ -61,8 +61,15 @@ struct TFragmentInstance {
1: required binary body
}
+struct TConsensusGroupId {
+ 1: required i32 id
+ 2: required string type
+}
+
struct TSendFragmentInstanceReq {
1: required TFragmentInstance fragmentInstance
+ 2: required TConsensusGroupId consensusGroupId
+ 3: required string queryType
}
struct TSendFragmentInstanceResp {