You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/11 12:12:02 UTC

[iotdb] 03/03: complete internal RPC

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_rpc
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11e426f276c91ece5ba3e0d57e35ff37dfe19dc0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 11 20:11:45 2022 +0800

    complete internal RPC
---
 .../scheduler/SimpleFragInstanceDispatcher.java    | 18 ++++++++-
 .../db/mpp/sql/planner/DistributionPlanner.java    |  3 +-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  | 44 +++++++++++++---------
 .../plan/SimpleFragmentParallelPlanner.java        | 13 ++++++-
 .../iotdb/db/service/InternalServiceImpl.java      | 15 ++++++--
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |  8 +++-
 6 files changed, 72 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 60bdb13199..0dde122c74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 
 import org.apache.thrift.TException;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -45,8 +49,18 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
               InternalService.Client client =
                   InternalServiceClientFactory.getInternalServiceClient(
                       instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
-              // TODO: (xingtanzjr) add request construction
-              client.sendFragmentInstance(null);
+              // TODO: (xingtanzjr) consider how to handle the buffer here
+              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+              instance.serializeRequest(buffer);
+              buffer.flip();
+              TConsensusGroupId groupId =
+                  new TConsensusGroupId(
+                      instance.getDataRegionId().getId().getId(),
+                      instance.getDataRegionId().getId().getType().toString());
+              TSendFragmentInstanceReq req =
+                  new TSendFragmentInstanceReq(
+                      new TFragmentInstance(buffer), groupId, instance.getType().toString());
+              client.sendFragmentInstance(req);
             }
           } catch (TException e) {
             // TODO: (xingtanzjr) add more details
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index baba0c6ae0..96341a2ca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -79,7 +79,8 @@ public class DistributionPlanner {
   // Convert fragment to detailed instance
   // And for parallel-able fragment, clone it into several instances with different params.
   public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
-    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan, analysis);
+    IFragmentParallelPlaner parallelPlaner =
+        new SimpleFragmentParallelPlanner(subPlan, analysis, context);
     return parallelPlaner.parallelPlan();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index d46e502326..6a5e40db50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -23,11 +23,13 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -35,7 +37,7 @@ import java.util.Objects;
 
 public class FragmentInstance implements IConsensusRequest {
   private final FragmentInstanceId id;
-
+  private final QueryType type;
   // The reference of PlanFragment which this instance is generated from
   private final PlanFragment fragment;
   // The DataRegion where the FragmentInstance should run
@@ -47,9 +49,11 @@ public class FragmentInstance implements IConsensusRequest {
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
-  public FragmentInstance(PlanFragment fragment, int index) {
+  public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
     this.fragment = fragment;
+    this.timeFilter = timeFilter;
     this.id = generateId(fragment.getId(), index);
+    this.type = type;
   }
 
   public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
@@ -101,6 +105,10 @@ public class FragmentInstance implements IConsensusRequest {
     return timeFilter;
   }
 
+  public QueryType getType() {
+    return type;
+  }
+
   public String toString() {
     StringBuilder ret = new StringBuilder();
     ret.append(
@@ -116,7 +124,10 @@ public class FragmentInstance implements IConsensusRequest {
     FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
     FragmentInstance fragmentInstance =
         new FragmentInstance(
-            PlanFragment.deserialize(buffer), Integer.parseInt(id.getInstanceId()));
+            PlanFragment.deserialize(buffer),
+            Integer.parseInt(id.getInstanceId()),
+            FilterFactory.deserialize(buffer),
+            QueryType.values()[ReadWriteIOUtils.readInt(buffer)]);
     RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
     try {
       regionReplicaSet.deserializeImpl(buffer);
@@ -127,7 +138,6 @@ public class FragmentInstance implements IConsensusRequest {
     endpoint.deserializeImpl(buffer);
     fragmentInstance.dataRegion = regionReplicaSet;
     fragmentInstance.hostEndpoint = endpoint;
-    fragmentInstance.timeFilter = FilterFactory.deserialize(buffer);
 
     return fragmentInstance;
   }
@@ -136,29 +146,27 @@ public class FragmentInstance implements IConsensusRequest {
   public void serializeRequest(ByteBuffer buffer) {
     id.serialize(buffer);
     fragment.serialize(buffer);
+    timeFilter.serialize(buffer);
+    ReadWriteIOUtils.write(type.ordinal(), buffer);
     dataRegion.serializeImpl(buffer);
     hostEndpoint.serializeImpl(buffer);
-    timeFilter.serialize(buffer);
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    FragmentInstance that = (FragmentInstance) o;
-    return Objects.equals(id, that.id)
-        && Objects.equals(fragment, that.fragment)
-        && Objects.equals(dataRegion, that.dataRegion)
-        && Objects.equals(hostEndpoint, that.hostEndpoint)
-        && Objects.equals(timeFilter, that.timeFilter);
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    FragmentInstance instance = (FragmentInstance) o;
+    return Objects.equals(id, instance.id)
+        && type == instance.type
+        && Objects.equals(fragment, instance.fragment)
+        && Objects.equals(dataRegion, instance.dataRegion)
+        && Objects.equals(hostEndpoint, instance.hostEndpoint)
+        && Objects.equals(timeFilter, instance.timeFilter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, fragment, dataRegion, hostEndpoint, timeFilter);
+    return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index f91e4a7f32..33ca767f0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -26,6 +27,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -40,6 +42,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
 
   private SubPlan subPlan;
   private Analysis analysis;
+  private MPPQueryContext queryContext;
 
   // Record all the FragmentInstances belonged to same PlanFragment
   Map<PlanFragmentId, FragmentInstance> instanceMap;
@@ -47,9 +50,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   Map<PlanNodeId, PlanFragmentId> planNodeMap;
   List<FragmentInstance> fragmentInstanceList;
 
-  public SimpleFragmentParallelPlanner(SubPlan subPlan, Analysis analysis) {
+  public SimpleFragmentParallelPlanner(
+      SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
     this.subPlan = subPlan;
     this.analysis = analysis;
+    this.queryContext = context;
     this.instanceMap = new HashMap<>();
     this.planNodeMap = new HashMap<>();
     this.fragmentInstanceList = new ArrayList<>();
@@ -76,7 +81,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     int instanceIdx = 0;
     PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
     FragmentInstance fragmentInstance =
-        new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx);
+        new FragmentInstance(
+            new PlanFragment(fragment.getId(), rootCopy),
+            instanceIdx,
+            ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter(),
+            queryContext.getQueryType());
 
     // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
     // of them.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 60cc4d8f18..a0e1d01aab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.service;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
@@ -38,6 +40,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 
@@ -57,11 +60,15 @@ public class InternalServiceImpl implements InternalService.Iface {
             req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
     switch (type) {
       case READ:
-        ConsensusImpl.getInstance().read(groupId, request);
-        break;
+        ConsensusReadResponse readResp = ConsensusImpl.getInstance().read(groupId, request);
+        FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
+        return new TSendFragmentInstanceResp(info.getState().isFailed());
       case WRITE:
-        ConsensusImpl.getInstance().write(groupId, request);
-        break;
+        ConsensusWriteResponse writeResp = ConsensusImpl.getInstance().write(groupId, request);
+        // TODO: (xingtanzjr) need to distinguish more conditions for response status.
+        boolean accepted =
+            writeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+        return new TSendFragmentInstanceResp(accepted);
     }
     return null;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index 93a2d49eb5..33dc8a9613 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -101,12 +102,15 @@ public class FragmentInstanceSerdeTest {
     offsetNode.addChild(limitNode);
 
     FragmentInstance fragmentInstance =
-        new FragmentInstance(new PlanFragment(new PlanFragmentId("test", -1), offsetNode), -1);
+        new FragmentInstance(
+            new PlanFragment(new PlanFragmentId("test", -1), offsetNode),
+            -1,
+            new GroupByFilter(1, 2, 3, 4),
+            QueryType.READ);
     RegionReplicaSet regionReplicaSet =
         new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
     fragmentInstance.setDataRegionId(regionReplicaSet);
     fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
-    fragmentInstance.setTimeFilter(new GroupByFilter(1, 2, 3, 4));
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     fragmentInstance.serializeRequest(byteBuffer);