You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/23 11:13:29 UTC
[iotdb] 02/02: complete basic distributed plan
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/distribution_planner
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 56618845ed2e67bc3206fdfe3ade496d1d37d5ad
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Mar 23 19:12:33 2022 +0800
complete basic distributed plan
---
.../iotdb/commons/partition/DataPartitionInfo.java | 36 +++++++--
.../iotdb/commons/partition/DataRegionId.java | 8 ++
...ionPlaceInfo.java => DataRegionReplicaSet.java} | 11 ++-
.../iotdb/commons/partition/DeviceGroupId.java | 12 +++
.../{FragmentId.java => PlanFragmentId.java} | 17 ++--
.../iotdb/db/mpp/execution/FragmentInfo.java | 6 +-
.../iotdb/db/mpp/execution/QueryExecution.java | 7 --
.../mpp/execution/scheduler/ClusterScheduler.java | 4 +-
.../db/mpp/execution/scheduler/IScheduler.java | 4 +-
.../execution/scheduler/StandaloneScheduler.java | 4 +-
.../apache/iotdb/db/mpp/sql/analyze/Analysis.java | 28 +------
.../db/mpp/sql/planner/DistributionPlanner.java | 51 +++++++-----
.../mpp/sql/planner/plan/DistributedQueryPlan.java | 4 +-
.../db/mpp/sql/planner/plan/FragmentInstance.java | 41 +++++++++-
.../sql/planner/plan/IFragmentParallelPlaner.java | 6 +-
.../db/mpp/sql/planner/plan/PlanFragment.java | 44 ++++++++++-
.../db/mpp/sql/planner/plan/PlanFragmentId.java | 42 ----------
.../plan/SimpleFragmentParallelPlanner.java | 92 +++++++++++++++++++++-
.../db/mpp/sql/planner/plan/node/PlanNodeUtil.java | 25 ++++++
.../planner/plan/node/process/ExchangeNode.java | 26 +++++-
.../planner/plan/node/sink/FragmentSinkNode.java | 34 ++++++--
.../planner/plan/node/source/CsvSourceNode.java | 11 +++
.../plan/node/source/SeriesAggregateScanNode.java | 16 ++--
.../planner/plan/node/source/SeriesScanNode.java | 29 +++----
.../sql/planner/plan/node/source/SourceNode.java | 5 ++
.../db/mpp/sql/plan/DistributionPlannerTest.java | 58 ++++++++------
26 files changed, 449 insertions(+), 172 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
index 910a649..6d7a4a4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
@@ -18,20 +18,44 @@
*/
package org.apache.iotdb.commons.partition;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class DataPartitionInfo {
// Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, List<DataRegionPlaceInfo>>>>
- private Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionPlaceInfo>>>> dataPartitionInfo;
+ private Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> dataPartitionMap;
- public Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionPlaceInfo>>>> getDataPartitionInfo() {
- return dataPartitionInfo;
+ public Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> getDataPartitionMap() {
+ return dataPartitionMap;
}
- public void setDataPartitionInfo(
- Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionPlaceInfo>>>> dataPartitionInfo) {
- this.dataPartitionInfo = dataPartitionInfo;
+ public void setDataPartitionMap(
+ Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> dataPartitionMap) {
+ this.dataPartitionMap = dataPartitionMap;
+ }
+
+ public List<DataRegionReplicaSet> getDataRegionReplicaSet(String deviceName, List<TimePartitionId> timePartitionIdList) {
+ String storageGroup = getStorageGroupByDevice(deviceName);
+ DeviceGroupId deviceGroupId = calculateDeviceGroupId(deviceName);
+ // TODO: (xingtanzjr) the timePartitionIdList is ignored
+ return dataPartitionMap.get(storageGroup).get(deviceGroupId).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ }
+
+ private DeviceGroupId calculateDeviceGroupId(String deviceName) {
+ // TODO: (xingtanzjr) implement the real algorithm for calculation of DeviceGroupId
+ return new DeviceGroupId(deviceName.length());
+ }
+
+ private String getStorageGroupByDevice(String deviceName) {
+ for(String storageGroup : dataPartitionMap.keySet()) {
+ if (deviceName.startsWith(storageGroup)) {
+ return storageGroup;
+ }
+ }
+ // TODO: (xingtanzjr) how to handle this exception in IoTDB
+ return null;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
index 92e290a..dffdd90 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.commons.partition;
public class DataRegionId {
private int dataRegionId;
+ public DataRegionId(int dataRegionId) {
+ this.dataRegionId = dataRegionId;
+ }
+
public int getDataRegionId() {
return dataRegionId;
}
@@ -28,4 +32,8 @@ public class DataRegionId {
public void setDataRegionId(int dataRegionId) {
this.dataRegionId = dataRegionId;
}
+
+ public String toString() {
+ return String.format("DataRegion-%d", dataRegionId);
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionPlaceInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
similarity index 81%
rename from node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionPlaceInfo.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
index bd79fc3..2078bc2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionPlaceInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
@@ -21,10 +21,15 @@ package org.apache.iotdb.commons.partition;
import java.util.List;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
-public class DataRegionPlaceInfo {
+public class DataRegionReplicaSet {
private DataRegionId dataRegionId;
private List<EndPoint> endPointList;
+ public DataRegionReplicaSet(DataRegionId dataRegionId, List<EndPoint> endPointList) {
+ this.dataRegionId = dataRegionId;
+ this.endPointList = endPointList;
+ }
+
public List<EndPoint> getEndPointList() {
return endPointList;
}
@@ -40,4 +45,8 @@ public class DataRegionPlaceInfo {
public void setDataRegionId(DataRegionId dataRegionId) {
this.dataRegionId = dataRegionId;
}
+
+ public String toString() {
+ return String.format("%s:%s", dataRegionId, endPointList);
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
index e6aee53..2f8158f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.commons.partition;
public class DeviceGroupId {
private int deviceGroupId;
+ public DeviceGroupId(int deviceGroupId) {
+ this.deviceGroupId = deviceGroupId;
+ }
+
public int getDeviceGroupId() {
return deviceGroupId;
}
@@ -28,4 +32,12 @@ public class DeviceGroupId {
public void setDeviceGroupId(int deviceGroupId) {
this.deviceGroupId = deviceGroupId;
}
+
+ public int hashCode() {
+ return new Integer(deviceGroupId).hashCode();
+ }
+
+ public boolean equals(Object obj) {
+ return obj instanceof DeviceGroupId && this.deviceGroupId == ((DeviceGroupId)obj).deviceGroupId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index ef0df6b..d43823c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -23,26 +23,26 @@ import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
-public class FragmentId {
+public class PlanFragmentId {
private final QueryId queryId;
private final int id;
- public static FragmentId valueOf(String stageId) {
+ public static PlanFragmentId valueOf(String stageId) {
List<String> ids = QueryId.parseDottedId(stageId, 2, "stageId");
return valueOf(ids);
}
- public static FragmentId valueOf(List<String> ids) {
+ public static PlanFragmentId valueOf(List<String> ids) {
checkArgument(ids.size() == 2, "Expected two ids but got: %s", ids);
- return new FragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
+ return new PlanFragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
}
- public FragmentId(String queryId, int id) {
+ public PlanFragmentId(String queryId, int id) {
this(new QueryId(queryId), id);
}
- public FragmentId(QueryId queryId, int id) {
+ public PlanFragmentId(QueryId queryId, int id) {
this.queryId = requireNonNull(queryId, "queryId is null");
this.id = id;
}
@@ -54,4 +54,9 @@ public class FragmentId {
public int getId() {
return id;
}
+
+ public String toString() {
+ return String.format("%s.%d", queryId, id);
+ }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
index 7bd9300..9e3412e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
@@ -18,21 +18,21 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import java.util.List;
public class FragmentInfo {
- private final FragmentId stageId;
+ private final PlanFragmentId stageId;
private final FragmentState state;
private final PlanFragment plan;
private final List<FragmentInfo> childrenFragments;
public FragmentInfo(
- FragmentId stageId,
+ PlanFragmentId stageId,
FragmentState state,
PlanFragment plan,
List<FragmentInfo> childrenFragments) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index e58b755..90b3bf5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -61,7 +61,6 @@ public class QueryExecution {
public void start() {
doLogicalPlan();
doDistributedPlan();
- planFragmentInstances();
schedule();
}
@@ -89,12 +88,6 @@ public class QueryExecution {
this.distributedPlan = planner.planFragments();
}
- // Convert fragment to detailed instance
- // And for parallel-able fragment, clone it into several instances with different params.
- public void planFragmentInstances() {
-
- }
-
/**
* This method will be called by the request thread from client connection. This method will block
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 4af8b31..75d285c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
@@ -67,7 +67,7 @@ public class ClusterScheduler implements IScheduler {
public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
@Override
- public void cancelFragment(FragmentId fragmentId) {}
+ public void cancelFragment(PlanFragmentId planFragmentId) {}
// Send the instances to other nodes
private void sendFragmentInstances() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
index 16145fd..b116f84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.scheduler;
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
@@ -36,5 +36,5 @@ public interface IScheduler {
void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
- void cancelFragment(FragmentId fragmentId);
+ void cancelFragment(PlanFragmentId planFragmentId);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index bcf2bbd..b96d214 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.metadata.SchemaEngine;
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.FragmentInfo;
@@ -54,5 +54,5 @@ public class StandaloneScheduler implements IScheduler {
public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
@Override
- public void cancelFragment(FragmentId fragmentId) {}
+ public void cancelFragment(PlanFragmentId planFragmentId) {}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
index 60e3da1..3c8ecfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
@@ -20,11 +20,9 @@
package org.apache.iotdb.db.mpp.sql.analyze;
import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
-import org.apache.iotdb.commons.partition.TimePartitionId;
-import org.apache.iotdb.db.metadata.SchemaRegion;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.DataRegion;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -44,31 +42,13 @@ public class Analysis {
// indicate whether this statement is write or read
private QueryType queryType;
- // DataPartitionInfo
- // DeviceGroup -> DataRegionTimeSlice -> List<DataRegion>
- @Deprecated
- private Map<String, Map<TimePartitionId, List<DataRegion>>> dataPartitionInfoOld;
-
private DataPartitionInfo dataPartitionInfo;
private SchemaPartitionInfo schemaPartitionInfo;
- public Set<DataRegion> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
- if (timefilter == null) {
- // TODO: (xingtanzjr) we need to have a method to get the deviceGroup by device
- String deviceGroup = seriesPath.getDevice();
- Set<DataRegion> result = new HashSet<>();
- this.dataPartitionInfoOld.get(deviceGroup).values().forEach(result::addAll);
- return result;
- } else {
- // TODO: (xingtanzjr) complete this branch
- return null;
- }
- }
-
- public void setDataPartitionInfoOld(
- Map<String, Map<TimePartitionId, List<DataRegion>>> dataPartitionInfoOld) {
- this.dataPartitionInfoOld = dataPartitionInfoOld;
+ public List<DataRegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
+ // TODO: (xingtanzjr) implement the calculation of timePartitionIdList
+ return dataPartitionInfo.getDataRegionReplicaSet(seriesPath.getDevice(), null);
}
public Statement getStatement() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index e4cde96..b9da77d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -18,10 +18,10 @@
*/
package org.apache.iotdb.db.mpp.sql.planner;
-import org.apache.iotdb.db.mpp.common.DataRegion;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
@@ -38,6 +38,8 @@ public class DistributionPlanner {
private Analysis analysis;
private LogicalQueryPlan logicalPlan;
+ private int planFragmentIndex = 0;
+
public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
this.analysis = analysis;
this.logicalPlan = logicalPlan;
@@ -62,8 +64,20 @@ public class DistributionPlanner {
PlanNode rootAfterRewrite = rewriteSource();
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
SubPlan subPlan = splitFragment(rootWithExchange);
+ List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
return new DistributedQueryPlan(
- logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList());
+ logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
+ }
+
+ // Convert fragment to detailed instance
+ // And for parallel-able fragment, clone it into several instances with different params.
+ public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
+ IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
+ return parallelPlaner.parallelPlan();
+ }
+
+ private PlanFragmentId getNextFragmentId() {
+ return new PlanFragmentId(this.logicalPlan.getContext().getQueryId(), this.planFragmentIndex ++);
}
private class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
@@ -84,13 +98,13 @@ public class DistributionPlanner {
// If the child is SeriesScanNode, we need to check whether this node should be seperated
// into several splits.
SeriesScanNode handle = (SeriesScanNode) child;
- Set<DataRegion> dataDistribution =
+ List<DataRegionReplicaSet> dataDistribution =
analysis.getPartitionInfo(handle.getSeriesPath(), handle.getTimeFilter());
// If the size of dataDistribution is m, this SeriesScanNode should be seperated into m
// SeriesScanNode.
- for (DataRegion dataRegion : dataDistribution) {
+ for (DataRegionReplicaSet dataRegion : dataDistribution) {
SeriesScanNode split = (SeriesScanNode) handle.clone();
- split.setDataRegion(dataRegion);
+ split.setDataRegionReplicaSet(dataRegion);
sources.add(split);
}
} else if (child instanceof SeriesAggregateScanNode) {
@@ -107,8 +121,8 @@ public class DistributionPlanner {
}
// Step 2: For the source nodes, group them by the DataRegion.
- Map<DataRegion, List<SeriesScanNode>> sourceGroup =
- sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegion));
+ Map<DataRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
+ sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegionReplicaSet));
// Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
@@ -158,13 +172,13 @@ public class DistributionPlanner {
public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
- node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegion()));
+ node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
return node.clone();
}
public PlanNode visitSeriesAggregate(SeriesAggregateScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
- node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegion()));
+ node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
return node.clone();
}
@@ -177,7 +191,7 @@ public class DistributionPlanner {
visitedChildren.add(visit(child, context));
});
- DataRegion dataRegion = calculateDataRegionByChildren(visitedChildren, context);
+ DataRegionReplicaSet dataRegion = calculateDataRegionByChildren(visitedChildren, context);
NodeDistributionType distributionType =
nodeDistributionIsSame(visitedChildren, context)
? NodeDistributionType.SAME_WITH_ALL_CHILDREN
@@ -206,12 +220,11 @@ public class DistributionPlanner {
return newNode;
}
- private DataRegion calculateDataRegionByChildren(
+ private DataRegionReplicaSet calculateDataRegionByChildren(
List<PlanNode> children, NodeGroupContext context) {
// We always make the dataRegion of TimeJoinNode to be the same as its first child.
// TODO: (xingtanzjr) We need to implement more suitable policies here
- DataRegion childDataRegion = context.getNodeDistribution(children.get(0).getId()).dataRegion;
- return new DataRegion(childDataRegion.getDataRegionId(), childDataRegion.getEndpoint());
+ return context.getNodeDistribution(children.get(0).getId()).dataRegion;
}
private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
@@ -256,9 +269,9 @@ public class DistributionPlanner {
private class NodeDistribution {
private NodeDistributionType type;
- private DataRegion dataRegion;
+ private DataRegionReplicaSet dataRegion;
- private NodeDistribution(NodeDistributionType type, DataRegion dataRegion) {
+ private NodeDistribution(NodeDistributionType type, DataRegionReplicaSet dataRegion) {
this.type = type;
this.dataRegion = dataRegion;
}
@@ -277,7 +290,7 @@ public class DistributionPlanner {
ExchangeNode exchangeNode = (ExchangeNode) root;
FragmentSinkNode sinkNode = new FragmentSinkNode(PlanNodeIdAllocator.generateId());
sinkNode.setChild(exchangeNode.getChild());
- sinkNode.setRemoteDestinationNode(exchangeNode);
+ sinkNode.setDownStreamNode(exchangeNode);
// Record the source node info in the ExchangeNode so that we can keep the connection of
// these nodes/fragments
exchangeNode.setRemoteSourceNode(sinkNode);
@@ -297,7 +310,7 @@ public class DistributionPlanner {
}
private SubPlan createSubPlan(PlanNode root) {
- PlanFragment fragment = new PlanFragment(PlanFragmentId.generateId(), root);
+ PlanFragment fragment = new PlanFragment(getNextFragmentId(), root);
return new SubPlan(fragment);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index c1eb086..90ff669 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -26,12 +26,14 @@ public class DistributedQueryPlan {
private MPPQueryContext context;
private SubPlan rootSubPlan;
private List<PlanFragment> fragments;
+ private List<FragmentInstance> instances;
public DistributedQueryPlan(
- MPPQueryContext context, SubPlan rootSubPlan, List<PlanFragment> fragments) {
+ MPPQueryContext context, SubPlan rootSubPlan, List<PlanFragment> fragments, List<FragmentInstance> instances) {
this.context = context;
this.rootSubPlan = rootSubPlan;
this.fragments = fragments;
+ this.instances = instances;
}
public List<PlanFragment> getFragments() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 56fe111..07e6be1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -18,14 +18,53 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
public class FragmentInstance {
private FragmentInstanceId id;
// The reference of PlanFragment which this instance is generated from
private PlanFragment fragment;
-
+ // The DataRegion where the FragmentInstance should run
+ private DataRegionReplicaSet dataRegion;
+ private EndPoint hostEndpoint;
+
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
+
+ public FragmentInstance(PlanFragment fragment, int index) {
+ this.fragment = fragment;
+ this.id = generateId(fragment.getId(), index);
+ }
+
+ public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
+ return new FragmentInstanceId(String.format("%s.%d", id, index));
+ }
+
+ public DataRegionReplicaSet getDataRegionId() {
+ return dataRegion;
+ }
+
+ public void setDataRegionId(DataRegionReplicaSet dataRegion) {
+ this.dataRegion = dataRegion;
+ }
+
+ public EndPoint getHostEndpoint() {
+ return hostEndpoint;
+ }
+
+ public void setHostEndpoint(EndPoint hostEndpoint) {
+ this.hostEndpoint = hostEndpoint;
+ }
+
+ public PlanFragment getFragment() {
+ return fragment;
+ }
+
+ public FragmentInstanceId getId() {
+ return id;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
index f86b967..e8557f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
@@ -27,9 +27,9 @@ import java.util.List;
public interface IFragmentParallelPlaner {
/**
*
- * @param root The root of SubPlan tree. The relation between each PlanFragment is necessary because sometimes we
- * need to change the source/sink for each FragmentInstance according to its upstream/downstream
+ * The relation between each PlanFragment is necessary because sometimes we
+ * need to change the source/sink for each FragmentInstance according to its upstream/downstream
* @return All the FragmentInstances which can run in parallel
*/
- List<FragmentInstance> parallelPlan(SubPlan root);
+ List<FragmentInstance> parallelPlan();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index f8de7f0..173a8de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -18,9 +18,12 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
-// TODO: consider whether it is necessary to make PlanFragment as a TreeNode
/** PlanFragment contains a sub-query of distributed query. */
public class PlanFragment {
private PlanFragmentId id;
@@ -42,4 +45,43 @@ public class PlanFragment {
public String toString() {
return String.format("PlanFragment-%s", getId());
}
+
+ // Every Fragment should only run in DataRegion.
+ // But it can select any one of the Endpoint of the target DataRegion
+ // In current version, one PlanFragment should contain at least one SourceNode,
+ // and the DataRegions of all SourceNodes should be same in one PlanFragment.
+ // So we can use the DataRegion of one SourceNode as the PlanFragment's DataRegion.
+ public DataRegionReplicaSet getTargetDataRegion() {
+ return getNodeDataRegion(root);
+ }
+
+ private DataRegionReplicaSet getNodeDataRegion(PlanNode root) {
+ if (root instanceof SourceNode) {
+ return ((SourceNode) root).getDataRegionReplicaSet();
+ }
+ for (PlanNode child : root.getChildren()) {
+ DataRegionReplicaSet result = getNodeDataRegion(child);
+ if (result != null) {
+ return result;
+ }
+ }
+ return null;
+ }
+
+ public PlanNode getPlanNodeById(PlanNodeId nodeId) {
+ return getPlanNodeById(root, nodeId);
+ }
+
+ private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
+ if (root.getId().equals(nodeId)) {
+ return root;
+ }
+ for (PlanNode child : root.getChildren()) {
+ PlanNode node = getPlanNodeById(child, nodeId);
+ if (node != null) {
+ return node;
+ }
+ }
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
deleted file mode 100644
index 4f83c05..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
-
-public class PlanFragmentId {
- private String id;
-
- public PlanFragmentId(String id) {
- this.id = id;
- }
-
- public static int initialId = 0;
-
- public static synchronized PlanFragmentId generateId() {
- initialId++;
- return new PlanFragmentId(String.valueOf(initialId));
- }
-
- public String getId() {
- return id;
- }
-
- public String toString() {
- return id;
- }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 2277ca4..4a617a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -16,18 +16,104 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.db.mpp.sql.planner.plan;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* A simple implementation of IFragmentParallelPlaner.
* This planner will transform one PlanFragment into only one FragmentInstance.
*/
public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner{
+
+ private SubPlan subPlan;
+
+ // Record all the FragmentInstances belonged to same PlanFragment
+ Map<PlanFragmentId, FragmentInstance> instanceMap;
+ // Record which PlanFragment the PlanNode belongs
+ Map<PlanNodeId, PlanFragmentId> planNodeMap;
+ List<FragmentInstance> fragmentInstanceList;
+
+ public SimpleFragmentParallelPlanner(SubPlan subPlan) {
+ this.subPlan = subPlan;
+ this.instanceMap = new HashMap<>();
+ this.planNodeMap = new HashMap<>();
+ this.fragmentInstanceList = new ArrayList<>();
+ }
+
@Override
- public List<FragmentInstance> parallelPlan(SubPlan root) {
- return null;
+ public List<FragmentInstance> parallelPlan() {
+ prepare();
+ calculateNodeTopologyBetweenInstance();
+ return fragmentInstanceList;
+ }
+
+ private void prepare() {
+ List<PlanFragment> fragments = subPlan.getPlanFragmentList();
+ for (PlanFragment fragment : fragments) {
+ recordPlanNodeRelation(fragment.getRoot(), fragment.getId());
+ produceFragmentInstance(fragment);
+ }
+ }
+
+ private void produceFragmentInstance(PlanFragment fragment) {
+ // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased one by one
+ int instanceIdx = 0;
+ PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
+ FragmentInstance fragmentInstance = new FragmentInstance(fragment, instanceIdx);
+
+ // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one of them.
+ DataRegionReplicaSet dataRegion = fragment.getTargetDataRegion();
+
+ // Set DataRegion and target host for the instance
+ // We need to store all the replica host in case of the scenario that the instance need to be redirected
+ // to another host when scheduling
+ fragmentInstance.setDataRegionId(dataRegion);
+
+ // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current instance
+ fragmentInstance.setHostEndpoint(dataRegion.getEndPointList().get(0));
+ instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
+ fragmentInstanceList.add(fragmentInstance);
+ }
+
+ private void calculateNodeTopologyBetweenInstance() {
+ for(FragmentInstance instance : fragmentInstanceList) {
+ PlanNode rootNode = instance.getFragment().getRoot();
+ if (rootNode instanceof FragmentSinkNode) {
+ // Set target Endpoint for FragmentSinkNode
+ FragmentSinkNode sinkNode = (FragmentSinkNode) rootNode;
+ PlanNodeId downStreamNodeId = sinkNode.getDownStreamNode().getId();
+ FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
+ sinkNode.setDownStream(downStreamInstance.getHostEndpoint(), downStreamInstance.getId(), downStreamNodeId);
+
+ // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
+ PlanNode downStreamExchangeNode = downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
+ ((ExchangeNode)downStreamExchangeNode).setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getId());
+ }
+ }
+ }
+
+
+
+ private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
+ return instanceMap.get(planNodeMap.get(exchangeNodeId));
+ }
+
+ private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
+ planNodeMap.put(root.getId(), planFragmentId);
+ for (PlanNode child : root.getChildren()) {
+ recordPlanNodeRelation(child, planFragmentId);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index 1a597b2..902edd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+import java.util.List;
+import java.util.stream.Collectors;
+
public class PlanNodeUtil {
public static void printPlanNode(PlanNode root) {
printPlanNodeWithLevel(root, 0);
@@ -36,4 +39,26 @@ public class PlanNodeUtil {
System.out.print("\t");
}
}
+
+ public static String nodeToString(PlanNode root) {
+ StringBuilder result = new StringBuilder();
+ nodeToString(root, 0, result);
+ return result.toString();
+ }
+
+ private static void nodeToString(PlanNode root, int level, StringBuilder result) {
+ for (int i = 0 ; i < level; i ++) {
+ result.append("\t");
+ }
+ result.append(root.toString());
+ result.append(System.lineSeparator());
+ for (PlanNode child: root.getChildren()) {
+ nodeToString(child, level + 1, result);
+ }
+ }
+
+ public static PlanNode deepCopy(PlanNode root) {
+ List<PlanNode> children = root.getChildren().stream().map(PlanNodeUtil::deepCopy).collect(Collectors.toList());
+ return root.cloneWithChildren(children);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index be2b351..4ee44a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import java.util.List;
@@ -34,8 +37,9 @@ public class ExchangeNode extends PlanNode {
// In current version, one ExchangeNode will only have one source.
// And the fragment which the sourceNode belongs to will only have one instance.
// Thus, by nodeId and endpoint, the ExchangeNode can know where its source from.
- private PlanNodeId sourceNodeId; // sourceNodeId is the same as the child's PlanNodeId
- private String sourceEndpoint; // The endpoint where the sourceNode will be distributed
+ private EndPoint upstreamEndpoint;
+ private FragmentInstanceId upstreamInstanceId;
+ private PlanNodeId upstreamPlanNodeId;
public ExchangeNode(PlanNodeId id) {
super(id);
@@ -61,6 +65,12 @@ public class ExchangeNode extends PlanNode {
return node;
}
+ public void setUpstream(EndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
+ this.upstreamEndpoint = endPoint;
+ this.upstreamInstanceId = instanceId;
+ this.upstreamPlanNodeId = nodeId;
+ }
+
@Override
public List<String> getOutputColumnNames() {
return null;
@@ -89,4 +99,16 @@ public class ExchangeNode extends PlanNode {
public void cleanChildren() {
this.child = null;
}
+
+ public EndPoint getUpstreamEndpoint() {
+ return upstreamEndpoint;
+ }
+
+ public FragmentInstanceId getUpstreamInstanceId() {
+ return upstreamInstanceId;
+ }
+
+ public PlanNodeId getUpstreamPlanNodeId() {
+ return upstreamPlanNodeId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 935ca13..1632304 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,17 +18,23 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import java.util.List;
public class FragmentSinkNode extends SinkNode {
private PlanNode child;
- private ExchangeNode remoteDestinationNode;
+ private ExchangeNode downStreamNode;
+
+ private EndPoint downStreamEndpoint;
+ private FragmentInstanceId downStreamInstanceId;
+ private PlanNodeId downStreamPlanNodeId;
public FragmentSinkNode(PlanNodeId id) {
super(id);
@@ -72,11 +78,29 @@ public class FragmentSinkNode extends SinkNode {
return String.format("FragmentSinkNode-%s", getId());
}
- public ExchangeNode getRemoteDestinationNode() {
- return remoteDestinationNode;
+ public ExchangeNode getDownStreamNode() {
+ return downStreamNode;
+ }
+
+ public void setDownStreamNode(ExchangeNode downStreamNode) {
+ this.downStreamNode = downStreamNode;
+ }
+
+ public void setDownStream(EndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
+ this.downStreamEndpoint = endPoint;
+ this.downStreamInstanceId = instanceId;
+ this.downStreamPlanNodeId = nodeId;
+ }
+
+ public EndPoint getDownStreamEndpoint() {
+ return downStreamEndpoint;
+ }
+
+ public FragmentInstanceId getDownStreamInstanceId() {
+ return downStreamInstanceId;
}
- public void setRemoteDestinationNode(ExchangeNode remoteDestinationNode) {
- this.remoteDestinationNode = remoteDestinationNode;
+ public PlanNodeId getDownStreamPlanNodeId() {
+ return downStreamPlanNodeId;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
index 797611e..fabe25a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -55,4 +56,14 @@ public class CsvSourceNode extends SourceNode {
@Override
public void open() throws Exception {}
+
+ @Override
+ public DataRegionReplicaSet getDataRegionReplicaSet() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 674b6b4..e5fd770 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
-import org.apache.iotdb.db.mpp.common.DataRegion;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -61,7 +61,7 @@ public class SeriesAggregateScanNode extends SourceNode {
private String columnName;
// The id of DataRegion where the node will run
- private DataRegion dataRegion;
+ private DataRegionReplicaSet dataRegionReplicaSet;
public SeriesAggregateScanNode(PlanNodeId id) {
super(id);
@@ -102,13 +102,19 @@ public class SeriesAggregateScanNode extends SourceNode {
public void open() throws Exception {}
@Override
- public void close() throws Exception {}
+ public DataRegionReplicaSet getDataRegionReplicaSet() {
+ return this.dataRegionReplicaSet;
+ }
- public DataRegion getDataRegion() {
- return dataRegion;
+ @Override
+ public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet) {
+ this.dataRegionReplicaSet = dataRegionReplicaSet;
}
@Override
+ public void close() throws Exception {}
+
+ @Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitSeriesAggregate(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index bfa2faf..893fd03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.DataRegion;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -62,16 +62,16 @@ public class SeriesScanNode extends SourceNode {
private String columnName;
// The id of DataRegion where the node will run
- private DataRegion dataRegion;
+ private DataRegionReplicaSet dataRegionReplicaSet;
public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
super(id);
this.seriesPath = seriesPath;
}
- public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, DataRegion dataRegion) {
+ public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, DataRegionReplicaSet dataRegionReplicaSet) {
this(id, seriesPath);
- this.dataRegion = dataRegion;
+ this.dataRegionReplicaSet = dataRegionReplicaSet;
}
public void setTimeFilter(Filter timeFilter) {
@@ -88,6 +88,15 @@ public class SeriesScanNode extends SourceNode {
@Override
public void open() throws Exception {}
+ @Override
+ public DataRegionReplicaSet getDataRegionReplicaSet() {
+ return dataRegionReplicaSet;
+ }
+
+ public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegion) {
+ this.dataRegionReplicaSet = dataRegion;
+ }
+
public void setScanOrder(OrderBy scanOrder) {
this.scanOrder = scanOrder;
}
@@ -107,7 +116,7 @@ public class SeriesScanNode extends SourceNode {
@Override
public PlanNode clone() {
- return new SeriesScanNode(getId(), getSeriesPath(), this.dataRegion);
+ return new SeriesScanNode(getId(), getSeriesPath(), this.dataRegionReplicaSet);
}
@Override
@@ -133,17 +142,9 @@ public class SeriesScanNode extends SourceNode {
return timeFilter;
}
- public void setDataRegion(DataRegion dataRegion) {
- this.dataRegion = dataRegion;
- }
-
- public DataRegion getDataRegion() {
- return dataRegion;
- }
-
public String toString() {
return String.format(
"SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
- this.getId(), this.getSeriesPath(), this.getDataRegion());
+ this.getId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index 551e9d2..67c9e2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -28,4 +29,8 @@ public abstract class SourceNode extends PlanNode implements AutoCloseable {
}
public abstract void open() throws Exception;
+
+ public abstract DataRegionReplicaSet getDataRegionReplicaSet();
+
+ public abstract void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 6d98e93..f59d689 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.sql.plan;
-import org.apache.iotdb.commons.partition.TimePartitionId;
+import org.apache.iotdb.commons.partition.*;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.*;
@@ -37,12 +37,10 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -60,7 +58,7 @@ public class DistributionPlannerTest {
timeJoinNode.addChild(
new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
@@ -70,10 +68,8 @@ public class DistributionPlannerTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
PlanNode newRoot = planner.rewriteSource();
- // PlanNodeUtil.printPlanNode(newRoot);
+// PlanNodeUtil.printPlanNode(newRoot);
assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
- assertEquals(newRoot.getChildren().get(0).getChildren().get(0).getChildren().size(), 2);
- assertEquals(newRoot.getChildren().get(0).getChildren().get(1).getChildren().size(), 2);
}
@Test
@@ -87,7 +83,7 @@ public class DistributionPlannerTest {
timeJoinNode.addChild(
new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
@@ -120,14 +116,15 @@ public class DistributionPlannerTest {
timeJoinNode.addChild(
new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+ new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
Analysis analysis = constructAnalysis();
+ MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null);
DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
PlanNode rootAfterRewrite = planner.rewriteSource();
PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
PlanNodeUtil.printPlanNode(rootWithExchange);
@@ -143,22 +140,37 @@ public class DistributionPlannerTest {
private Analysis constructAnalysis() {
Analysis analysis = new Analysis();
- Map<String, Map<TimePartitionId, List<DataRegion>>> dataPartitionInfo = new HashMap<>();
- List<DataRegion> d1DataRegions = new ArrayList<>();
- d1DataRegions.add(new DataRegion(1, "192.0.0.1"));
- d1DataRegions.add(new DataRegion(2, "192.0.0.1"));
- Map<TimePartitionId, List<DataRegion>> d1DataRegionMap = new HashMap<>();
+
+ String device1 = "root.sg.d1";
+ String device2 = "root.sg.d22";
+ String device3 = "root.sg.d333";
+
+ DataPartitionInfo dataPartitionInfo = new DataPartitionInfo();
+ Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>> dataPartitionMap = new HashMap<>();
+ Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap = new HashMap<>();
+ List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>();
+ d1DataRegions.add(new DataRegionReplicaSet(new DataRegionId(1),
+ Arrays.asList(new EndPoint("192.0.0.1", 9000), new EndPoint("192.0.0.2", 9000))));
+ d1DataRegions.add(new DataRegionReplicaSet(new DataRegionId(2),
+ Arrays.asList(new EndPoint("192.0.0.3", 9000), new EndPoint("192.0.0.4", 9000))));
+ Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
- List<DataRegion> d2DataRegions = new ArrayList<>();
- d2DataRegions.add(new DataRegion(3, "192.0.0.1"));
- Map<TimePartitionId, List<DataRegion>> d2DataRegionMap = new HashMap<>();
+ List<DataRegionReplicaSet> d2DataRegions = new ArrayList<>();
+ d2DataRegions.add(new DataRegionReplicaSet(new DataRegionId(3),
+ Arrays.asList(new EndPoint("192.0.0.5", 9000), new EndPoint("192.0.0.6", 9000))));
+ Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
- dataPartitionInfo.put("root.sg.d1", d1DataRegionMap);
- dataPartitionInfo.put("root.sg.d2", d2DataRegionMap);
- analysis.setDataPartitionInfoOld(dataPartitionInfo);
+ sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap);
+ sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap);
+
+ dataPartitionMap.put("root.sg", sgPartitionMap);
+
+ dataPartitionInfo.setDataPartitionMap(dataPartitionMap);
+
+ analysis.setDataPartitionInfo(dataPartitionInfo);
return analysis;
}
}