Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/23 11:13:29 UTC

[iotdb] 02/02: complete basic distributed plan

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/distribution_planner
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 56618845ed2e67bc3206fdfe3ade496d1d37d5ad
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Mar 23 19:12:33 2022 +0800

    complete basic distributed plan
---
 .../iotdb/commons/partition/DataPartitionInfo.java | 36 +++++++--
 .../iotdb/commons/partition/DataRegionId.java      |  8 ++
 ...ionPlaceInfo.java => DataRegionReplicaSet.java} | 11 ++-
 .../iotdb/commons/partition/DeviceGroupId.java     | 12 +++
 .../{FragmentId.java => PlanFragmentId.java}       | 17 ++--
 .../iotdb/db/mpp/execution/FragmentInfo.java       |  6 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  7 --
 .../mpp/execution/scheduler/ClusterScheduler.java  |  4 +-
 .../db/mpp/execution/scheduler/IScheduler.java     |  4 +-
 .../execution/scheduler/StandaloneScheduler.java   |  4 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analysis.java  | 28 +------
 .../db/mpp/sql/planner/DistributionPlanner.java    | 51 +++++++-----
 .../mpp/sql/planner/plan/DistributedQueryPlan.java |  4 +-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  | 41 +++++++++-
 .../sql/planner/plan/IFragmentParallelPlaner.java  |  6 +-
 .../db/mpp/sql/planner/plan/PlanFragment.java      | 44 ++++++++++-
 .../db/mpp/sql/planner/plan/PlanFragmentId.java    | 42 ----------
 .../plan/SimpleFragmentParallelPlanner.java        | 92 +++++++++++++++++++++-
 .../db/mpp/sql/planner/plan/node/PlanNodeUtil.java | 25 ++++++
 .../planner/plan/node/process/ExchangeNode.java    | 26 +++++-
 .../planner/plan/node/sink/FragmentSinkNode.java   | 34 ++++++--
 .../planner/plan/node/source/CsvSourceNode.java    | 11 +++
 .../plan/node/source/SeriesAggregateScanNode.java  | 16 ++--
 .../planner/plan/node/source/SeriesScanNode.java   | 29 +++----
 .../sql/planner/plan/node/source/SourceNode.java   |  5 ++
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 58 ++++++++------
 26 files changed, 449 insertions(+), 172 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
index 910a649..6d7a4a4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
@@ -18,20 +18,44 @@
  */
 package org.apache.iotdb.commons.partition;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class DataPartitionInfo {
 
   // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, List<DataRegionPlaceInfo>>>>
-  private Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionPlaceInfo>>>> dataPartitionInfo;
+  private Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> dataPartitionMap;
 
-  public Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionPlaceInfo>>>> getDataPartitionInfo() {
-    return dataPartitionInfo;
+  public Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> getDataPartitionMap() {
+    return dataPartitionMap;
   }
 
-  public void setDataPartitionInfo(
-      Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionPlaceInfo>>>> dataPartitionInfo) {
-    this.dataPartitionInfo = dataPartitionInfo;
+  public void setDataPartitionMap(
+      Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> dataPartitionMap) {
+    this.dataPartitionMap = dataPartitionMap;
+  }
+
+  public List<DataRegionReplicaSet> getDataRegionReplicaSet(String deviceName, List<TimePartitionId> timePartitionIdList) {
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    DeviceGroupId deviceGroupId = calculateDeviceGroupId(deviceName);
+    // TODO: (xingtanzjr) the timePartitionIdList is ignored
+    return dataPartitionMap.get(storageGroup).get(deviceGroupId).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+  }
+
+  private DeviceGroupId calculateDeviceGroupId(String deviceName) {
+    // TODO: (xingtanzjr) implement the real algorithm for calculation of DeviceGroupId
+    return new DeviceGroupId(deviceName.length());
+  }
+
+  private String getStorageGroupByDevice(String deviceName) {
+    for(String storageGroup : dataPartitionMap.keySet()) {
+      if (deviceName.startsWith(storageGroup)) {
+        return storageGroup;
+      }
+    }
+    // TODO: (xingtanzjr) how to handle this exception in IoTDB
+    return null;
   }
 }
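
For reference, a minimal sketch of how the new lookup is meant to be driven (the storage group, device and endpoint values are illustrative, not part of the commit; java.util and the thrift EndPoint imports are omitted). Note that the placeholder DeviceGroupId algorithm still hashes by device-name length and the timePartitionIdList argument is ignored for now:

    Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> map = new HashMap<>();
    Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgMap = new HashMap<>();
    Map<TimePartitionId, List<DataRegionReplicaSet>> slices = new HashMap<>();
    slices.put(new TimePartitionId(), Collections.singletonList(
        new DataRegionReplicaSet(new DataRegionId(1),
            Collections.singletonList(new EndPoint("127.0.0.1", 9000)))));
    sgMap.put(new DeviceGroupId("root.sg.d1".length()), slices);
    map.put("root.sg", sgMap);

    DataPartitionInfo partitionInfo = new DataPartitionInfo();
    partitionInfo.setDataPartitionMap(map);
    // the timePartitionIdList argument is still ignored in this version
    List<DataRegionReplicaSet> replicas = partitionInfo.getDataRegionReplicaSet("root.sg.d1", null);
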
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
index 92e290a..dffdd90 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.commons.partition;
 public class DataRegionId {
   private int dataRegionId;
 
+  public DataRegionId(int dataRegionId) {
+    this.dataRegionId = dataRegionId;
+  }
+
   public int getDataRegionId() {
     return dataRegionId;
   }
@@ -28,4 +32,8 @@ public class DataRegionId {
   public void setDataRegionId(int dataRegionId) {
     this.dataRegionId = dataRegionId;
   }
+
+  public String toString() {
+    return String.format("DataRegion-%d", dataRegionId);
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionPlaceInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
similarity index 81%
rename from node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionPlaceInfo.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
index bd79fc3..2078bc2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionPlaceInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
@@ -21,10 +21,15 @@ package org.apache.iotdb.commons.partition;
 import java.util.List;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
-public class DataRegionPlaceInfo {
+public class DataRegionReplicaSet {
   private DataRegionId dataRegionId;
   private List<EndPoint> endPointList;
 
+  public DataRegionReplicaSet(DataRegionId dataRegionId, List<EndPoint> endPointList) {
+    this.dataRegionId = dataRegionId;
+    this.endPointList = endPointList;
+  }
+
   public List<EndPoint> getEndPointList() {
     return endPointList;
   }
@@ -40,4 +45,8 @@ public class DataRegionPlaceInfo {
   public void setDataRegionId(DataRegionId dataRegionId) {
     this.dataRegionId = dataRegionId;
   }
+
+  public String toString() {
+    return String.format("%s:%s", dataRegionId, endPointList);
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
index e6aee53..2f8158f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.commons.partition;
 public class DeviceGroupId {
   private int deviceGroupId;
 
+  public DeviceGroupId(int deviceGroupId) {
+    this.deviceGroupId = deviceGroupId;
+  }
+
   public int getDeviceGroupId() {
     return deviceGroupId;
   }
@@ -28,4 +32,12 @@ public class DeviceGroupId {
   public void setDeviceGroupId(int deviceGroupId) {
     this.deviceGroupId = deviceGroupId;
   }
+
+  public int hashCode() {
+    return new Integer(deviceGroupId).hashCode();
+  }
+
+  public boolean equals(Object obj) {
+    return obj instanceof DeviceGroupId && this.deviceGroupId == ((DeviceGroupId)obj).deviceGroupId;
+  }
 }
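
The equals/hashCode pair is what allows DeviceGroupId to serve as a HashMap key in dataPartitionMap. A tiny sketch with illustrative values:

    Map<DeviceGroupId, String> groups = new HashMap<>();
    groups.put(new DeviceGroupId(10), "group-10");
    // a fresh instance with the same id resolves to the same entry
    String hit = groups.get(new DeviceGroupId(10)); // "group-10"
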
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index ef0df6b..d43823c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -23,26 +23,26 @@ import java.util.List;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
-public class FragmentId {
+public class PlanFragmentId {
 
   private final QueryId queryId;
   private final int id;
 
-  public static FragmentId valueOf(String stageId) {
+  public static PlanFragmentId valueOf(String stageId) {
     List<String> ids = QueryId.parseDottedId(stageId, 2, "stageId");
     return valueOf(ids);
   }
 
-  public static FragmentId valueOf(List<String> ids) {
+  public static PlanFragmentId valueOf(List<String> ids) {
     checkArgument(ids.size() == 2, "Expected two ids but got: %s", ids);
-    return new FragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
+    return new PlanFragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
   }
 
-  public FragmentId(String queryId, int id) {
+  public PlanFragmentId(String queryId, int id) {
     this(new QueryId(queryId), id);
   }
 
-  public FragmentId(QueryId queryId, int id) {
+  public PlanFragmentId(QueryId queryId, int id) {
     this.queryId = requireNonNull(queryId, "queryId is null");
     this.id = id;
   }
@@ -54,4 +54,9 @@ public class FragmentId {
   public int getId() {
     return id;
   }
+
+  public String toString() {
+    return String.format("%s.%d", queryId, id);
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
index 7bd9300..9e3412e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
@@ -18,21 +18,21 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 
 import java.util.List;
 
 public class FragmentInfo {
 
-  private final FragmentId stageId;
+  private final PlanFragmentId stageId;
   private final FragmentState state;
   private final PlanFragment plan;
 
   private final List<FragmentInfo> childrenFragments;
 
   public FragmentInfo(
-      FragmentId stageId,
+      PlanFragmentId stageId,
       FragmentState state,
       PlanFragment plan,
       List<FragmentInfo> childrenFragments) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index e58b755..90b3bf5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -61,7 +61,6 @@ public class QueryExecution {
   public void start() {
     doLogicalPlan();
     doDistributedPlan();
-    planFragmentInstances();
     schedule();
   }
 
@@ -89,12 +88,6 @@ public class QueryExecution {
     this.distributedPlan = planner.planFragments();
   }
 
-  // Convert fragment to detailed instance
-  // And for parallel-able fragment, clone it into several instances with different params.
-  public void planFragmentInstances() {
-
-  }
-
 
   /**
    * This method will be called by the request thread from client connection. This method will block
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 4af8b31..75d285c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
@@ -67,7 +67,7 @@ public class ClusterScheduler implements IScheduler {
   public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
 
   @Override
-  public void cancelFragment(FragmentId fragmentId) {}
+  public void cancelFragment(PlanFragmentId planFragmentId) {}
 
   // Send the instances to other nodes
   private void sendFragmentInstances() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
index 16145fd..b116f84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
 
@@ -36,5 +36,5 @@ public interface IScheduler {
 
   void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
 
-  void cancelFragment(FragmentId fragmentId);
+  void cancelFragment(PlanFragmentId planFragmentId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index bcf2bbd..b96d214 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.metadata.SchemaEngine;
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
 
@@ -54,5 +54,5 @@ public class StandaloneScheduler implements IScheduler {
   public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
 
   @Override
-  public void cancelFragment(FragmentId fragmentId) {}
+  public void cancelFragment(PlanFragmentId planFragmentId) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
index 60e3da1..3c8ecfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
@@ -20,11 +20,9 @@
 package org.apache.iotdb.db.mpp.sql.analyze;
 
 import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
-import org.apache.iotdb.commons.partition.TimePartitionId;
-import org.apache.iotdb.db.metadata.SchemaRegion;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.DataRegion;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
@@ -44,31 +42,13 @@ public class Analysis {
   // indicate whether this statement is write or read
   private QueryType queryType;
 
-  // DataPartitionInfo
-  // DeviceGroup -> DataRegionTimeSlice -> List<DataRegion>
-  @Deprecated
-  private Map<String, Map<TimePartitionId, List<DataRegion>>> dataPartitionInfoOld;
-
   private DataPartitionInfo dataPartitionInfo;
 
   private SchemaPartitionInfo schemaPartitionInfo;
 
-  public Set<DataRegion> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
-    if (timefilter == null) {
-      // TODO: (xingtanzjr) we need to have a method to get the deviceGroup by device
-      String deviceGroup = seriesPath.getDevice();
-      Set<DataRegion> result = new HashSet<>();
-      this.dataPartitionInfoOld.get(deviceGroup).values().forEach(result::addAll);
-      return result;
-    } else {
-      // TODO: (xingtanzjr) complete this branch
-      return null;
-    }
-  }
-
-  public void setDataPartitionInfoOld(
-      Map<String, Map<TimePartitionId, List<DataRegion>>> dataPartitionInfoOld) {
-    this.dataPartitionInfoOld = dataPartitionInfoOld;
+  public List<DataRegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
+    // TODO: (xingtanzjr) implement the calculation of timePartitionIdList
+    return dataPartitionInfo.getDataRegionReplicaSet(seriesPath.getDevice(), null);
   }
 
   public Statement getStatement() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index e4cde96..b9da77d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -18,10 +18,10 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner;
 
-import org.apache.iotdb.db.mpp.common.DataRegion;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
@@ -38,6 +38,8 @@ public class DistributionPlanner {
   private Analysis analysis;
   private LogicalQueryPlan logicalPlan;
 
+  private int planFragmentIndex = 0;
+
   public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
     this.analysis = analysis;
     this.logicalPlan = logicalPlan;
@@ -62,8 +64,20 @@ public class DistributionPlanner {
     PlanNode rootAfterRewrite = rewriteSource();
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
     SubPlan subPlan = splitFragment(rootWithExchange);
+    List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
     return new DistributedQueryPlan(
-        logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList());
+        logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
+  }
+
+  // Convert fragment to detailed instance
+  // And for parallel-able fragment, clone it into several instances with different params.
+  public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
+    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
+    return parallelPlaner.parallelPlan();
+  }
+
+  private PlanFragmentId getNextFragmentId() {
+    return new PlanFragmentId(this.logicalPlan.getContext().getQueryId(), this.planFragmentIndex ++);
   }
 
   private class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
@@ -84,13 +98,13 @@ public class DistributionPlanner {
           // If the child is SeriesScanNode, we need to check whether this node should be seperated
           // into several splits.
           SeriesScanNode handle = (SeriesScanNode) child;
-          Set<DataRegion> dataDistribution =
+          List<DataRegionReplicaSet> dataDistribution =
               analysis.getPartitionInfo(handle.getSeriesPath(), handle.getTimeFilter());
           // If the size of dataDistribution is m, this SeriesScanNode should be seperated into m
           // SeriesScanNode.
-          for (DataRegion dataRegion : dataDistribution) {
+          for (DataRegionReplicaSet dataRegion : dataDistribution) {
             SeriesScanNode split = (SeriesScanNode) handle.clone();
-            split.setDataRegion(dataRegion);
+            split.setDataRegionReplicaSet(dataRegion);
             sources.add(split);
           }
         } else if (child instanceof SeriesAggregateScanNode) {
@@ -107,8 +121,8 @@ public class DistributionPlanner {
       }
 
       // Step 2: For the source nodes, group them by the DataRegion.
-      Map<DataRegion, List<SeriesScanNode>> sourceGroup =
-          sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegion));
+      Map<DataRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
+          sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegionReplicaSet));
       // Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
       // and make the
       // new TimeJoinNode as the child of current TimeJoinNode
@@ -158,13 +172,13 @@ public class DistributionPlanner {
 
     public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
       context.putNodeDistribution(
-          node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegion()));
+          node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
       return node.clone();
     }
 
     public PlanNode visitSeriesAggregate(SeriesAggregateScanNode node, NodeGroupContext context) {
       context.putNodeDistribution(
-          node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegion()));
+          node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
       return node.clone();
     }
 
@@ -177,7 +191,7 @@ public class DistributionPlanner {
                 visitedChildren.add(visit(child, context));
               });
 
-      DataRegion dataRegion = calculateDataRegionByChildren(visitedChildren, context);
+      DataRegionReplicaSet dataRegion = calculateDataRegionByChildren(visitedChildren, context);
       NodeDistributionType distributionType =
           nodeDistributionIsSame(visitedChildren, context)
               ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
@@ -206,12 +220,11 @@ public class DistributionPlanner {
       return newNode;
     }
 
-    private DataRegion calculateDataRegionByChildren(
+    private DataRegionReplicaSet calculateDataRegionByChildren(
         List<PlanNode> children, NodeGroupContext context) {
       // We always make the dataRegion of TimeJoinNode to be the same as its first child.
       // TODO: (xingtanzjr) We need to implement more suitable policies here
-      DataRegion childDataRegion = context.getNodeDistribution(children.get(0).getId()).dataRegion;
-      return new DataRegion(childDataRegion.getDataRegionId(), childDataRegion.getEndpoint());
+      return context.getNodeDistribution(children.get(0).getId()).dataRegion;
     }
 
     private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
@@ -256,9 +269,9 @@ public class DistributionPlanner {
 
   private class NodeDistribution {
     private NodeDistributionType type;
-    private DataRegion dataRegion;
+    private DataRegionReplicaSet dataRegion;
 
-    private NodeDistribution(NodeDistributionType type, DataRegion dataRegion) {
+    private NodeDistribution(NodeDistributionType type, DataRegionReplicaSet dataRegion) {
       this.type = type;
       this.dataRegion = dataRegion;
     }
@@ -277,7 +290,7 @@ public class DistributionPlanner {
         ExchangeNode exchangeNode = (ExchangeNode) root;
         FragmentSinkNode sinkNode = new FragmentSinkNode(PlanNodeIdAllocator.generateId());
         sinkNode.setChild(exchangeNode.getChild());
-        sinkNode.setRemoteDestinationNode(exchangeNode);
+        sinkNode.setDownStreamNode(exchangeNode);
         // Record the source node info in the ExchangeNode so that we can keep the connection of
         // these nodes/fragments
         exchangeNode.setRemoteSourceNode(sinkNode);
@@ -297,7 +310,7 @@ public class DistributionPlanner {
     }
 
     private SubPlan createSubPlan(PlanNode root) {
-      PlanFragment fragment = new PlanFragment(PlanFragmentId.generateId(), root);
+      PlanFragment fragment = new PlanFragment(getNextFragmentId(), root);
       return new SubPlan(fragment);
     }
   }
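
With these changes, planFragments() runs the whole pipeline in one call: rewrite sources, add exchange nodes, split into fragments, then plan fragment instances. A minimal sketch of driving it, assuming analysis and the logical plan root are built the way DistributionPlannerTest (below) builds them:

    Analysis analysis = constructAnalysis(); // helper as in DistributionPlannerTest below
    MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null);
    DistributionPlanner planner =
        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
    DistributedQueryPlan plan = planner.planFragments();
    // one FragmentInstance per PlanFragment in this first version
    List<PlanFragment> fragments = plan.getFragments();
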
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index c1eb086..90ff669 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -26,12 +26,14 @@ public class DistributedQueryPlan {
   private MPPQueryContext context;
   private SubPlan rootSubPlan;
   private List<PlanFragment> fragments;
+  private List<FragmentInstance> instances;
 
   public DistributedQueryPlan(
-      MPPQueryContext context, SubPlan rootSubPlan, List<PlanFragment> fragments) {
+          MPPQueryContext context, SubPlan rootSubPlan, List<PlanFragment> fragments, List<FragmentInstance> instances) {
     this.context = context;
     this.rootSubPlan = rootSubPlan;
     this.fragments = fragments;
+    this.instances = instances;
   }
 
   public List<PlanFragment> getFragments() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 56fe111..07e6be1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -18,14 +18,53 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 public class FragmentInstance {
   private FragmentInstanceId id;
 
   // The reference of PlanFragment which this instance is generated from
   private PlanFragment fragment;
-  
+  // The DataRegion where the FragmentInstance should run
+  private DataRegionReplicaSet dataRegion;
+  private EndPoint hostEndpoint;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
+
+  public FragmentInstance(PlanFragment fragment, int index) {
+    this.fragment = fragment;
+    this.id = generateId(fragment.getId(), index);
+  }
+
+  public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
+    return new FragmentInstanceId(String.format("%s.%d", id, index));
+  }
+
+  public DataRegionReplicaSet getDataRegionId() {
+    return dataRegion;
+  }
+
+  public void setDataRegionId(DataRegionReplicaSet dataRegion) {
+    this.dataRegion = dataRegion;
+  }
+
+  public EndPoint getHostEndpoint() {
+    return hostEndpoint;
+  }
+
+  public void setHostEndpoint(EndPoint hostEndpoint) {
+    this.hostEndpoint = hostEndpoint;
+  }
+
+  public PlanFragment getFragment() {
+    return fragment;
+  }
+
+  public FragmentInstanceId getId() {
+    return id;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
index f86b967..e8557f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
@@ -27,9 +27,9 @@ import java.util.List;
 public interface IFragmentParallelPlaner {
     /**
      *
-     * @param root The root of SubPlan tree. The relation between each PlanFragment is necessary because sometimes we
-     *             need to change the source/sink for each FragmentInstance according to its upstream/downstream
+     * The relation between each PlanFragment is necessary because sometimes we
+     * need to change the source/sink for each FragmentInstance according to its upstream/downstream
      * @return All the FragmentInstances which can run in parallel
      */
-    List<FragmentInstance> parallelPlan(SubPlan root);
+    List<FragmentInstance> parallelPlan();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index f8de7f0..173a8de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -18,9 +18,12 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 
-// TODO: consider whether it is necessary to make PlanFragment as a TreeNode
 /** PlanFragment contains a sub-query of distributed query. */
 public class PlanFragment {
   private PlanFragmentId id;
@@ -42,4 +45,43 @@ public class PlanFragment {
   public String toString() {
     return String.format("PlanFragment-%s", getId());
   }
+
+  // Every Fragment should only run in DataRegion.
+  // But it can select any one of the Endpoint of the target DataRegion
+  // In current version, one PlanFragment should contain at least one SourceNode,
+  // and the DataRegions of all SourceNodes should be same in one PlanFragment.
+  // So we can use the DataRegion of one SourceNode as the PlanFragment's DataRegion.
+  public DataRegionReplicaSet getTargetDataRegion() {
+    return getNodeDataRegion(root);
+  }
+
+  private DataRegionReplicaSet getNodeDataRegion(PlanNode root) {
+    if (root instanceof SourceNode) {
+      return ((SourceNode) root).getDataRegionReplicaSet();
+    }
+    for (PlanNode child : root.getChildren()) {
+      DataRegionReplicaSet result = getNodeDataRegion(child);
+      if (result != null) {
+        return result;
+      }
+    }
+    return null;
+  }
+
+  public PlanNode getPlanNodeById(PlanNodeId nodeId) {
+    return getPlanNodeById(root, nodeId);
+  }
+
+  private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
+    if (root.getId().equals(nodeId)) {
+      return root;
+    }
+    for (PlanNode child : root.getChildren()) {
+      PlanNode node = getPlanNodeById(child, nodeId);
+      if (node != null) {
+        return node;
+      }
+    }
+    return null;
+  }
 }
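
A small sketch of the new per-fragment helpers, assuming the fragment tree contains a SourceNode whose replica set has already been assigned (values illustrative; the PartialPath constructor may throw IllegalPathException):

    SeriesScanNode scan =
        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1"));
    scan.setDataRegionReplicaSet(new DataRegionReplicaSet(
        new DataRegionId(1), Collections.singletonList(new EndPoint("127.0.0.1", 9000))));
    PlanFragment fragment = new PlanFragment(new PlanFragmentId("query1", 0), scan);
    DataRegionReplicaSet target = fragment.getTargetDataRegion(); // taken from the SourceNode
    PlanNode found = fragment.getPlanNodeById(scan.getId());      // depth-first lookup by id
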
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
deleted file mode 100644
index 4f83c05..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
-
-public class PlanFragmentId {
-  private String id;
-
-  public PlanFragmentId(String id) {
-    this.id = id;
-  }
-
-  public static int initialId = 0;
-
-  public static synchronized PlanFragmentId generateId() {
-    initialId++;
-    return new PlanFragmentId(String.valueOf(initialId));
-  }
-
-  public String getId() {
-    return id;
-  }
-
-  public String toString() {
-    return id;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 2277ca4..4a617a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -16,18 +16,104 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A simple implementation of IFragmentParallelPlaner.
  * This planner will transform one PlanFragment into only one FragmentInstance.
  */
 public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner{
+
+    private SubPlan subPlan;
+
+    // Record all the FragmentInstances belonged to same PlanFragment
+    Map<PlanFragmentId, FragmentInstance> instanceMap;
+    // Record which PlanFragment the PlanNode belongs
+    Map<PlanNodeId, PlanFragmentId> planNodeMap;
+    List<FragmentInstance> fragmentInstanceList;
+
+    public SimpleFragmentParallelPlanner(SubPlan subPlan) {
+        this.subPlan = subPlan;
+        this.instanceMap = new HashMap<>();
+        this.planNodeMap = new HashMap<>();
+        this.fragmentInstanceList = new ArrayList<>();
+    }
+
     @Override
-    public List<FragmentInstance> parallelPlan(SubPlan root) {
-        return null;
+    public List<FragmentInstance> parallelPlan() {
+        prepare();
+        calculateNodeTopologyBetweenInstance();
+        return fragmentInstanceList;
+    }
+
+    private void prepare() {
+        List<PlanFragment> fragments = subPlan.getPlanFragmentList();
+        for (PlanFragment fragment : fragments) {
+            recordPlanNodeRelation(fragment.getRoot(), fragment.getId());
+            produceFragmentInstance(fragment);
+        }
+    }
+
+    private void produceFragmentInstance(PlanFragment fragment) {
+        // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased one by one
+        int instanceIdx = 0;
+        PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
+        FragmentInstance fragmentInstance = new FragmentInstance(fragment, instanceIdx);
+
+        // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one of them.
+        DataRegionReplicaSet dataRegion = fragment.getTargetDataRegion();
+
+        // Set DataRegion and target host for the instance
+        // We need to store all the replica host in case of the scenario that the instance need to be redirected
+        // to another host when scheduling
+        fragmentInstance.setDataRegionId(dataRegion);
+
+        // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current instance
+        fragmentInstance.setHostEndpoint(dataRegion.getEndPointList().get(0));
+        instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
+        fragmentInstanceList.add(fragmentInstance);
+    }
+
+    private void calculateNodeTopologyBetweenInstance() {
+        for(FragmentInstance instance : fragmentInstanceList) {
+            PlanNode rootNode = instance.getFragment().getRoot();
+            if (rootNode instanceof FragmentSinkNode) {
+                // Set target Endpoint for FragmentSinkNode
+                FragmentSinkNode sinkNode = (FragmentSinkNode) rootNode;
+                PlanNodeId downStreamNodeId = sinkNode.getDownStreamNode().getId();
+                FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
+                sinkNode.setDownStream(downStreamInstance.getHostEndpoint(), downStreamInstance.getId(), downStreamNodeId);
+
+                // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
+                PlanNode downStreamExchangeNode =  downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
+                ((ExchangeNode)downStreamExchangeNode).setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getId());
+            }
+        }
+    }
+
+
+
+    private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
+        return instanceMap.get(planNodeMap.get(exchangeNodeId));
+    }
+
+    private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
+        planNodeMap.put(root.getId(), planFragmentId);
+        for (PlanNode child : root.getChildren()) {
+            recordPlanNodeRelation(child, planFragmentId);
+        }
     }
 }
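
In this first version the planner deliberately emits exactly one FragmentInstance per PlanFragment. A usage sketch, assuming subPlan is the SubPlan produced inside DistributionPlanner.planFragments():

    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
    List<FragmentInstance> instances = parallelPlaner.parallelPlan();
    for (FragmentInstance instance : instances) {
        // each instance carries its DataRegion replica set and the chosen (first) target host
        System.out.println(instance.getId() + " @ " + instance.getHostEndpoint());
    }
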
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index 1a597b2..902edd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 public class PlanNodeUtil {
   public static void printPlanNode(PlanNode root) {
     printPlanNodeWithLevel(root, 0);
@@ -36,4 +39,26 @@ public class PlanNodeUtil {
       System.out.print("\t");
     }
   }
+
+  public static String nodeToString(PlanNode root) {
+    StringBuilder result = new StringBuilder();
+    nodeToString(root, 0, result);
+    return result.toString();
+  }
+
+  private static void nodeToString(PlanNode root, int level, StringBuilder result) {
+    for (int i = 0 ; i < level; i ++) {
+      result.append("\t");
+    }
+    result.append(root.toString());
+    result.append(System.lineSeparator());
+    for (PlanNode child: root.getChildren()) {
+      nodeToString(child, level + 1, result);
+    }
+  }
+
+  public static PlanNode deepCopy(PlanNode root) {
+    List<PlanNode> children = root.getChildren().stream().map(PlanNodeUtil::deepCopy).collect(Collectors.toList());
+    return root.cloneWithChildren(children);
+  }
 }
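
Both helpers are used by the parallel planner above. A sketch, assuming root is any already-built PlanNode (for example the LimitNode assembled in the test below):

    PlanNode copy = PlanNodeUtil.deepCopy(root);         // clones the whole tree via cloneWithChildren
    System.out.println(PlanNodeUtil.nodeToString(copy)); // tab-indented, one node per line
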
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index be2b351..4ee44a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -19,11 +19,14 @@
 
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import java.util.List;
 
@@ -34,8 +37,9 @@ public class ExchangeNode extends PlanNode {
   // In current version, one ExchangeNode will only have one source.
   // And the fragment which the sourceNode belongs to will only have one instance.
   // Thus, by nodeId and endpoint, the ExchangeNode can know where its source from.
-  private PlanNodeId sourceNodeId; // sourceNodeId is the same as the child's PlanNodeId
-  private String sourceEndpoint; // The endpoint where the sourceNode will be distributed
+  private EndPoint upstreamEndpoint;
+  private FragmentInstanceId upstreamInstanceId;
+  private PlanNodeId upstreamPlanNodeId;
 
   public ExchangeNode(PlanNodeId id) {
     super(id);
@@ -61,6 +65,12 @@ public class ExchangeNode extends PlanNode {
     return node;
   }
 
+  public void setUpstream(EndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
+    this.upstreamEndpoint = endPoint;
+    this.upstreamInstanceId = instanceId;
+    this.upstreamPlanNodeId = nodeId;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return null;
@@ -89,4 +99,16 @@ public class ExchangeNode extends PlanNode {
   public void cleanChildren() {
     this.child = null;
   }
+
+  public EndPoint getUpstreamEndpoint() {
+    return upstreamEndpoint;
+  }
+
+  public FragmentInstanceId getUpstreamInstanceId() {
+    return upstreamInstanceId;
+  }
+
+  public PlanNodeId getUpstreamPlanNodeId() {
+    return upstreamPlanNodeId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 935ca13..1632304 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,17 +18,23 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import java.util.List;
 
 public class FragmentSinkNode extends SinkNode {
   private PlanNode child;
-  private ExchangeNode remoteDestinationNode;
+  private ExchangeNode downStreamNode;
+
+  private EndPoint downStreamEndpoint;
+  private FragmentInstanceId downStreamInstanceId;
+  private PlanNodeId downStreamPlanNodeId;
 
   public FragmentSinkNode(PlanNodeId id) {
     super(id);
@@ -72,11 +78,29 @@ public class FragmentSinkNode extends SinkNode {
     return String.format("FragmentSinkNode-%s", getId());
   }
 
-  public ExchangeNode getRemoteDestinationNode() {
-    return remoteDestinationNode;
+  public ExchangeNode getDownStreamNode() {
+    return downStreamNode;
+  }
+
+  public void setDownStreamNode(ExchangeNode downStreamNode) {
+    this.downStreamNode = downStreamNode;
+  }
+
+  public void setDownStream(EndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
+    this.downStreamEndpoint = endPoint;
+    this.downStreamInstanceId = instanceId;
+    this.downStreamPlanNodeId = nodeId;
+  }
+
+  public EndPoint getDownStreamEndpoint() {
+    return downStreamEndpoint;
+  }
+
+  public FragmentInstanceId getDownStreamInstanceId() {
+    return downStreamInstanceId;
   }
 
-  public void setRemoteDestinationNode(ExchangeNode remoteDestinationNode) {
-    this.remoteDestinationNode = remoteDestinationNode;
+  public PlanNodeId getDownStreamPlanNodeId() {
+    return downStreamPlanNodeId;
   }
 }
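
Together with the ExchangeNode changes above, this is the hand-shake SimpleFragmentParallelPlanner performs between two fragment instances. A condensed sketch, assuming sinkNode, exchangeNode and the two FragmentInstances already exist:

    // upstream fragment: its root FragmentSinkNode learns where to push data
    sinkNode.setDownStreamNode(exchangeNode);
    sinkNode.setDownStream(
        downStreamInstance.getHostEndpoint(), downStreamInstance.getId(), exchangeNode.getId());
    // downstream fragment: its ExchangeNode learns where the data comes from
    exchangeNode.setUpstream(
        upstreamInstance.getHostEndpoint(), upstreamInstance.getId(), sinkNode.getId());
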
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
index 797611e..fabe25a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
@@ -55,4 +56,14 @@ public class CsvSourceNode extends SourceNode {
 
   @Override
   public void open() throws Exception {}
+
+  @Override
+  public DataRegionReplicaSet getDataRegionReplicaSet() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 674b6b4..e5fd770 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
-import org.apache.iotdb.db.mpp.common.DataRegion;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -61,7 +61,7 @@ public class SeriesAggregateScanNode extends SourceNode {
   private String columnName;
 
   // The id of DataRegion where the node will run
-  private DataRegion dataRegion;
+  private DataRegionReplicaSet dataRegionReplicaSet;
 
   public SeriesAggregateScanNode(PlanNodeId id) {
     super(id);
@@ -102,13 +102,19 @@ public class SeriesAggregateScanNode extends SourceNode {
   public void open() throws Exception {}
 
   @Override
-  public void close() throws Exception {}
+  public DataRegionReplicaSet getDataRegionReplicaSet() {
+    return this.dataRegionReplicaSet;
+  }
 
-  public DataRegion getDataRegion() {
-    return dataRegion;
+  @Override
+  public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet) {
+    this.dataRegionReplicaSet = dataRegionReplicaSet;
   }
 
   @Override
+  public void close() throws Exception {}
+
+  @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitSeriesAggregate(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index bfa2faf..893fd03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.DataRegion;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -62,16 +62,16 @@ public class SeriesScanNode extends SourceNode {
   private String columnName;
 
   // The id of DataRegion where the node will run
-  private DataRegion dataRegion;
+  private DataRegionReplicaSet dataRegionReplicaSet;
 
   public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
   }
 
-  public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, DataRegion dataRegion) {
+  public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, DataRegionReplicaSet dataRegionReplicaSet) {
     this(id, seriesPath);
-    this.dataRegion = dataRegion;
+    this.dataRegionReplicaSet = dataRegionReplicaSet;
   }
 
   public void setTimeFilter(Filter timeFilter) {
@@ -88,6 +88,15 @@ public class SeriesScanNode extends SourceNode {
   @Override
   public void open() throws Exception {}
 
+  @Override
+  public DataRegionReplicaSet getDataRegionReplicaSet() {
+    return dataRegionReplicaSet;
+  }
+
+  public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegion) {
+     this.dataRegionReplicaSet = dataRegion;
+  }
+
   public void setScanOrder(OrderBy scanOrder) {
     this.scanOrder = scanOrder;
   }
@@ -107,7 +116,7 @@ public class SeriesScanNode extends SourceNode {
 
   @Override
   public PlanNode clone() {
-    return new SeriesScanNode(getId(), getSeriesPath(), this.dataRegion);
+    return new SeriesScanNode(getId(), getSeriesPath(), this.dataRegionReplicaSet);
   }
 
   @Override
@@ -133,17 +142,9 @@ public class SeriesScanNode extends SourceNode {
     return timeFilter;
   }
 
-  public void setDataRegion(DataRegion dataRegion) {
-    this.dataRegion = dataRegion;
-  }
-
-  public DataRegion getDataRegion() {
-    return dataRegion;
-  }
-
   public String toString() {
     return String.format(
         "SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
-        this.getId(), this.getSeriesPath(), this.getDataRegion());
+        this.getId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index 551e9d2..67c9e2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
@@ -28,4 +29,8 @@ public abstract class SourceNode extends PlanNode implements AutoCloseable {
   }
 
   public abstract void open() throws Exception;
+
+  public abstract DataRegionReplicaSet getDataRegionReplicaSet();
+
+  public abstract void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet);
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 6d98e93..f59d689 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.plan;
 
-import org.apache.iotdb.commons.partition.TimePartitionId;
+import org.apache.iotdb.commons.partition.*;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.*;
@@ -37,12 +37,10 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -60,7 +58,7 @@ public class DistributionPlannerTest {
     timeJoinNode.addChild(
         new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
 
     LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
 
@@ -70,10 +68,8 @@ public class DistributionPlannerTest {
         new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
     PlanNode newRoot = planner.rewriteSource();
 
-    //    PlanNodeUtil.printPlanNode(newRoot);
+//        PlanNodeUtil.printPlanNode(newRoot);
     assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
-    assertEquals(newRoot.getChildren().get(0).getChildren().get(0).getChildren().size(), 2);
-    assertEquals(newRoot.getChildren().get(0).getChildren().get(1).getChildren().size(), 2);
   }
 
   @Test
@@ -87,7 +83,7 @@ public class DistributionPlannerTest {
     timeJoinNode.addChild(
         new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
 
     LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
 
@@ -120,14 +116,15 @@ public class DistributionPlannerTest {
     timeJoinNode.addChild(
         new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
 
     LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
 
     Analysis analysis = constructAnalysis();
 
+    MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null);
     DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
     PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
     PlanNodeUtil.printPlanNode(rootWithExchange);
@@ -143,22 +140,37 @@ public class DistributionPlannerTest {
 
   private Analysis constructAnalysis() {
     Analysis analysis = new Analysis();
-    Map<String, Map<TimePartitionId, List<DataRegion>>> dataPartitionInfo = new HashMap<>();
-    List<DataRegion> d1DataRegions = new ArrayList<>();
-    d1DataRegions.add(new DataRegion(1, "192.0.0.1"));
-    d1DataRegions.add(new DataRegion(2, "192.0.0.1"));
-    Map<TimePartitionId, List<DataRegion>> d1DataRegionMap = new HashMap<>();
+
+    String device1 = "root.sg.d1";
+    String device2 = "root.sg.d22";
+    String device3 = "root.sg.d333";
+
+    DataPartitionInfo dataPartitionInfo = new DataPartitionInfo();
+    Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> dataPartitionMap = new HashMap<>();
+    Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap = new HashMap<>();
+    List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>();
+    d1DataRegions.add(new DataRegionReplicaSet(new DataRegionId(1),
+            Arrays.asList(new EndPoint("192.0.0.1", 9000), new EndPoint("192.0.0.2", 9000))));
+    d1DataRegions.add(new DataRegionReplicaSet(new DataRegionId(2),
+            Arrays.asList(new EndPoint("192.0.0.3", 9000), new EndPoint("192.0.0.4", 9000))));
+    Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
     d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
 
-    List<DataRegion> d2DataRegions = new ArrayList<>();
-    d2DataRegions.add(new DataRegion(3, "192.0.0.1"));
-    Map<TimePartitionId, List<DataRegion>> d2DataRegionMap = new HashMap<>();
+    List<DataRegionReplicaSet> d2DataRegions = new ArrayList<>();
+    d2DataRegions.add(new DataRegionReplicaSet(new DataRegionId(3),
+            Arrays.asList(new EndPoint("192.0.0.5", 9000), new EndPoint("192.0.0.6", 9000))));
+    Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
     d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
 
-    dataPartitionInfo.put("root.sg.d1", d1DataRegionMap);
-    dataPartitionInfo.put("root.sg.d2", d2DataRegionMap);
 
-    analysis.setDataPartitionInfoOld(dataPartitionInfo);
+    sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap);
+    sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap);
+
+    dataPartitionMap.put("root.sg", sgPartitionMap);
+
+    dataPartitionInfo.setDataPartitionMap(dataPartitionMap);
+
+    analysis.setDataPartitionInfo(dataPartitionInfo);
     return analysis;
   }
 }