You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/11 12:11:59 UTC

[iotdb] branch xingtanzjr/mpp_rpc created (now 11e426f276)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/mpp_rpc
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 11e426f276 complete internal RPC

This branch includes the following new commits:

     new 74eca8903c tmp saved
     new a6a6e6c285 Merge branch 'master' into xingtanzjr/mpp_rpc
     new 11e426f276 complete internal RPC

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/03: Merge branch 'master' into xingtanzjr/mpp_rpc

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_rpc
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a6a6e6c285cd6f7e4f15ddfad76d2183d4b18bbe
Merge: 74eca8903c d160be9c22
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 11 19:26:59 2022 +0800

    Merge branch 'master' into xingtanzjr/mpp_rpc

 confignode/src/assembly/confignode.xml             |   9 +-
 .../resources/conf/iotdb-confignode.properties     |  20 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  25 +-
 .../iotdb/confignode/conf/ConfigNodeConfCheck.java |  29 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |   8 +-
 ...aSet.java => DataNodeConfigurationDataSet.java} |  32 +-
 .../consensus/response/DataNodesInfoDataSet.java   |  12 +-
 .../consensus/response/DataPartitionDataSet.java   | 104 +++++-
 .../consensus/response/SchemaPartitionDataSet.java |  92 ++---
 .../response/StorageGroupSchemaDataSet.java        |  11 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  87 +++--
 .../iotdb/confignode/manager/ConsensusManager.java |  33 +-
 .../iotdb/confignode/manager/DataNodeManager.java  |  87 ++---
 .../apache/iotdb/confignode/manager/Manager.java   |  88 ++---
 .../iotdb/confignode/manager/PartitionManager.java | 193 +++++++----
 .../iotdb/confignode/manager/RegionManager.java    | 126 +++----
 .../iotdb/confignode/partition/DataRegionInfo.java |  66 ----
 .../confignode/partition/SchemaRegionInfo.java     |  59 ----
 .../confignode/partition/StorageGroupSchema.java   |  77 +++--
 .../persistence/DataNodeInfoPersistence.java       | 122 ++++---
 .../persistence/PartitionInfoPersistence.java      | 145 +++++---
 .../persistence/RegionInfoPersistence.java         | 210 ++++++------
 .../iotdb/confignode/physical/PhysicalPlan.java    |  32 +-
 .../confignode/physical/PhysicalPlanType.java      |  12 +-
 .../physical/crud/CreateDataPartitionPlan.java     | 128 +++++++
 .../physical/crud/CreateRegionsPlan.java           |  96 ++++++
 .../physical/crud/CreateSchemaPartitionPlan.java   |  19 +-
 .../crud/GetOrCreateDataPartitionPlan.java         | 139 ++++++++
 .../GetOrCreateSchemaPartitionPlan.java}           |  50 +--
 .../confignode/physical/sys/DataPartitionPlan.java |  78 -----
 .../physical/sys/QueryDataNodeInfoPlan.java        |  14 +
 .../physical/sys/RegisterDataNodePlan.java         |  22 +-
 .../physical/sys/SetStorageGroupPlan.java          |  43 +--
 .../confignode/service/executor/PlanExecutor.java  |  27 +-
 .../server/ConfigNodeRPCServerProcessor.java       | 246 +++++++-------
 .../confignode/consensus/RatisConsensusDemo.java   |  27 +-
 .../manager/ConfigManagerManualTest.java           |  29 +-
 .../hash/DeviceGroupHashExecutorManualTest.java    |   2 +-
 .../physical/SerializeDeserializeUT.java           | 168 +++++++++
 .../server/ConfigNodeRPCServerProcessorTest.java   | 377 +++++++++++++++------
 .../org/apache/iotdb/consensus/IConsensus.java     |   2 +
 .../iotdb/consensus/ratis/RatisConsensus.java      |  15 +
 .../org/apache/iotdb/consensus/ratis/Utils.java    |  12 +-
 .../consensus/standalone/StandAloneConsensus.java  |   8 +
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   1 +
 .../{partition => cluster}/DataNodeLocation.java   |  51 +--
 .../commons/hash/DeviceGroupHashExecutor.java      |  31 --
 .../iotdb/commons/partition/DataPartition.java     | 128 ++++++-
 .../iotdb/commons/partition/RegionReplicaSet.java  |   1 +
 .../iotdb/commons/partition/SchemaPartition.java   |  49 ++-
 .../commons/partition/SeriesPartitionSlot.java     |  35 +-
 .../iotdb/commons/partition/TimePartitionSlot.java |  33 ++
 .../executor/SeriesPartitionExecutor.java}         |  25 +-
 .../executor}/hash/APHashExecutor.java             |  11 +-
 .../executor}/hash/BKDRHashExecutor.java           |  11 +-
 .../executor}/hash/JSHashExecutor.java             |  11 +-
 .../executor}/hash/SDBMHashExecutor.java           |  11 +-
 .../iotdb/db/metadata/idtable/entry/IDeviceID.java |   4 +
 .../db/metadata/idtable/entry/PlainDeviceID.java   |  12 +
 .../db/metadata/idtable/entry/SHA256DeviceID.java  |  21 ++
 .../apache/iotdb/db/metadata/path/AlignedPath.java |  55 +++
 .../iotdb/db/metadata/path/MeasurementPath.java    |  41 +++
 .../apache/iotdb/db/metadata/path/PartialPath.java |  30 ++
 .../db/metadata/path/PathDeserializeUtil.java      |  59 ++++
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  34 ++
 .../apache/iotdb/db/mpp/common/PlanFragmentId.java |  25 ++
 .../org/apache/iotdb/db/mpp/common/QueryId.java    |   4 +
 .../db/mpp/common/filter/BasicFunctionFilter.java  |  20 ++
 .../mpp/common/filter/FilterDeserializeUtil.java   |  63 ++++
 .../iotdb/db/mpp/common/filter/FunctionFilter.java |  43 +++
 .../iotdb/db/mpp/common/filter/InFilter.java       |  23 ++
 .../iotdb/db/mpp/common/filter/LikeFilter.java     |  18 +
 .../iotdb/db/mpp/common/filter/QueryFilter.java    |  60 ++++
 .../iotdb/db/mpp/common/filter/RegexpFilter.java   |  16 +
 .../db/mpp/execution/FragmentInstanceInfo.java     |   1 -
 .../mpp/sql/analyze/FakePartitionFetcherImpl.java  |   1 +
 .../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java |   3 +-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |  54 ++-
 .../db/mpp/sql/planner/plan/PlanFragment.java      |  28 +-
 .../db/mpp/sql/planner/plan/node/ColumnHeader.java |  18 +
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |  36 +-
 .../db/mpp/sql/planner/plan/node/PlanNodeId.java   |  14 +
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |   3 +-
 .../plan/node/metedata/read/ShowDevicesNode.java   |  15 +-
 .../planner/plan/node/metedata/read/ShowNode.java  |   4 +-
 .../node/metedata/write/AlterTimeSeriesNode.java   |  13 +-
 .../plan/node/metedata/write/AuthorNode.java       |   5 +
 .../write/CreateAlignedTimeSeriesNode.java         |  14 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |  28 +-
 .../planner/plan/node/process/AggregateNode.java   |  47 ++-
 .../planner/plan/node/process/DeviceMergeNode.java |  51 ++-
 .../planner/plan/node/process/ExchangeNode.java    |  55 ++-
 .../sql/planner/plan/node/process/FillNode.java    |  36 +-
 .../sql/planner/plan/node/process/FilterNode.java  |  34 +-
 .../planner/plan/node/process/FilterNullNode.java  |  32 +-
 .../plan/node/process/GroupByLevelNode.java        |  42 ++-
 .../sql/planner/plan/node/process/LimitNode.java   |  15 +-
 .../sql/planner/plan/node/process/OffsetNode.java  |  19 +-
 .../sql/planner/plan/node/process/SortNode.java    |  47 ++-
 .../planner/plan/node/process/TimeJoinNode.java    |  31 +-
 .../planner/plan/node/sink/FragmentSinkNode.java   |  47 ++-
 .../plan/node/source/SeriesAggregateScanNode.java  |  54 ++-
 .../planner/plan/node/source/SeriesScanNode.java   |  75 +++-
 .../InsertMultiTabletNode.java}                    |  19 +-
 .../sql/planner/plan/node/write/InsertNode.java    |   5 +-
 .../sql/planner/plan/node/write/InsertRowNode.java |   8 +-
 .../planner/plan/node/write/InsertTabletNode.java  |   8 +-
 .../db/mpp/sql/statement/component/FillPolicy.java |   2 +-
 .../statement/component/FilterNullComponent.java   |  44 +++
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |   3 +-
 .../iotdb/db/qp/utils/GroupByLevelController.java  |  90 +++++
 .../db/query/aggregation/AggregateResult.java      |   9 +
 .../iotdb/db/query/expression/Expression.java      |   6 +
 .../iotdb/db/query/expression/ExpressionType.java  | 115 +++++++
 .../iotdb/db/query/expression/ResultColumn.java    |  17 +
 .../expression/binary/AdditionExpression.java      |  19 ++
 .../query/expression/binary/BinaryExpression.java  |   8 +
 .../expression/binary/DivisionExpression.java      |  19 ++
 .../query/expression/binary/EqualToExpression.java |  19 ++
 .../expression/binary/GreaterEqualExpression.java  |  19 ++
 .../expression/binary/GreaterThanExpression.java   |  19 ++
 .../expression/binary/LessEqualExpression.java     |  19 ++
 .../expression/binary/LessThanExpression.java      |  19 ++
 .../expression/binary/LogicAndExpression.java      |  19 ++
 .../query/expression/binary/LogicOrExpression.java |  19 ++
 .../query/expression/binary/ModuloExpression.java  |  19 ++
 .../binary/MultiplicationExpression.java           |  19 ++
 .../expression/binary/NonEqualExpression.java      |  19 ++
 .../expression/binary/SubtractionExpression.java   |  19 ++
 .../db/query/expression/unary/ConstantOperand.java |  22 +-
 .../query/expression/unary/FunctionExpression.java |  31 ++
 .../query/expression/unary/LogicNotExpression.java |  18 +
 .../query/expression/unary/NegationExpression.java |  18 +
 .../query/expression/unary/TimeSeriesOperand.java  |  19 ++
 .../java/org/apache/iotdb/db/service/DataNode.java |  10 +-
 .../iotdb/db/utils/IExpressionDeserializeUtil.java |  51 +++
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |   2 +-
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 117 +++++++
 .../sql/plan/node/PlanNodeDeserializeHelper.java}  |  18 +-
 .../metadata/read/ShowDevicesNodeSerdeTest.java}   |  23 +-
 .../plan/node/process/AggregateNodeSerdeTest.java  |  58 ++++
 .../node/process/DeviceMergeNodeSerdeTest.java     |  81 +++++
 .../plan/node/process/ExchangeNodeSerdeTest.java   | 101 ++++++
 .../sql/plan/node/process/FillNodeSerdeTest.java   |  85 +++++
 .../sql/plan/node/process/FilterNodeSerdeTest.java |  97 ++++++
 .../plan/node/process/FilterNullNodeSerdeTest.java | 102 ++++++
 .../node/process/GroupByLevelNodeSerdeTest.java    | 117 +++++++
 .../sql/plan/node/process/LimitNodeSerdeTest.java  | 119 +++++++
 .../sql/plan/node/process/OffsetNodeSerdeTest.java | 186 ++++++++++
 .../sql/plan/node/process/SortNodeSerdeTest.java   | 130 +++++++
 .../plan/node/process/TimeJoinNodeSerdeTest.java   | 135 ++++++++
 .../plan/node/sink/FragmentSinkNodeSerdeTest.java  |  52 +++
 .../source/SeriesAggregateScanNodeSerdeTest.java   |  69 ++++
 .../plan/node/source/SeriesScanNodeSerdeTest.java  |  55 +++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   7 +-
 .../src/main/thrift/confignode.thrift              | 137 ++++----
 thrift/src/main/thrift/common.thrift               |  13 +-
 thrift/src/main/thrift/management.thrift           |   4 +-
 .../org/apache/iotdb/tsfile/read/common/Path.java  |  21 ++
 .../iotdb/tsfile/read/expression/IExpression.java  |   3 +
 .../read/expression/impl/BinaryExpression.java     |  16 +
 .../read/expression/impl/GlobalTimeExpression.java |  13 +
 .../expression/impl/SingleSeriesExpression.java    |   9 +
 .../iotdb/tsfile/read/filter/basic/Filter.java     |   8 +
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  31 ++
 165 files changed, 6001 insertions(+), 1487 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 6564274996,b22f480589..49b0aa3dd2
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@@ -60,10 -74,23 +74,30 @@@ public class FragmentInstanceId 
      return new TFragmentInstanceId(queryId.getId(), String.valueOf(fragmentId.getId()), instanceId);
    }
  
 +  public static FragmentInstanceId fromThrift(TFragmentInstanceId tFragmentInstanceId) {
 +    return new FragmentInstanceId(
 +        new PlanFragmentId(
 +            tFragmentInstanceId.queryId, Integer.parseInt(tFragmentInstanceId.fragmentId)),
 +        tFragmentInstanceId.instanceId);
 +  }
