You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/26 07:38:30 UTC

[iotdb] 01/01: complete last query distribution plan

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/last_query_distribution
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 15ee42cc3f9793affcdd75ef3d54e622c2abf6d2
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 26 15:38:16 2022 +0800

    complete last query distribution plan
---
 .../planner/distribution/ExchangeNodeAdder.java    |  19 ++++
 .../plan/planner/distribution/SourceRewriter.java  |  93 ++++++++++------
 .../plan/node/process/LastQueryMergeNode.java      |   2 +-
 .../plan/node/source/AlignedLastQueryScanNode.java |  14 ++-
 .../plan/node/source/LastQueryScanNode.java        |  14 ++-
 ...Test.java => DistributionPlannerBasicTest.java} |   2 +-
 .../mpp/plan/plan/distribution/LastQueryTest.java  | 118 +++++++++++++++++++++
 7 files changed, 228 insertions(+), 34 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index daef502c7b..6c502c464f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -33,10 +33,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
@@ -140,6 +143,17 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return processNoChildSourceNode(node, context);
   }
 
+  @Override
+  public PlanNode visitLastQueryScan(LastQueryScanNode node, NodeGroupContext context) {
+    return processNoChildSourceNode(node, context);
+  }
+
+  @Override
+  public PlanNode visitAlignedLastQueryScan(
+      AlignedLastQueryScanNode node, NodeGroupContext context) {
+    return processNoChildSourceNode(node, context);
+  }
+
   public PlanNode visitSeriesAggregationScan(
       SeriesAggregationScanNode node, NodeGroupContext context) {
     return processNoChildSourceNode(node, context);
@@ -183,6 +197,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return node;
   }
 
+  @Override
+  public PlanNode visitLastQueryMerge(LastQueryMergeNode node, NodeGroupContext context) {
+    return processMultiChildNode(node, context);
+  }
+
   @Override
   public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
     return processMultiChildNode(node, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index d72a1d1b60..65d4d6552f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -36,10 +36,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -214,21 +217,64 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   // TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue
   @Override
   public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+    return processRawSeriesScan(node, context, timeJoinNode);
+  }
+
+  @Override
+  public PlanNode visitAlignedSeriesScan(
+      AlignedSeriesScanNode node, DistributionPlanContext context) {
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+    return processRawSeriesScan(node, context, timeJoinNode);
+  }
+
+  @Override
+  public PlanNode visitLastQueryScan(LastQueryScanNode node, DistributionPlanContext context) {
+    LastQueryMergeNode mergeNode =
+        new LastQueryMergeNode(
+            context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter());
+    return processRawSeriesScan(node, context, mergeNode);
+  }
+
+  @Override
+  public PlanNode visitAlignedLastQueryScan(
+      AlignedLastQueryScanNode node, DistributionPlanContext context) {
+    LastQueryMergeNode mergeNode =
+        new LastQueryMergeNode(
+            context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter());
+    return processRawSeriesScan(node, context, mergeNode);
+  }
+
+  private PlanNode processRawSeriesScan(
+      SeriesSourceNode node, DistributionPlanContext context, MultiChildNode parent) {
+    List<SeriesSourceNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context);
+    if (sourceNodes.size() == 1) {
+      return sourceNodes.get(0);
+    }
+    sourceNodes.forEach(parent::addChild);
+    return parent;
+  }
+
+  private List<SeriesSourceNode> splitSeriesSourceNodeByPartition(
+      SeriesSourceNode node, DistributionPlanContext context) {
+    List<SeriesSourceNode> ret = new ArrayList<>();
     List<TRegionReplicaSet> dataDistribution =
-        analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter());
+        analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
     if (dataDistribution.size() == 1) {
       node.setRegionReplicaSet(dataDistribution.get(0));
-      return node;
+      ret.add(node);
+      return ret;
     }
-    TimeJoinNode timeJoinNode =
-        new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+
     for (TRegionReplicaSet dataRegion : dataDistribution) {
-      SeriesScanNode split = (SeriesScanNode) node.clone();
+      SeriesSourceNode split = (SeriesSourceNode) node.clone();
       split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
       split.setRegionReplicaSet(dataRegion);
-      timeJoinNode.addChild(split);
+      ret.add(split);
     }
-    return timeJoinNode;
+    return ret;
   }
 
   @Override
@@ -286,26 +332,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return aggregationNode;
   }
 
-  @Override
-  public PlanNode visitAlignedSeriesScan(
-      AlignedSeriesScanNode node, DistributionPlanContext context) {
-    List<TRegionReplicaSet> dataDistribution =
-        analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter());
-    if (dataDistribution.size() == 1) {
-      node.setRegionReplicaSet(dataDistribution.get(0));
-      return node;
-    }
-    TimeJoinNode timeJoinNode =
-        new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
-    for (TRegionReplicaSet dataRegion : dataDistribution) {
-      AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone();
-      split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-      split.setRegionReplicaSet(dataRegion);
-      timeJoinNode.addChild(split);
-    }
-    return timeJoinNode;
-  }
-
   @Override
   public PlanNode visitSchemaFetchMerge(
       SchemaFetchMergeNode node, DistributionPlanContext context) {
@@ -335,6 +361,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return root;
   }
 
+  @Override
+  public PlanNode visitLastQueryMerge(LastQueryMergeNode node, DistributionPlanContext context) {
+    return processRawMultiChildNode(node, context);
+  }
+
   @Override
   public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
     // Although some logic is similar between Aggregation and RawDataQuery,
@@ -343,8 +374,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     if (isAggregationQuery(node)) {
       return planAggregationWithTimeJoin(node, context);
     }
+    return processRawMultiChildNode(node, context);
+  }
 
