Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/09 11:09:02 UTC

[iotdb] branch master updated: [IOTDB-5116] Fix wrong empty result set in aggregation query

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b5ef9a4256 [IOTDB-5116] Fix wrong empty result set in aggregation query
b5ef9a4256 is described below

commit b5ef9a4256f758936f76410e0045f1b38b2f0e90
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Fri Dec 9 19:08:55 2022 +0800

    [IOTDB-5116] Fix wrong empty result set in aggregation query
---
 .../db/it/aggregation/IoTDBAggregationIT.java      | 14 ++++-
 .../apache/iotdb/db/it/groupby/IOTDBGroupByIT.java | 12 ++++
 .../iotdb/commons/partition/ExecutorType.java      | 40 +++++++++++++
 .../iotdb/commons/partition/QueryExecutor.java     | 56 ++++++++++++++++++
 .../iotdb/commons/partition/StorageExecutor.java   | 67 ++++++++++++++++++++++
 .../mpp/execution/executor/RegionReadExecutor.java | 22 +++++++
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  5 +-
 .../SimpleFragmentParallelPlanner.java             | 20 ++++++-
 .../distribution/WriteFragmentParallelPlanner.java |  3 +-
 .../db/mpp/plan/planner/plan/FragmentInstance.java | 44 ++++++--------
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 37 +++++++-----
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  5 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       | 25 ++++----
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   | 26 ++++++---
 .../plan/scheduler/StandaloneSchedulerTest.java    | 11 ++--
 thrift/src/main/thrift/datanode.thrift             |  2 +-
 16 files changed, 317 insertions(+), 72 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
index 7a07960490..8018a84f44 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
@@ -46,6 +46,8 @@ import static org.apache.iotdb.db.constant.TestConstant.maxValue;
 import static org.apache.iotdb.db.constant.TestConstant.minTime;
 import static org.apache.iotdb.db.constant.TestConstant.minValue;
 import static org.apache.iotdb.db.constant.TestConstant.sum;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualWithDescOrderTest;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.fail;
 
@@ -65,7 +67,8 @@ public class IoTDBAggregationIT {
         "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
         "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
         "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN",
-        "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN"
+        "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.test.noDataRegion.s1 WITH DATATYPE=INT32"
       };
   private static final String[] dataSet2 =
       new String[] {
@@ -962,4 +965,13 @@ public class IoTDBAggregationIT {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void noDataRegionTest() {
+    String[] expectedHeader =
+        new String[] {count("root.test.noDataRegion.s1"), sum("root.test.noDataRegion.s1")};
+    String[] retArray = new String[] {"0,null,"};
+    resultSetEqualWithDescOrderTest(
+        "select count(s1), sum(s1) from root.test.noDataRegion", expectedHeader, retArray);
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IOTDBGroupByIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IOTDBGroupByIT.java
index 51b7c414a6..63c48cd748 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IOTDBGroupByIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IOTDBGroupByIT.java
@@ -63,6 +63,7 @@ public class IOTDBGroupByIT {
         "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
         "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
         "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "CREATE TIMESERIES root.test.noDataRegion.s1 WITH DATATYPE=INT32",
         "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(1, 1.1, false, 11)",
         "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(2, 2.2,  true, 22)",
         "INSERT INTO root.ln.wf01.wt01(timestamp, temperature, status, hardware) values(3, 3.3, false, 33 )",
@@ -528,4 +529,15 @@ public class IOTDBGroupByIT {
             + "GROUP BY ([1, 30), -1ms)",
         "no viable alternative at input");
   }
+
+  @Test
+  public void noDataRegionTest() {
+    String[] expectedHeader =
+        new String[] {count("root.test.noDataRegion.s1"), sum("root.test.noDataRegion.s1")};
+    String[] retArray = new String[] {"1,0,null,", "2,0,null,"};
+    resultSetEqualWithDescOrderTest(
+        "select count(s1), sum(s1) from root.test.noDataRegion" + "GROUP BY ([1, 3), 1ms)",
+        expectedHeader,
+        retArray);
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
new file mode 100644
index 0000000000..e947ad732a
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+/** The interface is used to indicate where to execute a FragmentInstance */
+public interface ExecutorType {
+
+  /** Indicates whether this ExecutorType is a StorageExecutor */
+  boolean isStorageExecutor();
+
+  TDataNodeLocation getDataNodeLocation();
+
+  default TRegionReplicaSet getRegionReplicaSet() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  default void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java
new file mode 100644
index 0000000000..55439beffd
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+
+import java.util.Objects;
+
+/** QueryExecutor indicates that this query can be executed directly, without data from the StorageEngine */
+public class QueryExecutor implements ExecutorType {
+  TDataNodeLocation dataNodeLocation;
+
+  public QueryExecutor(TDataNodeLocation dataNodeLocation) {
+    this.dataNodeLocation = dataNodeLocation;
+  }
+
+  @Override
+  public TDataNodeLocation getDataNodeLocation() {
+    return dataNodeLocation;
+  }
+
+  @Override
+  public boolean isStorageExecutor() {
+    return false;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    QueryExecutor that = (QueryExecutor) o;
+    return Objects.equals(dataNodeLocation, that.dataNodeLocation);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(dataNodeLocation);
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
new file mode 100644
index 0000000000..fa517d627d
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.Objects;
+
+/** StorageExecutor indicates that executing this query needs data from the StorageEngine */
+public class StorageExecutor implements ExecutorType {
+  private TRegionReplicaSet regionReplicaSet;
+
+  public StorageExecutor(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  @Override
+  public TDataNodeLocation getDataNodeLocation() {
+    return regionReplicaSet.getDataNodeLocations().get(0);
+  }
+
+  @Override
+  public boolean isStorageExecutor() {
+    return true;
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  @Override
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    StorageExecutor that = (StorageExecutor) o;
+    return Objects.equals(regionReplicaSet, that.regionReplicaSet);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(regionReplicaSet);
+  }
+}
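
The two ExecutorType implementations above let dispatch code branch on a single isStorageExecutor() check instead of testing a possibly-null region. A minimal illustrative sketch of that pattern (ExecutorTypeExample and describeTarget are hypothetical names, not part of this commit):

    import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
    import org.apache.iotdb.commons.partition.ExecutorType;

    public class ExecutorTypeExample {
      // Hypothetical helper: describe where a FragmentInstance will run.
      static String describeTarget(ExecutorType executorType) {
        if (executorType.isStorageExecutor()) {
          // Region-bound execution: the replica set (and its consensus group id) is available.
          return "region " + executorType.getRegionReplicaSet().getRegionId()
              + " on " + executorType.getDataNodeLocation().getInternalEndPoint();
        }
        // QueryExecutor: no region is involved, only a target DataNode.
        TDataNodeLocation location = executorType.getDataNodeLocation();
        return "DataNode " + location.getInternalEndPoint() + " (no region)";
      }
    }
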
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
index 3fda73a847..9d15aaa009 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
@@ -24,7 +24,9 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
+import org.apache.iotdb.db.engine.storagegroup.VirtualDataRegion;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.utils.SetThreadName;
 
@@ -75,4 +77,24 @@ public class RegionReadExecutor {
       return resp;
     }
   }
+
+  public RegionExecutionResult execute(FragmentInstance fragmentInstance) {
+    // execute fragment instance in state machine
+    try (SetThreadName threadName = new SetThreadName(fragmentInstance.getId().getFullId())) {
+      RegionExecutionResult resp = new RegionExecutionResult();
+      // FI with queryExecutor will be executed directly
+      FragmentInstanceInfo info =
+          FragmentInstanceManager.getInstance()
+              .execDataQueryFragmentInstance(fragmentInstance, VirtualDataRegion.getInstance());
+      resp.setAccepted(!info.getState().isFailed());
+      resp.setMessage(info.getMessage());
+      return resp;
+    } catch (Throwable t) {
+      LOGGER.error("Execute FragmentInstance in QueryExecutor failed.", t);
+      RegionExecutionResult resp = new RegionExecutionResult();
+      resp.setAccepted(false);
+      resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
+      return resp;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 180ac1d5c7..4b08a15181 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -259,7 +260,9 @@ public class Analysis {
 
   public boolean hasDataSource() {
     return (dataPartition != null && !dataPartition.isEmpty())
-        || (schemaPartition != null && !schemaPartition.isEmpty());
+        || (schemaPartition != null && !schemaPartition.isEmpty())
+        || (statement instanceof QueryStatement
+            && ((QueryStatement) statement).isAggregationQuery());
   }
 
   public LinkedHashMap<Expression, Set<Expression>> getCrossGroupByExpressions() {
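
With hasDataSource() now returning true for aggregation queries even when no data or schema partition is hit, such a query is planned instead of being short-circuited to an empty result set, and it returns the default aggregate row. A hypothetical JDBC snippet showing the user-visible effect (NoDataRegionExample is an illustrative name; it assumes a locally running IoTDB instance with default credentials and the root.test.noDataRegion.s1 timeseries created in the tests above):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class NoDataRegionExample {
      public static void main(String[] args) throws Exception {
        Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
        try (Connection conn =
                DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
            Statement stmt = conn.createStatement();
            ResultSet rs =
                stmt.executeQuery("select count(s1), sum(s1) from root.test.noDataRegion")) {
          while (rs.next()) {
            // Before this fix: no row at all. After: one row with count = 0 and sum = null.
            System.out.println(rs.getLong(1) + ", " + rs.getObject(2));
          }
        }
      }
    }
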
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 75087b1dbb..cc2cfa2337 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -21,10 +21,13 @@ package org.apache.iotdb.db.mpp.plan.planner.distribution;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.partition.QueryExecutor;
+import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
 import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
@@ -107,12 +110,23 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // of them.
     TRegionReplicaSet regionReplicaSet = fragment.getTargetRegion();
 
-    // Set DataRegion and target host for the instance
+    // Set ExecutorType and target host for the instance
     // We need to store all the replica host in case of the scenario that the instance need to be
     // redirected
     // to another host when scheduling
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
-    fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
+    if ((analysis.getDataPartitionInfo() == null || analysis.getDataPartitionInfo().isEmpty())
+        && (analysis.getStatement() instanceof QueryStatement
+            && ((QueryStatement) analysis.getStatement()).isAggregationQuery())) {
+      // AggregationQuery && no data region, we need to execute this FI on local
+      fragmentInstance.setExecutorAndHost(
+          new QueryExecutor(
+              new TDataNodeLocation()
+                  .setInternalEndPoint(DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT)
+                  .setMPPDataExchangeEndPoint(DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT)));
+    } else {
+      fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
+      fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
+    }
 
     if (analysis.getStatement() instanceof QueryStatement) {
       fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 469f64a6fd..0902dc562f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
+import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
@@ -64,7 +65,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
               queryContext.getQueryType(),
               queryContext.getTimeOut(),
               queryContext.getSession());
-      instance.setDataRegionAndHost(split.getRegionReplicaSet());
+      instance.setExecutorAndHost(new StorageExecutor(split.getRegionReplicaSet()));
       ret.add(instance);
     }
     return ret;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index c8475c0e13..77db9b673d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.plan.planner.plan;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
+import org.apache.iotdb.commons.partition.ExecutorType;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -55,8 +57,8 @@ public class FragmentInstance implements IConsensusRequest {
   // The reference of PlanFragment which this instance is generated from
   private final PlanFragment fragment;
 
-  // The Region where the FragmentInstance should run
-  private TRegionReplicaSet regionReplicaSet;
+  // Where the FragmentInstance should run
+  private ExecutorType executorType;
 
   private TDataNodeLocation hostDataNode;
 
@@ -99,25 +101,12 @@ public class FragmentInstance implements IConsensusRequest {
     this.isRoot = isRoot;
   }
 
-  public TRegionReplicaSet getDataRegionId() {
-    return regionReplicaSet;
-  }
-
-  public void setDataRegionAndHost(TRegionReplicaSet regionReplicaSet) {
-    if (regionReplicaSet == null) {
+  public void setExecutorAndHost(ExecutorType executorType) {
+    if (executorType == null) {
       return;
     }
-    this.regionReplicaSet = regionReplicaSet;
-    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
-    // instance
-    if (IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
-      this.hostDataNode = regionReplicaSet.getDataNodeLocations().get(0);
-    } else {
-      // Although the logic to set hostDataNode for standalone is the same as
-      // cluster mode currently, it may be made different in later change.
-      // So we keep the conditions here.
-      this.hostDataNode = regionReplicaSet.getDataNodeLocations().get(0);
-    }
+    this.executorType = executorType;
+    this.hostDataNode = executorType.getDataNodeLocation();
   }
 
   // Although the HostDataNode is set in method setDataRegionAndHost(),
@@ -126,12 +115,17 @@ public class FragmentInstance implements IConsensusRequest {
     this.hostDataNode = hostDataNode;
   }
 
-  public TRegionReplicaSet getRegionReplicaSet() {
-    return regionReplicaSet;
+  public ExecutorType getExecutorType() {
+    return executorType;
   }
 
-  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
-    this.regionReplicaSet = regionReplicaSet;
+  @TestOnly
+  public void setExecutorType(ExecutorType executorType) {
+    this.executorType = executorType;
+  }
+
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return executorType.getRegionReplicaSet();
   }
 
   public PlanFragment getFragment() {
@@ -241,14 +235,14 @@ public class FragmentInstance implements IConsensusRequest {
     return Objects.equals(id, instance.id)
         && type == instance.type
         && Objects.equals(fragment, instance.fragment)
-        && Objects.equals(regionReplicaSet, instance.regionReplicaSet)
+        && Objects.equals(executorType, instance.executorType)
         && Objects.equals(hostDataNode, instance.hostDataNode)
         && Objects.equals(timeFilter, instance.timeFilter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, type, fragment, regionReplicaSet, hostDataNode, timeFilter);
+    return Objects.hash(id, type, fragment, executorType, hostDataNode, timeFilter);
   }
 
   public TDataNodeLocation getHostDataNode() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a6a0604f50..1ba07ccb79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -151,9 +151,11 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       switch (instance.getType()) {
         case READ:
           TSendFragmentInstanceReq sendFragmentInstanceReq =
-              new TSendFragmentInstanceReq(
-                  new TFragmentInstance(instance.serializeToByteBuffer()),
-                  instance.getRegionReplicaSet().getRegionId());
+              new TSendFragmentInstanceReq(new TFragmentInstance(instance.serializeToByteBuffer()));
+          if (instance.getExecutorType().isStorageExecutor()) {
+            sendFragmentInstanceReq.setConsensusGroupId(
+                instance.getRegionReplicaSet().getRegionId());
+          }
           TSendFragmentInstanceResp sendFragmentInstanceResp =
               client.sendFragmentInstance(sendFragmentInstanceReq);
           if (!sendFragmentInstanceResp.accepted) {
@@ -205,23 +207,28 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
 
   private void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDispatchException {
     // deserialize ConsensusGroupId
-    ConsensusGroupId groupId;
-    try {
-      groupId =
-          ConsensusGroupId.Factory.createFromTConsensusGroupId(
-              instance.getRegionReplicaSet().getRegionId());
-    } catch (Throwable t) {
-      logger.warn("Deserialize ConsensusGroupId failed. ", t);
-      throw new FragmentInstanceDispatchException(
-          RpcUtils.getStatus(
-              TSStatusCode.EXECUTE_STATEMENT_ERROR,
-              "Deserialize ConsensusGroupId failed: " + t.getMessage()));
+    ConsensusGroupId groupId = null;
+    if (instance.getExecutorType().isStorageExecutor()) {
+      try {
+        groupId =
+            ConsensusGroupId.Factory.createFromTConsensusGroupId(
+                instance.getRegionReplicaSet().getRegionId());
+      } catch (Throwable t) {
+        logger.warn("Deserialize ConsensusGroupId failed. ", t);
+        throw new FragmentInstanceDispatchException(
+            RpcUtils.getStatus(
+                TSStatusCode.EXECUTE_STATEMENT_ERROR,
+                "Deserialize ConsensusGroupId failed: " + t.getMessage()));
+      }
     }
 
     switch (instance.getType()) {
       case READ:
         RegionReadExecutor readExecutor = new RegionReadExecutor();
-        RegionExecutionResult readResult = readExecutor.execute(groupId, instance);
+        RegionExecutionResult readResult =
+            groupId == null
+                ? readExecutor.execute(instance)
+                : readExecutor.execute(groupId, instance);
         if (!readResult.isAccepted()) {
           logger.warn(readResult.getMessage());
           throw new FragmentInstanceDispatchException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index f1d710bb22..ada2fab6ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.load.ChunkData;
@@ -173,7 +174,7 @@ public class LoadTsFileScheduler implements IScheduler {
             queryContext.getQueryType(),
             queryContext.getTimeOut(),
             queryContext.getSession());
-    instance.setDataRegionAndHost(replicaSet);
+    instance.setExecutorAndHost(new StorageExecutor(replicaSet));
     Future<FragInstanceDispatchResult> dispatchResultFuture =
         dispatcher.dispatch(Collections.singletonList(instance));
 
@@ -272,7 +273,7 @@ public class LoadTsFileScheduler implements IScheduler {
               queryContext.getQueryType(),
               queryContext.getTimeOut(),
               queryContext.getSession());
-      instance.setDataRegionAndHost(node.getLocalRegionReplicaSet());
+      instance.setExecutorAndHost(new StorageExecutor(node.getLocalRegionReplicaSet()));
       dispatcher.dispatchLocally(instance);
     } catch (FragmentInstanceDispatchException e) {
       logger.warn(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d6301492f5..e7011a2985 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -241,14 +241,16 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     LOGGER.debug("receive FragmentInstance to group[{}]", req.getConsensusGroupId());
 
     // deserialize ConsensusGroupId
-    ConsensusGroupId groupId;
-    try {
-      groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-    } catch (Throwable t) {
-      LOGGER.warn("Deserialize ConsensusGroupId failed. ", t);
-      TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
-      resp.setMessage("Deserialize ConsensusGroupId failed: " + t.getMessage());
-      return resp;
+    ConsensusGroupId groupId = null;
+    if (req.consensusGroupId != null) {
+      try {
+        groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+      } catch (Throwable t) {
+        LOGGER.warn("Deserialize ConsensusGroupId failed. ", t);
+        TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
+        resp.setMessage("Deserialize ConsensusGroupId failed: " + t.getMessage());
+        return resp;
+      }
     }
 
     // We deserialize here instead of the underlying state machine because parallelism is possible
@@ -264,11 +266,14 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     }
 
     RegionReadExecutor executor = new RegionReadExecutor();
-    RegionExecutionResult executionResult = executor.execute(groupId, fragmentInstance);
+    RegionExecutionResult executionResult =
+        groupId == null
+            ? executor.execute(fragmentInstance)
+            : executor.execute(groupId, fragmentInstance);
     TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
     resp.setAccepted(executionResult.isAccepted());
     resp.setMessage(executionResult.getMessage());
-
+    // TODO
     return resp;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
index 01a5593e9e..8585732e76 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/FragmentInstanceSerdeTest.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.partition.QueryExecutor;
+import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -74,18 +76,26 @@ public class FragmentInstanceSerdeTest {
             QueryType.READ,
             config.getQueryTimeoutThreshold(),
             sessionInfo);
+    // test FI with StorageExecutor
     TRegionReplicaSet regionReplicaSet =
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
             ImmutableList.of(dataNodeLocation));
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
-
+    fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
     ByteBuffer byteBuffer = fragmentInstance.serializeToByteBuffer();
     FragmentInstance deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
-    assertNull(deserializeFragmentInstance.getRegionReplicaSet());
-    // Because the RegionReplicaSet won't be considered in serialization, we need to set it
+    assertNull(deserializeFragmentInstance.getExecutorType());
+    // Because the ExecutorType won't be considered in serialization, we need to set it
     // from original object before comparison.
-    deserializeFragmentInstance.setRegionReplicaSet(fragmentInstance.getRegionReplicaSet());
+    deserializeFragmentInstance.setExecutorType(fragmentInstance.getExecutorType());
+    assertEquals(deserializeFragmentInstance, fragmentInstance);
+
+    // test FI with QueryExecutor
+    fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation));
+    byteBuffer = fragmentInstance.serializeToByteBuffer();
+    deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
+    assertNull(deserializeFragmentInstance.getExecutorType());
+    deserializeFragmentInstance.setExecutorType(fragmentInstance.getExecutorType());
     assertEquals(deserializeFragmentInstance, fragmentInstance);
   }
 
@@ -112,12 +122,12 @@ public class FragmentInstanceSerdeTest {
         new TRegionReplicaSet(
             new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
             ImmutableList.of(dataNodeLocation));
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+    fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
 
     ByteBuffer byteBuffer = fragmentInstance.serializeToByteBuffer();
     FragmentInstance deserializeFragmentInstance = FragmentInstance.deserializeFrom(byteBuffer);
-    assertNull(deserializeFragmentInstance.getRegionReplicaSet());
-    deserializeFragmentInstance.setRegionReplicaSet(fragmentInstance.getRegionReplicaSet());
+    assertNull(deserializeFragmentInstance.getExecutorType());
+    deserializeFragmentInstance.setExecutorType(fragmentInstance.getExecutorType());
     assertEquals(deserializeFragmentInstance, fragmentInstance);
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index ad2d8a1e4c..ff1aaa111f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -140,7 +141,7 @@ public class StandaloneSchedulerTest {
             QueryType.WRITE,
             conf.getQueryTimeoutThreshold(),
             sessionInfo);
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+    fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.wt01.status"));
     MPPQueryContext context =
@@ -244,7 +245,7 @@ public class StandaloneSchedulerTest {
             QueryType.WRITE,
             conf.getQueryTimeoutThreshold(),
             sessionInfo);
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+    fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.wf01.GPS"));
     MPPQueryContext context =
@@ -358,7 +359,7 @@ public class StandaloneSchedulerTest {
             QueryType.WRITE,
             conf.getQueryTimeoutThreshold(),
             sessionInfo);
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+    fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln.d3"));
     MPPQueryContext context =
@@ -410,7 +411,7 @@ public class StandaloneSchedulerTest {
             QueryType.WRITE,
             conf.getQueryTimeoutThreshold(),
             sessionInfo);
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+    fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath(deviceId));
     configNode.getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
@@ -492,7 +493,7 @@ public class StandaloneSchedulerTest {
             QueryType.WRITE,
             conf.getQueryTimeoutThreshold(),
             sessionInfo);
-    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+    fragmentInstance.setExecutorAndHost(new StorageExecutor(regionReplicaSet));
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);
     configNode.getBelongedDataRegionIdWithAutoCreate(deviceId);
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index b1efcb63af..59f69ac81f 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -99,7 +99,7 @@ struct TPlanNode {
 
 struct TSendFragmentInstanceReq {
   1: required TFragmentInstance fragmentInstance
-  2: required common.TConsensusGroupId consensusGroupId
+  2: optional common.TConsensusGroupId consensusGroupId
 }
 
 struct TSendFragmentInstanceResp {
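
Because consensusGroupId is now optional, a receiver must check that the field was actually set before deserializing it; the Thrift-generated Java struct exposes an isSet accessor for this. A minimal sketch of the receiving-side pattern, equivalent to the null check added in DataNodeInternalRPCServiceImpl above (req, executor and fragmentInstance refer to the same variables as in that method):

    // Sketch only: groupId stays null for instances dispatched with a QueryExecutor.
    ConsensusGroupId groupId = null;
    if (req.isSetConsensusGroupId()) {
      groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
    }
    RegionExecutionResult result =
        groupId == null
            ? executor.execute(fragmentInstance)
            : executor.execute(groupId, fragmentInstance);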