++
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     FragmentInstanceId that = (FragmentInstanceId) o;
+     return Objects.equals(fullId, that.fullId)
+         && Objects.equals(queryId, that.queryId)
+         && Objects.equals(fragmentId, that.fragmentId)
+         && Objects.equals(instanceId, that.instanceId);
+   }
+ 
+   @Override
+   public int hashCode() {
+     return Objects.hash(fullId, queryId, fragmentId, instanceId);
+   }
  }


[iotdb] 03/03: complete internal RPC

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_rpc
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11e426f276c91ece5ba3e0d57e35ff37dfe19dc0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 11 20:11:45 2022 +0800

    complete internal RPC
---
 .../scheduler/SimpleFragInstanceDispatcher.java    | 18 ++++++++-
 .../db/mpp/sql/planner/DistributionPlanner.java    |  3 +-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  | 44 +++++++++++++---------
 .../plan/SimpleFragmentParallelPlanner.java        | 13 ++++++-
 .../iotdb/db/service/InternalServiceImpl.java      | 15 ++++++--
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |  8 +++-
 6 files changed, 72 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 60bdb13199..0dde122c74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 
 import org.apache.thrift.TException;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -45,8 +49,18 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
               InternalService.Client client =
                   InternalServiceClientFactory.getInternalServiceClient(
                       instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
-              // TODO: (xingtanzjr) add request construction
-              client.sendFragmentInstance(null);
+              // TODO: (xingtanzjr) consider how to handle the buffer here
+              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+              instance.serializeRequest(buffer);
+              buffer.flip();
+              TConsensusGroupId groupId =
+                  new TConsensusGroupId(
+                      instance.getDataRegionId().getId().getId(),
+                      instance.getDataRegionId().getId().getType().toString());
+              TSendFragmentInstanceReq req =
+                  new TSendFragmentInstanceReq(
+                      new TFragmentInstance(buffer), groupId, instance.getType().toString());
+              client.sendFragmentInstance(req);
             }
           } catch (TException e) {
             // TODO: (xingtanzjr) add more details
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index baba0c6ae0..96341a2ca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -79,7 +79,8 @@ public class DistributionPlanner {
   // Convert fragment to detailed instance
   // And for parallel-able fragment, clone it into several instances with different params.
   public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
-    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan, analysis);
+    IFragmentParallelPlaner parallelPlaner =
+        new SimpleFragmentParallelPlanner(subPlan, analysis, context);
     return parallelPlaner.parallelPlan();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index d46e502326..6a5e40db50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -23,11 +23,13 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -35,7 +37,7 @@ import java.util.Objects;
 
 public class FragmentInstance implements IConsensusRequest {
   private final FragmentInstanceId id;
-
+  private final QueryType type;
   // The reference of PlanFragment which this instance is generated from
   private final PlanFragment fragment;
   // The DataRegion where the FragmentInstance should run
@@ -47,9 +49,11 @@ public class FragmentInstance implements IConsensusRequest {
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
-  public FragmentInstance(PlanFragment fragment, int index) {
+  public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
     this.fragment = fragment;
+    this.timeFilter = timeFilter;
     this.id = generateId(fragment.getId(), index);
+    this.type = type;
   }
 
   public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
@@ -101,6 +105,10 @@ public class FragmentInstance implements IConsensusRequest {
     return timeFilter;
   }
 
+  public QueryType getType() {
+    return type;
+  }
+
   public String toString() {
     StringBuilder ret = new StringBuilder();
     ret.append(
@@ -116,7 +124,10 @@ public class FragmentInstance implements IConsensusRequest {
     FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
     FragmentInstance fragmentInstance =
         new FragmentInstance(
-            PlanFragment.deserialize(buffer), Integer.parseInt(id.getInstanceId()));
+            PlanFragment.deserialize(buffer),
+            Integer.parseInt(id.getInstanceId()),
+            FilterFactory.deserialize(buffer),
+            QueryType.values()[ReadWriteIOUtils.readInt(buffer)]);
     RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
     try {
       regionReplicaSet.deserializeImpl(buffer);
@@ -127,7 +138,6 @@ public class FragmentInstance implements IConsensusRequest {
     endpoint.deserializeImpl(buffer);
     fragmentInstance.dataRegion = regionReplicaSet;
     fragmentInstance.hostEndpoint = endpoint;
-    fragmentInstance.timeFilter = FilterFactory.deserialize(buffer);
 
     return fragmentInstance;
   }
@@ -136,29 +146,27 @@ public class FragmentInstance implements IConsensusRequest {
   public void serializeRequest(ByteBuffer buffer) {
     id.serialize(buffer);
     fragment.serialize(buffer);
+    timeFilter.serialize(buffer);
+    ReadWriteIOUtils.write(type.ordinal(), buffer);
     dataRegion.serializeImpl(buffer);
     hostEndpoint.serializeImpl(buffer);
-    timeFilter.serialize(buffer);
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    FragmentInstance that = (FragmentInstance) o;
-    return Objects.equals(id, that.id)
-        && Objects.equals(fragment, that.fragment)
-        && Objects.equals(dataRegion, that.dataRegion)
-        && Objects.equals(hostEndpoint, that.hostEndpoint)
-        && Objects.equals(timeFilter, that.timeFilter);
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    FragmentInstance instance = (FragmentInstance) o;
+    return Objects.equals(id, instance.id)
+        && type == instance.type
+        && Objects.equals(fragment, instance.fragment)
+        && Objects.equals(dataRegion, instance.dataRegion)
+        && Objects.equals(hostEndpoint, instance.hostEndpoint)
+        && Objects.equals(timeFilter, instance.timeFilter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, fragment, dataRegion, hostEndpoint, timeFilter);
+    return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index f91e4a7f32..33ca767f0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -26,6 +27,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -40,6 +42,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
 
   private SubPlan subPlan;
   private Analysis analysis;
+  private MPPQueryContext queryContext;
 
   // Record all the FragmentInstances belonged to same PlanFragment
   Map<PlanFragmentId, FragmentInstance> instanceMap;
@@ -47,9 +50,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   Map<PlanNodeId, PlanFragmentId> planNodeMap;
   List<FragmentInstance> fragmentInstanceList;
 
-  public SimpleFragmentParallelPlanner(SubPlan subPlan, Analysis analysis) {
+  public SimpleFragmentParallelPlanner(
+      SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
     this.subPlan = subPlan;
     this.analysis = analysis;
+    this.queryContext = context;
     this.instanceMap = new HashMap<>();
     this.planNodeMap = new HashMap<>();
     this.fragmentInstanceList = new ArrayList<>();
@@ -76,7 +81,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     int instanceIdx = 0;
     PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
     FragmentInstance fragmentInstance =
-        new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx);
+        new FragmentInstance(
+            new PlanFragment(fragment.getId(), rootCopy),
+            instanceIdx,
+            ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter(),
+            queryContext.getQueryType());
 
     // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
     // of them.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index 60cc4d8f18..a0e1d01aab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.service;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.GroupType;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
@@ -38,6 +40,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 
@@ -57,11 +60,15 @@ public class InternalServiceImpl implements InternalService.Iface {
             req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
     switch (type) {
       case READ:
-        ConsensusImpl.getInstance().read(groupId, request);
-        break;
+        ConsensusReadResponse readResp = ConsensusImpl.getInstance().read(groupId, request);
+        FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
+        return new TSendFragmentInstanceResp(info.getState().isFailed());
       case WRITE:
-        ConsensusImpl.getInstance().write(groupId, request);
-        break;
+        ConsensusWriteResponse writeResp = ConsensusImpl.getInstance().write(groupId, request);
+        // TODO: (xingtanzjr) need to distinguish more conditions for response status.
+        boolean accepted =
+            writeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+        return new TSendFragmentInstanceResp(accepted);
     }
     return null;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index 93a2d49eb5..33dc8a9613 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -101,12 +102,15 @@ public class FragmentInstanceSerdeTest {
     offsetNode.addChild(limitNode);
 
     FragmentInstance fragmentInstance =
-        new FragmentInstance(new PlanFragment(new PlanFragmentId("test", -1), offsetNode), -1);
+        new FragmentInstance(
+            new PlanFragment(new PlanFragmentId("test", -1), offsetNode),
+            -1,
+            new GroupByFilter(1, 2, 3, 4),
+            QueryType.READ);
     RegionReplicaSet regionReplicaSet =
         new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
     fragmentInstance.setDataRegionId(regionReplicaSet);
     fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
-    fragmentInstance.setTimeFilter(new GroupByFilter(1, 2, 3, 4));
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     fragmentInstance.serializeRequest(byteBuffer);


[iotdb] 01/03: tmp saved

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp_rpc
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 74eca8903ced87a037b650e24513953abd5c2d46
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Apr 11 18:08:37 2022 +0800

    tmp saved
---
 .../iotdb/commons/consensus/ConsensusGroupId.java  | 27 ++++++++++++++++------
 .../iotdb/commons/consensus/DataRegionId.java      |  5 ++++
 .../iotdb/commons/consensus/PartitionRegionId.java |  5 ++++
 .../iotdb/commons/consensus/SchemaRegionId.java    |  5 ++++
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  7 ++++++
 .../db/mpp/sql/planner/DistributionPlanner.java    |  2 +-
 .../plan/SimpleFragmentParallelPlanner.java        |  5 +++-
 .../iotdb/db/service/InternalServiceImpl.java      | 26 ++++++++++++++++++++-
 thrift/src/main/thrift/mpp.thrift                  |  7 ++++++
 9 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index abc7747e09..13e38df6c0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -33,6 +33,8 @@ public interface ConsensusGroupId {
   // return specific id
   int getId();
 
+  void setId(int id);
+
   // return specific type
   GroupType getType();
 
@@ -43,22 +45,33 @@ public interface ConsensusGroupId {
         throw new IOException("unrecognized id type " + index);
       }
       GroupType type = GroupType.values()[index];
-      ConsensusGroupId id;
+      ConsensusGroupId groupId = createEmpty(type);
+      groupId.deserializeImpl(buffer);
+      return groupId;
+    }
+
+    public static ConsensusGroupId createEmpty(GroupType type) {
+      ConsensusGroupId groupId;
       switch (type) {
         case DataRegion:
-          id = new DataRegionId();
+          groupId = new DataRegionId();
           break;
         case SchemaRegion:
-          id = new SchemaRegionId();
+          groupId = new SchemaRegionId();
           break;
         case PartitionRegion:
-          id = new PartitionRegionId();
+          groupId = new PartitionRegionId();
           break;
         default:
-          throw new IOException("unrecognized id type " + type);
+          throw new IllegalArgumentException("unrecognized id type " + type);
       }
-      id.deserializeImpl(buffer);
-      return id;
+      return groupId;
+    }
+
+    public static ConsensusGroupId create(int id, GroupType type) {
+      ConsensusGroupId groupId = createEmpty(type);
+      groupId.setId(id);
+      return groupId;
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index aa570cea7a..3085541766 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -49,6 +49,11 @@ public class DataRegionId implements ConsensusGroupId {
     return id;
   }
 
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
   @Override
   public GroupType getType() {
     return GroupType.DataRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
index 4eb38a15db..86f5fe963c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
@@ -49,6 +49,11 @@ public class PartitionRegionId implements ConsensusGroupId {
     return id;
   }
 
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
   @Override
   public GroupType getType() {
     return GroupType.PartitionRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
index 0b3d9ad551..a42ddf3ad0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
@@ -49,6 +49,11 @@ public class SchemaRegionId implements ConsensusGroupId {
     return id;
   }
 
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
   @Override
   public GroupType getType() {
     return GroupType.SchemaRegion;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index 369466c158..6564274996 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -59,4 +59,11 @@ public class FragmentInstanceId {
   public TFragmentInstanceId toThrift() {
     return new TFragmentInstanceId(queryId.getId(), String.valueOf(fragmentId.getId()), instanceId);
   }
+
+  public static FragmentInstanceId fromThrift(TFragmentInstanceId tFragmentInstanceId) {
+    return new FragmentInstanceId(
+        new PlanFragmentId(
+            tFragmentInstanceId.queryId, Integer.parseInt(tFragmentInstanceId.fragmentId)),
+        tFragmentInstanceId.instanceId);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 182f85fbe5..baba0c6ae0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -79,7 +79,7 @@ public class DistributionPlanner {
   // Convert fragment to detailed instance
   // And for parallel-able fragment, clone it into several instances with different params.
   public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
-    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
+    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan, analysis);
     return parallelPlaner.parallelPlan();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 4bdd01e9de..f91e4a7f32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -38,6 +39,7 @@ import java.util.Map;
 public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
 
   private SubPlan subPlan;
+  private Analysis analysis;
 
   // Record all the FragmentInstances belonged to same PlanFragment
   Map<PlanFragmentId, FragmentInstance> instanceMap;
@@ -45,8 +47,9 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   Map<PlanNodeId, PlanFragmentId> planNodeMap;
   List<FragmentInstance> fragmentInstanceList;
 
-  public SimpleFragmentParallelPlanner(SubPlan subPlan) {
+  public SimpleFragmentParallelPlanner(SubPlan subPlan, Analysis analysis) {
     this.subPlan = subPlan;
+    this.analysis = analysis;
     this.instanceMap = new HashMap<>();
     this.planNodeMap = new HashMap<>();
     this.fragmentInstanceList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index acac7210cf..60cc4d8f18 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -19,6 +19,14 @@
 
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.GroupType;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
@@ -42,13 +50,29 @@ public class InternalServiceImpl implements InternalService.Iface {
   @Override
   public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req)
       throws TException {
+    ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(req.fragmentInstance.body);
+    QueryType type = QueryType.valueOf(req.queryType);
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.create(
+            req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
+    switch (type) {
+      case READ:
+        ConsensusImpl.getInstance().read(groupId, request);
+        break;
+      case WRITE:
+        ConsensusImpl.getInstance().write(groupId, request);
+        break;
+    }
     return null;
   }
 
   @Override
   public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req)
       throws TException {
-    return null;
+    FragmentInstanceInfo info =
+        FragmentInstanceManager.getInstance()
+            .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
+    return new TFragmentInstanceStateResp(info.getState().toString());
   }
 
   @Override
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index b1cae785d6..d0ed78f9d5 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -61,8 +61,15 @@ struct TFragmentInstance {
   1: required binary body
 }
 
+struct TConsensusGroupId {
+  1: required i32 id
+  2: required string type
+}
+
 struct TSendFragmentInstanceReq {
   1: required TFragmentInstance fragmentInstance
+  2: required TConsensusGroupId consensusGroupId
+  3: required string queryType
 }
 
 struct TSendFragmentInstanceResp {