-    TimeJoinNode root = (TimeJoinNode) node.clone();
+  private PlanNode processRawMultiChildNode(MultiChildNode node, DistributionPlanContext context) {
+    MultiChildNode root = (MultiChildNode) node.clone();
     // Step 1: Get all source nodes. For the node which is not source, add it as the child of
     // current TimeJoinNode
     List<SourceNode> sources = new ArrayList<>();
@@ -385,7 +419,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
             } else {
               // We clone a TimeJoinNode from root to make the params to be consistent.
               // But we need to assign a new ID to it
-              TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+              MultiChildNode parentOfGroup = (MultiChildNode) root.clone();
               parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
               seriesScanNodes.forEach(parentOfGroup::addChild);
               root.addChild(parentOfGroup);
@@ -402,7 +436,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
         root.addChild(visit(child, context));
       }
     }
-
     return root;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
index 144ea7dc76..d697835fdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -35,7 +35,7 @@ import java.util.Objects;
 
 import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
-public class LastQueryMergeNode extends ProcessNode {
+public class LastQueryMergeNode extends MultiChildNode {
 
   // make sure child in list has been ordered by their sensor name
   private List<PlanNode> children;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
index 66c8a54f34..514d2485ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
@@ -19,12 +19,14 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import com.google.common.collect.ImmutableList;
 
@@ -34,7 +36,7 @@ import java.util.Objects;
 
 import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
-public class AlignedLastQueryScanNode extends SourceNode {
+public class AlignedLastQueryScanNode extends SeriesSourceNode {
   // The path of the target series which will be scanned.
   private final AlignedPath seriesPath;
 
@@ -136,4 +138,14 @@ public class AlignedLastQueryScanNode extends SourceNode {
   public AlignedPath getSeriesPath() {
     return seriesPath;
   }
+
+  @Override
+  public PartialPath getPartitionPath() {
+    return seriesPath;
+  }
+
+  @Override
+  public Filter getPartitionTimeFilter() {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
index be97c5187d..e1db6e1d6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -19,12 +19,14 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import com.google.common.collect.ImmutableList;
 
@@ -32,7 +34,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
 
-public class LastQueryScanNode extends SourceNode {
+public class LastQueryScanNode extends SeriesSourceNode {
 
   public static final List<String> LAST_QUERY_HEADER_COLUMNS =
       ImmutableList.of("timeseries", "value", "dataType");
@@ -138,4 +140,14 @@ public class LastQueryScanNode extends SourceNode {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new LastQueryScanNode(planNodeId, partialPath);
   }
+
+  @Override
+  public PartialPath getPartitionPath() {
+    return seriesPath;
+  }
+
+  @Override
+  public Filter getPartitionTimeFilter() {
+    return null;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DistributionPlannerBasicTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DistributionPlannerBasicTest.java
index f6133ac41c..ea762fb2c4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DistributionPlannerBasicTest.java
@@ -52,7 +52,7 @@ import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
 
-public class BasicTest {
+public class DistributionPlannerBasicTest {
 
   @Test
   public void testSingleSeriesScan() throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
new file mode 100644
index 0000000000..2a3765dded
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LastQueryTest {
+
+  @Test
+  public void testLastQuery1Series1Region() throws IllegalPathException {
+    String d2s1Path = "root.sg.d22.s1";
+    MPPQueryContext context =
+        new MPPQueryContext(
+            "", new QueryId("test_last_1_series_1_region"), null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(
+            Util.constructAnalysis(),
+            constructLastQuery(Collections.singletonList(d2s1Path), context));
+
+    DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+    Assert.assertEquals(1, distributedQueryPlan.getInstances().size());
+  }
+
+  @Test
+  public void testLastQuery1Series2Region() throws IllegalPathException {
+    String d1s1Path = "root.sg.d1.s1";
+    MPPQueryContext context =
+        new MPPQueryContext(
+            "", new QueryId("test_last_1_series_2_region"), null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(
+            Util.constructAnalysis(),
+            constructLastQuery(Collections.singletonList(d1s1Path), context));
+
+    DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+    Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
+  }
+
+  @Test
+  public void testLastQuery2Series3Region() throws IllegalPathException {
+    String d1s1Path = "root.sg.d1.s1";
+    String d2s1Path = "root.sg.d22.s1";
+    MPPQueryContext context =
+        new MPPQueryContext(
+            "", new QueryId("test_last_2_series_3_region"), null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(
+            Util.constructAnalysis(),
+            constructLastQuery(Arrays.asList(d1s1Path, d2s1Path), context));
+
+    DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+    Assert.assertEquals(3, distributedQueryPlan.getInstances().size());
+  }
+
+  @Test
+  public void testLastQuery2Series2Region() throws IllegalPathException {
+    String d3s1Path = "root.sg.d333.s1";
+    String d4s1Path = "root.sg.d4444.s1";
+    MPPQueryContext context =
+        new MPPQueryContext(
+            "", new QueryId("test_last_2_series_2_region"), null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(
+            Util.constructAnalysis(),
+            constructLastQuery(Arrays.asList(d3s1Path, d4s1Path), context));
+
+    DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+    Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
+  }
+
+  private LogicalQueryPlan constructLastQuery(List<String> paths, MPPQueryContext context)
+      throws IllegalPathException {
+    LogicalPlanBuilder builder = new LogicalPlanBuilder(context);
+    Set<Expression> expressions = new HashSet<>();
+    for (String path : paths) {
+      expressions.add(new TimeSeriesOperand(new MeasurementPath(path)));
+    }
+    PlanNode root = builder.planLast(expressions, null).getRoot();
+    return new LogicalQueryPlan(context, root);
+  }
+}