You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/26 07:38:30 UTC
[iotdb] 01/01: complete last query distribution plan
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/last_query_distribution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 15ee42cc3f9793affcdd75ef3d54e622c2abf6d2
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 26 15:38:16 2022 +0800
complete last query distribution plan
---
.../planner/distribution/ExchangeNodeAdder.java | 19 ++++
.../plan/planner/distribution/SourceRewriter.java | 93 ++++++++++------
.../plan/node/process/LastQueryMergeNode.java | 2 +-
.../plan/node/source/AlignedLastQueryScanNode.java | 14 ++-
.../plan/node/source/LastQueryScanNode.java | 14 ++-
...Test.java => DistributionPlannerBasicTest.java} | 2 +-
.../mpp/plan/plan/distribution/LastQueryTest.java | 118 +++++++++++++++++++++
7 files changed, 228 insertions(+), 34 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index daef502c7b..6c502c464f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -33,10 +33,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeS
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
@@ -140,6 +143,17 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return processNoChildSourceNode(node, context);
}
+ @Override
+ public PlanNode visitLastQueryScan(LastQueryScanNode node, NodeGroupContext context) {
+ return processNoChildSourceNode(node, context);
+ }
+
+ @Override
+ public PlanNode visitAlignedLastQueryScan(
+ AlignedLastQueryScanNode node, NodeGroupContext context) {
+ return processNoChildSourceNode(node, context);
+ }
+
public PlanNode visitSeriesAggregationScan(
SeriesAggregationScanNode node, NodeGroupContext context) {
return processNoChildSourceNode(node, context);
@@ -183,6 +197,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return node;
}
+ @Override
+ public PlanNode visitLastQueryMerge(LastQueryMergeNode node, NodeGroupContext context) {
+ return processMultiChildNode(node, context);
+ }
+
@Override
public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index d72a1d1b60..65d4d6552f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -36,10 +36,13 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryS
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
@@ -214,21 +217,64 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
// TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue
@Override
public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+ return processRawSeriesScan(node, context, timeJoinNode);
+ }
+
+ @Override
+ public PlanNode visitAlignedSeriesScan(
+ AlignedSeriesScanNode node, DistributionPlanContext context) {
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+ return processRawSeriesScan(node, context, timeJoinNode);
+ }
+
+ @Override
+ public PlanNode visitLastQueryScan(LastQueryScanNode node, DistributionPlanContext context) {
+ LastQueryMergeNode mergeNode =
+ new LastQueryMergeNode(
+ context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter());
+ return processRawSeriesScan(node, context, mergeNode);
+ }
+
+ @Override
+ public PlanNode visitAlignedLastQueryScan(
+ AlignedLastQueryScanNode node, DistributionPlanContext context) {
+ LastQueryMergeNode mergeNode =
+ new LastQueryMergeNode(
+ context.queryContext.getQueryId().genPlanNodeId(), node.getPartitionTimeFilter());
+ return processRawSeriesScan(node, context, mergeNode);
+ }
+
+ private PlanNode processRawSeriesScan(
+ SeriesSourceNode node, DistributionPlanContext context, MultiChildNode parent) {
+ List<SeriesSourceNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context);
+ if (sourceNodes.size() == 1) {
+ return sourceNodes.get(0);
+ }
+ sourceNodes.forEach(parent::addChild);
+ return parent;
+ }
+
+ private List<SeriesSourceNode> splitSeriesSourceNodeByPartition(
+ SeriesSourceNode node, DistributionPlanContext context) {
+ List<SeriesSourceNode> ret = new ArrayList<>();
List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter());
+ analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
if (dataDistribution.size() == 1) {
node.setRegionReplicaSet(dataDistribution.get(0));
- return node;
+ ret.add(node);
+ return ret;
}
- TimeJoinNode timeJoinNode =
- new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+
for (TRegionReplicaSet dataRegion : dataDistribution) {
- SeriesScanNode split = (SeriesScanNode) node.clone();
+ SeriesSourceNode split = (SeriesSourceNode) node.clone();
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
- timeJoinNode.addChild(split);
+ ret.add(split);
}
- return timeJoinNode;
+ return ret;
}
@Override
@@ -286,26 +332,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return aggregationNode;
}
- @Override
- public PlanNode visitAlignedSeriesScan(
- AlignedSeriesScanNode node, DistributionPlanContext context) {
- List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter());
- if (dataDistribution.size() == 1) {
- node.setRegionReplicaSet(dataDistribution.get(0));
- return node;
- }
- TimeJoinNode timeJoinNode =
- new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
- for (TRegionReplicaSet dataRegion : dataDistribution) {
- AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone();
- split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- split.setRegionReplicaSet(dataRegion);
- timeJoinNode.addChild(split);
- }
- return timeJoinNode;
- }
-
@Override
public PlanNode visitSchemaFetchMerge(
SchemaFetchMergeNode node, DistributionPlanContext context) {
@@ -335,6 +361,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return root;
}
+ @Override
+ public PlanNode visitLastQueryMerge(LastQueryMergeNode node, DistributionPlanContext context) {
+ return processRawMultiChildNode(node, context);
+ }
+
@Override
public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
// Although some logic is similar between Aggregation and RawDataQuery,
@@ -343,8 +374,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
if (isAggregationQuery(node)) {
return planAggregationWithTimeJoin(node, context);
}
+ return processRawMultiChildNode(node, context);
+ }
- TimeJoinNode root = (TimeJoinNode) node.clone();
+ private PlanNode processRawMultiChildNode(MultiChildNode node, DistributionPlanContext context) {
+ MultiChildNode root = (MultiChildNode) node.clone();
// Step 1: Get all source nodes. For the node which is not source, add it as the child of
// current TimeJoinNode
List<SourceNode> sources = new ArrayList<>();
@@ -385,7 +419,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
} else {
// We clone a TimeJoinNode from root to make the params to be consistent.
// But we need to assign a new ID to it
- TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+ MultiChildNode parentOfGroup = (MultiChildNode) root.clone();
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
seriesScanNodes.forEach(parentOfGroup::addChild);
root.addChild(parentOfGroup);
@@ -402,7 +436,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
root.addChild(visit(child, context));
}
}
-
return root;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
index 144ea7dc76..d697835fdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -35,7 +35,7 @@ import java.util.Objects;
import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-public class LastQueryMergeNode extends ProcessNode {
+public class LastQueryMergeNode extends MultiChildNode {
// make sure child in list has been ordered by their sensor name
private List<PlanNode> children;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
index 66c8a54f34..514d2485ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
@@ -19,12 +19,14 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import com.google.common.collect.ImmutableList;
@@ -34,7 +36,7 @@ import java.util.Objects;
import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
-public class AlignedLastQueryScanNode extends SourceNode {
+public class AlignedLastQueryScanNode extends SeriesSourceNode {
// The path of the target series which will be scanned.
private final AlignedPath seriesPath;
@@ -136,4 +138,14 @@ public class AlignedLastQueryScanNode extends SourceNode {
public AlignedPath getSeriesPath() {
return seriesPath;
}
+
+ @Override
+ public PartialPath getPartitionPath() {
+ return seriesPath;
+ }
+
+ @Override
+ public Filter getPartitionTimeFilter() {
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
index be97c5187d..e1db6e1d6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -19,12 +19,14 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import com.google.common.collect.ImmutableList;
@@ -32,7 +34,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
-public class LastQueryScanNode extends SourceNode {
+public class LastQueryScanNode extends SeriesSourceNode {
public static final List<String> LAST_QUERY_HEADER_COLUMNS =
ImmutableList.of("timeseries", "value", "dataType");
@@ -138,4 +140,14 @@ public class LastQueryScanNode extends SourceNode {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new LastQueryScanNode(planNodeId, partialPath);
}
+
+ @Override
+ public PartialPath getPartitionPath() {
+ return seriesPath;
+ }
+
+ @Override
+ public Filter getPartitionTimeFilter() {
+ return null;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DistributionPlannerBasicTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DistributionPlannerBasicTest.java
index f6133ac41c..ea762fb2c4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DistributionPlannerBasicTest.java
@@ -52,7 +52,7 @@ import java.util.Arrays;
import static org.junit.Assert.assertEquals;
-public class BasicTest {
+public class DistributionPlannerBasicTest {
@Test
public void testSingleSeriesScan() throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
new file mode 100644
index 0000000000..2a3765dded
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LastQueryTest {
+
+ @Test
+ public void testLastQuery1Series1Region() throws IllegalPathException {
+ String d2s1Path = "root.sg.d22.s1";
+ MPPQueryContext context =
+ new MPPQueryContext(
+ "", new QueryId("test_last_1_series_1_region"), null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(
+ Util.constructAnalysis(),
+ constructLastQuery(Collections.singletonList(d2s1Path), context));
+
+ DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+ Assert.assertEquals(1, distributedQueryPlan.getInstances().size());
+ }
+
+ @Test
+ public void testLastQuery1Series2Region() throws IllegalPathException {
+ String d1s1Path = "root.sg.d1.s1";
+ MPPQueryContext context =
+ new MPPQueryContext(
+ "", new QueryId("test_last_1_series_2_region"), null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(
+ Util.constructAnalysis(),
+ constructLastQuery(Collections.singletonList(d1s1Path), context));
+
+ DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+ Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
+ }
+
+ @Test
+ public void testLastQuery2Series3Region() throws IllegalPathException {
+ String d1s1Path = "root.sg.d1.s1";
+ String d2s1Path = "root.sg.d22.s1";
+ MPPQueryContext context =
+ new MPPQueryContext(
+ "", new QueryId("test_last_1_series_2_region"), null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(
+ Util.constructAnalysis(),
+ constructLastQuery(Arrays.asList(d1s1Path, d2s1Path), context));
+
+ DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+ Assert.assertEquals(3, distributedQueryPlan.getInstances().size());
+ }
+
+ @Test
+ public void testLastQuery2Series2Region() throws IllegalPathException {
+ String d3s1Path = "root.sg.d333.s1";
+ String d4s1Path = "root.sg.d4444.s1";
+ MPPQueryContext context =
+ new MPPQueryContext(
+ "", new QueryId("test_last_1_series_2_region"), null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(
+ Util.constructAnalysis(),
+ constructLastQuery(Arrays.asList(d3s1Path, d4s1Path), context));
+
+ DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+ Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
+ }
+
+ private LogicalQueryPlan constructLastQuery(List<String> paths, MPPQueryContext context)
+ throws IllegalPathException {
+ LogicalPlanBuilder builder = new LogicalPlanBuilder(context);
+ Set<Expression> expressions = new HashSet<>();
+ for (String path : paths) {
+ expressions.add(new TimeSeriesOperand(new MeasurementPath(path)));
+ }
+ PlanNode root = builder.planLast(expressions, null).getRoot();
+ return new LogicalQueryPlan(context, root);
+ }
+}