You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/12 04:34:52 UTC

[iotdb] branch master updated: [IoTDB-2666] Implement the RPC of Fragment dispatch and status track (#5478)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d5f67f3dd [IoTDB-2666] Implement the RPC of Fragment dispatch and status track (#5478)
5d5f67f3dd is described below

commit 5d5f67f3dd860e1d641863289023fd140baa6418
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Tue Apr 12 12:34:48 2022 +0800

    [IoTDB-2666] Implement the RPC of Fragment dispatch and status track (#5478)
---
 .../iotdb/commons/consensus/ConsensusGroupId.java  | 27 ++++++++---
 .../iotdb/commons/consensus/DataRegionId.java      |  5 ++
 .../iotdb/commons/consensus/PartitionRegionId.java |  5 ++
 .../iotdb/commons/consensus/SchemaRegionId.java    |  5 ++
 .../apache/iotdb/commons/ConsensusGroupIdTest.java | 53 +++++++++++++++++++++
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  8 +++-
 .../scheduler/AbstractFragInsStateTracker.java     |  2 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    | 18 ++++++-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |  2 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    |  3 +-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  | 48 ++++++++++++-------
 .../plan/SimpleFragmentParallelPlanner.java        | 21 ++++++++-
 .../iotdb/db/service/InternalServiceImpl.java      | 33 ++++++++++++-
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java | 15 +++---
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      | 20 ++++----
 .../db/mpp/sql/plan/FragmentInstanceIdTest.java    | 38 +++++++++++++++
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 55 +++++++++++++++++-----
 thrift/src/main/thrift/mpp.thrift                  |  9 +++-
 18 files changed, 298 insertions(+), 69 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
index abc7747e09..13e38df6c0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/ConsensusGroupId.java
@@ -33,6 +33,8 @@ public interface ConsensusGroupId {
   // return specific id
   int getId();
 
+  void setId(int id);
+
   // return specific type
   GroupType getType();
 
@@ -43,22 +45,33 @@ public interface ConsensusGroupId {
         throw new IOException("unrecognized id type " + index);
       }
       GroupType type = GroupType.values()[index];
-      ConsensusGroupId id;
+      ConsensusGroupId groupId = createEmpty(type);
+      groupId.deserializeImpl(buffer);
+      return groupId;
+    }
+
+    public static ConsensusGroupId createEmpty(GroupType type) {
+      ConsensusGroupId groupId;
       switch (type) {
         case DataRegion:
-          id = new DataRegionId();
+          groupId = new DataRegionId();
           break;
         case SchemaRegion:
-          id = new SchemaRegionId();
+          groupId = new SchemaRegionId();
           break;
         case PartitionRegion:
-          id = new PartitionRegionId();
+          groupId = new PartitionRegionId();
           break;
         default:
-          throw new IOException("unrecognized id type " + type);
+          throw new IllegalArgumentException("unrecognized id type " + type);
       }
-      id.deserializeImpl(buffer);
-      return id;
+      return groupId;
+    }
+
+    public static ConsensusGroupId create(int id, GroupType type) {
+      ConsensusGroupId groupId = createEmpty(type);
+      groupId.setId(id);
+      return groupId;
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
index aa570cea7a..3085541766 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/DataRegionId.java
@@ -49,6 +49,11 @@ public class DataRegionId implements ConsensusGroupId {
     return id;
   }
 
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
   @Override
   public GroupType getType() {
     return GroupType.DataRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
index 4eb38a15db..86f5fe963c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/PartitionRegionId.java
@@ -49,6 +49,11 @@ public class PartitionRegionId implements ConsensusGroupId {
     return id;
   }
 
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
   @Override
   public GroupType getType() {
     return GroupType.PartitionRegion;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
index 0b3d9ad551..a42ddf3ad0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/SchemaRegionId.java
@@ -49,6 +49,11 @@ public class SchemaRegionId implements ConsensusGroupId {
     return id;
   }
 
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
   @Override
   public GroupType getType() {
     return GroupType.SchemaRegion;
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
new file mode 100644
index 0000000000..e890449d2d
--- /dev/null
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/ConsensusGroupIdTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.GroupType;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ConsensusGroupIdTest {
+  @Test
+  public void TestCreate() throws IOException {
+    ConsensusGroupId dataRegionId = ConsensusGroupId.Factory.create(1, GroupType.DataRegion);
+    Assert.assertTrue(dataRegionId instanceof DataRegionId);
+    Assert.assertEquals(1, dataRegionId.getId());
+    Assert.assertEquals(GroupType.DataRegion, dataRegionId.getType());
+
+    ConsensusGroupId schemaRegionId = ConsensusGroupId.Factory.create(2, GroupType.SchemaRegion);
+    Assert.assertTrue(schemaRegionId instanceof SchemaRegionId);
+    Assert.assertEquals(2, schemaRegionId.getId());
+    Assert.assertEquals(GroupType.SchemaRegion, schemaRegionId.getType());
+
+    ByteBuffer buffer = ByteBuffer.allocate(1024);
+    schemaRegionId.serializeImpl(buffer);
+    buffer.flip();
+
+    ConsensusGroupId schemaRegionIdClone = ConsensusGroupId.Factory.create(buffer);
+    Assert.assertEquals(schemaRegionId, schemaRegionIdClone);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index b22f480589..8c0336adbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -71,7 +71,13 @@ public class FragmentInstanceId {
   }
 
   public TFragmentInstanceId toThrift() {
-    return new TFragmentInstanceId(queryId.getId(), String.valueOf(fragmentId.getId()), instanceId);
+    return new TFragmentInstanceId(queryId.getId(), fragmentId.getId(), instanceId);
+  }
+
+  public static FragmentInstanceId fromThrift(TFragmentInstanceId tFragmentInstanceId) {
+    return new FragmentInstanceId(
+        new PlanFragmentId(tFragmentInstanceId.queryId, tFragmentInstanceId.fragmentId),
+        tFragmentInstanceId.instanceId);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index 6f64a27a1a..b87579561c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -67,7 +67,7 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   private TFragmentInstanceId getTId(FragmentInstance instance) {
     return new TFragmentInstanceId(
         instance.getId().getQueryId().getId(),
-        String.valueOf(instance.getId().getFragmentId().getId()),
+        instance.getId().getFragmentId().getId(),
         instance.getId().getInstanceId());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 60bdb13199..0dde122c74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 
 import org.apache.thrift.TException;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -45,8 +49,18 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
               InternalService.Client client =
                   InternalServiceClientFactory.getInternalServiceClient(
                       instance.getHostEndpoint().getIp(), instance.getHostEndpoint().getPort());
-              // TODO: (xingtanzjr) add request construction
-              client.sendFragmentInstance(null);
+              // TODO: (xingtanzjr) consider how to handle the buffer here
+              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+              instance.serializeRequest(buffer);
+              buffer.flip();
+              TConsensusGroupId groupId =
+                  new TConsensusGroupId(
+                      instance.getDataRegionId().getId().getId(),
+                      instance.getDataRegionId().getId().getType().toString());
+              TSendFragmentInstanceReq req =
+                  new TSendFragmentInstanceReq(
+                      new TFragmentInstance(buffer), groupId, instance.getType().toString());
+              client.sendFragmentInstance(req);
             }
           } catch (TException e) {
             // TODO: (xingtanzjr) add more details
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index 2ddd981249..efaa56b89e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -196,7 +196,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
       blockManager.forceDeregisterFragmentInstance(
           new TFragmentInstanceId(
               task.getId().getQueryId().getId(),
-              String.valueOf(task.getId().getFragmentId().getId()),
+              task.getId().getFragmentId().getId(),
               task.getId().getInstanceId()));
     }
     readyQueue.remove(task.getId());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 182f85fbe5..96341a2ca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -79,7 +79,8 @@ public class DistributionPlanner {
   // Convert fragment to detailed instance
   // And for parallel-able fragment, clone it into several instances with different params.
   public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
-    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
+    IFragmentParallelPlaner parallelPlaner =
+        new SimpleFragmentParallelPlanner(subPlan, analysis, context);
     return parallelPlaner.parallelPlan();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index d46e502326..03fd9f3e0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -23,11 +23,13 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -35,7 +37,7 @@ import java.util.Objects;
 
 public class FragmentInstance implements IConsensusRequest {
   private final FragmentInstanceId id;
-
+  private final QueryType type;
   // The reference of PlanFragment which this instance is generated from
   private final PlanFragment fragment;
   // The DataRegion where the FragmentInstance should run
@@ -47,9 +49,11 @@ public class FragmentInstance implements IConsensusRequest {
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
-  public FragmentInstance(PlanFragment fragment, int index) {
+  public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
     this.fragment = fragment;
+    this.timeFilter = timeFilter;
     this.id = generateId(fragment.getId(), index);
+    this.type = type;
   }
 
   public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
@@ -101,6 +105,10 @@ public class FragmentInstance implements IConsensusRequest {
     return timeFilter;
   }
 
+  public QueryType getType() {
+    return type;
+  }
+
   public String toString() {
     StringBuilder ret = new StringBuilder();
     ret.append(
@@ -114,9 +122,13 @@ public class FragmentInstance implements IConsensusRequest {
 
   public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
     FragmentInstanceId id = FragmentInstanceId.deserialize(buffer);
+    PlanFragment planFragment = PlanFragment.deserialize(buffer);
+    boolean hasTimeFilter = ReadWriteIOUtils.readBool(buffer);
+    Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
+    QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
     FragmentInstance fragmentInstance =
         new FragmentInstance(
-            PlanFragment.deserialize(buffer), Integer.parseInt(id.getInstanceId()));
+            planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
     RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
     try {
       regionReplicaSet.deserializeImpl(buffer);
@@ -127,7 +139,6 @@ public class FragmentInstance implements IConsensusRequest {
     endpoint.deserializeImpl(buffer);
     fragmentInstance.dataRegion = regionReplicaSet;
     fragmentInstance.hostEndpoint = endpoint;
-    fragmentInstance.timeFilter = FilterFactory.deserialize(buffer);
 
     return fragmentInstance;
   }
@@ -136,29 +147,30 @@ public class FragmentInstance implements IConsensusRequest {
   public void serializeRequest(ByteBuffer buffer) {
     id.serialize(buffer);
     fragment.serialize(buffer);
+    ReadWriteIOUtils.write(timeFilter != null, buffer);
+    if (timeFilter != null) {
+      timeFilter.serialize(buffer);
+    }
+    ReadWriteIOUtils.write(type.ordinal(), buffer);
     dataRegion.serializeImpl(buffer);
     hostEndpoint.serializeImpl(buffer);
-    timeFilter.serialize(buffer);
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    FragmentInstance that = (FragmentInstance) o;
-    return Objects.equals(id, that.id)
-        && Objects.equals(fragment, that.fragment)
-        && Objects.equals(dataRegion, that.dataRegion)
-        && Objects.equals(hostEndpoint, that.hostEndpoint)
-        && Objects.equals(timeFilter, that.timeFilter);
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    FragmentInstance instance = (FragmentInstance) o;
+    return Objects.equals(id, instance.id)
+        && type == instance.type
+        && Objects.equals(fragment, instance.fragment)
+        && Objects.equals(dataRegion, instance.dataRegion)
+        && Objects.equals(hostEndpoint, instance.hostEndpoint)
+        && Objects.equals(timeFilter, instance.timeFilter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, fragment, dataRegion, hostEndpoint, timeFilter);
+    return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 4bdd01e9de..c2014c55d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -19,12 +19,16 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,6 +42,8 @@ import java.util.Map;
 public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
 
   private SubPlan subPlan;
+  private Analysis analysis;
+  private MPPQueryContext queryContext;
 
   // Record all the FragmentInstances belonged to same PlanFragment
   Map<PlanFragmentId, FragmentInstance> instanceMap;
@@ -45,8 +51,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   Map<PlanNodeId, PlanFragmentId> planNodeMap;
   List<FragmentInstance> fragmentInstanceList;
 
-  public SimpleFragmentParallelPlanner(SubPlan subPlan) {
+  public SimpleFragmentParallelPlanner(
+      SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
     this.subPlan = subPlan;
+    this.analysis = analysis;
+    this.queryContext = context;
     this.instanceMap = new HashMap<>();
     this.planNodeMap = new HashMap<>();
     this.fragmentInstanceList = new ArrayList<>();
@@ -72,8 +81,16 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // one by one
     int instanceIdx = 0;
     PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
+    Filter timeFilter =
+        analysis.getQueryFilter() == null
+            ? null
+            : ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter();
     FragmentInstance fragmentInstance =
-        new FragmentInstance(new PlanFragment(fragment.getId(), rootCopy), instanceIdx);
+        new FragmentInstance(
+            new PlanFragment(fragment.getId(), rootCopy),
+            instanceIdx,
+            timeFilter,
+            queryContext.getQueryType());
 
     // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
     // of them.
diff --git a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
index acac7210cf..a0e1d01aab 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/InternalServiceImpl.java
@@ -19,6 +19,16 @@
 
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.GroupType;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.SchemaFetchResponse;
@@ -30,6 +40,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 
@@ -42,13 +53,33 @@ public class InternalServiceImpl implements InternalService.Iface {
   @Override
   public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req)
       throws TException {
+    ByteBufferConsensusRequest request = new ByteBufferConsensusRequest(req.fragmentInstance.body);
+    QueryType type = QueryType.valueOf(req.queryType);
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.create(
+            req.consensusGroupId.id, GroupType.valueOf(req.consensusGroupId.type));
+    switch (type) {
+      case READ:
+        ConsensusReadResponse readResp = ConsensusImpl.getInstance().read(groupId, request);
+        FragmentInstanceInfo info = (FragmentInstanceInfo) readResp.getDataset();
+        return new TSendFragmentInstanceResp(info.getState().isFailed());
+      case WRITE:
+        ConsensusWriteResponse writeResp = ConsensusImpl.getInstance().write(groupId, request);
+        // TODO: (xingtanzjr) need to distinguish more conditions for response status.
+        boolean accepted =
+            writeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+        return new TSendFragmentInstanceResp(accepted);
+    }
     return null;
   }
 
   @Override
   public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req)
       throws TException {
-    return null;
+    FragmentInstanceInfo info =
+        FragmentInstanceManager.getInstance()
+            .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
+    return new TFragmentInstanceStateResp(info.getState().toString());
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index 64229ebe30..65ce8c8fd4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -46,10 +46,9 @@ public class SinkHandleTest {
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final String remoteHostname = "remote";
-    final TFragmentInstanceId remoteFragmentInstanceId =
-        new TFragmentInstanceId(queryId, "f0", "0");
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
     final String remotePlanNodeId = "exchange_0";
-    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f1", "0");
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
 
     // Construct a mock LocalMemoryManager that returns unblocked futures.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -176,10 +175,9 @@ public class SinkHandleTest {
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final String remoteHostname = "remote";
-    final TFragmentInstanceId remoteFragmentInstanceId =
-        new TFragmentInstanceId(queryId, "f0", "0");
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
     final String remotePlanNodeId = "exchange_0";
-    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f1", "0");
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
 
     // Construct a mock LocalMemoryManager that returns blocked futures.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -359,10 +357,9 @@ public class SinkHandleTest {
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final String remoteHostname = "remote";
-    final TFragmentInstanceId remoteFragmentInstanceId =
-        new TFragmentInstanceId(queryId, "f0", "0");
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
     final String remotePlanNodeId = "exchange_0";
-    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f1", "0");
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
 
     // Construct a mock LocalMemoryManager that returns blocked futures.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 55c40e8015..1aaa5abd2f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -50,10 +50,9 @@ public class SourceHandleTest {
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final String remoteHostname = "remote";
-    final TFragmentInstanceId remoteFragmentInstanceId =
-        new TFragmentInstanceId(queryId, "f1", "0");
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
     final String localPlanNodeId = "exchange_0";
-    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f0", "0");
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
 
     // Construct a mock LocalMemoryManager that do not block any reservation.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -170,10 +169,9 @@ public class SourceHandleTest {
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final String remoteHostname = "remote";
-    final TFragmentInstanceId remoteFragmentInstanceId =
-        new TFragmentInstanceId(queryId, "f1", "0");
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
     final String localPlanNodeId = "exchange_0";
-    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f0", "0");
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
 
     // Construct a mock LocalMemoryManager with capacity 3 * mockTsBlockSize.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -316,10 +314,9 @@ public class SourceHandleTest {
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final String remoteHostname = "remote";
-    final TFragmentInstanceId remoteFragmentInstanceId =
-        new TFragmentInstanceId(queryId, "f1", "0");
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
     final String localPlanNodeId = "exchange_0";
-    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f0", "0");
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
 
     // Construct a mock LocalMemoryManager that returns unblocked futures.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
@@ -508,10 +505,9 @@ public class SourceHandleTest {
     final long mockTsBlockSize = 1024L * 1024L;
     final int numOfMockTsBlock = 10;
     final String remoteHostname = "remote";
-    final TFragmentInstanceId remoteFragmentInstanceId =
-        new TFragmentInstanceId(queryId, "f1", "0");
+    final TFragmentInstanceId remoteFragmentInstanceId = new TFragmentInstanceId(queryId, 1, "0");
     final String localPlanNodeId = "exchange_0";
-    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, "f0", "0");
+    final TFragmentInstanceId localFragmentInstanceId = new TFragmentInstanceId(queryId, 0, "0");
 
     // Construct a mock LocalMemoryManager that returns unblocked futures.
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceIdTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceIdTest.java
new file mode 100644
index 0000000000..e3b02b8f03
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceIdTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FragmentInstanceIdTest { // unit test for FragmentInstanceId.fromThrift
+  @Test
+  public void TestFromThrift() {
+    String queryId = "test_query";
+    TFragmentInstanceId tId = new TFragmentInstanceId(queryId, 1, "0"); // queryId, fragmentId, instanceId
+    FragmentInstanceId id = FragmentInstanceId.fromThrift(tId);
+    Assert.assertEquals(queryId, id.getQueryId().getId()); // each field must survive the conversion
+    Assert.assertEquals(1, id.getFragmentId().getId());
+    Assert.assertEquals("0", id.getInstanceId());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index 93a2d49eb5..dea1e9b031 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -24,8 +24,10 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
@@ -52,6 +54,45 @@ public class FragmentInstanceSerdeTest {
 
   @Test
   public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+    FragmentInstance fragmentInstance = // instance under test, built with a GroupByFilter
+        new FragmentInstance(
+            new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
+            -1,
+            new GroupByFilter(1, 2, 3, 4),
+            QueryType.READ);
+    RegionReplicaSet regionReplicaSet =
+        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
+    fragmentInstance.setDataRegionId(regionReplicaSet);
+    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // round trip: serialize, then deserialize
+    fragmentInstance.serializeRequest(byteBuffer);
+    byteBuffer.flip(); // switch the buffer from writing to reading
+    FragmentInstance deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
+    assertEquals(fragmentInstance, deserializeFragmentInstance); // JUnit order: (expected, actual)
+  }
+
+  @Test
+  public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
+    FragmentInstance fragmentInstance = // instance under test, built without a filter
+        new FragmentInstance(
+            new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
+            -1,
+            null, // the filter argument is deliberately null in this case
+            QueryType.READ);
+    RegionReplicaSet regionReplicaSet =
+        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
+    fragmentInstance.setDataRegionId(regionReplicaSet);
+    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // round trip: serialize, then deserialize
+    fragmentInstance.serializeRequest(byteBuffer);
+    byteBuffer.flip(); // switch the buffer from writing to reading
+    FragmentInstance deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
+    assertEquals(fragmentInstance, deserializeFragmentInstance); // JUnit order: (expected, actual)
+  }
+
+  private PlanNode constructPlanNodeTree() throws IllegalPathException {
     // create node
     OffsetNode offsetNode = new OffsetNode(new PlanNodeId("OffsetNode"), 100);
     LimitNode limitNode = new LimitNode(new PlanNodeId("LimitNode"), 100);
@@ -100,18 +141,6 @@ public class FragmentInstanceSerdeTest {
     limitNode.addChild(filterNullNode);
     offsetNode.addChild(limitNode);
 
-    FragmentInstance fragmentInstance =
-        new FragmentInstance(new PlanFragment(new PlanFragmentId("test", -1), offsetNode), -1);
-    RegionReplicaSet regionReplicaSet =
-        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
-    fragmentInstance.setDataRegionId(regionReplicaSet);
-    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
-    fragmentInstance.setTimeFilter(new GroupByFilter(1, 2, 3, 4));
-
-    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
-    fragmentInstance.serializeRequest(byteBuffer);
-    byteBuffer.flip();
-    FragmentInstance deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
-    assertEquals(deserializeFragmentInstance, fragmentInstance);
+    return offsetNode;
   }
 }
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index b1cae785d6..e3be3e324b 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -22,7 +22,7 @@ namespace java org.apache.iotdb.mpp.rpc.thrift
 
 struct TFragmentInstanceId {
   1: required string queryId
-  2: required string fragmentId
+  2: required i32 fragmentId
   3: required string instanceId
 }
 
@@ -61,8 +61,15 @@ struct TFragmentInstance {
   1: required binary body
 }
 
+struct TConsensusGroupId { // consensus group identifier carried in TSendFragmentInstanceReq
+  1: required i32 id
+  2: required string type // NOTE(review): presumably the group kind (data/schema/partition region) - confirm
+}
+
 struct TSendFragmentInstanceReq {
   1: required TFragmentInstance fragmentInstance
+  2: required TConsensusGroupId consensusGroupId
+  3: required string queryType
 }
 
 struct TSendFragmentInstanceResp {