You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 08:37:19 UTC
[iotdb] branch xingtanzjr/query_execution updated: rewrite the Id related logic
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
new f0f7eab8c1 rewrite the Id related logic
f0f7eab8c1 is described below
commit f0f7eab8c1a5ee2a14981f2df39327d481a846b5
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 16:37:08 2022 +0800
rewrite the Id related logic
---
.../iotdb/db/mpp/common/MPPQueryContext.java | 16 +++++-
.../apache/iotdb/db/mpp/common/PlanFragmentId.java | 7 +++
.../org/apache/iotdb/db/mpp/common/QueryId.java | 14 +++++
.../ResultNodeContext.java} | 21 +++----
.../apache/iotdb/db/mpp/execution/Coordinator.java | 7 ++-
.../iotdb/db/mpp/execution/QueryExecution.java | 21 ++++---
.../db/mpp/sql/planner/DistributionPlanner.java | 65 +++++++++++++++++-----
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 39 +++++++------
.../db/mpp/sql/planner/plan/PlanFragment.java | 4 ++
.../sql/planner/plan/node/process/LimitNode.java | 3 +-
.../planner/plan/node/process/TimeJoinNode.java | 4 +-
.../db/mpp/sql/statement/StatementVisitor.java | 4 --
.../iotdb/db/query/expression/Expression.java | 3 +-
.../query/expression/binary/BinaryExpression.java | 3 +-
.../db/query/expression/unary/ConstantOperand.java | 3 +-
.../query/expression/unary/FunctionExpression.java | 6 +-
.../query/expression/unary/LogicNotExpression.java | 3 +-
.../query/expression/unary/NegationExpression.java | 3 +-
.../query/expression/unary/TimeSeriesOperand.java | 6 +-
.../db/mpp/execution/QueryStateMachineTest.java | 32 ++++++-----
.../iotdb/db/mpp/sql/analyze/AnalyzerTest.java | 3 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 57 +++++++++----------
.../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 8 +--
thrift/src/main/thrift/mpp.thrift | 2 +-
24 files changed, 208 insertions(+), 126 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index ee866653fd..b58e94e91a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.common;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
/**
* This class is used to record the context of a query including QueryId, query statement, session
@@ -29,14 +30,19 @@ public class MPPQueryContext {
private QueryId queryId;
private SessionInfo session;
private QueryType queryType;
+ private String hostname;
+ private PlanNodeId virtualResultNodeId;
- public MPPQueryContext() {}
+ public MPPQueryContext(QueryId queryId) {
+ this.queryId = queryId;
+ }
public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, QueryType type) {
this.sql = sql;
this.queryId = queryId;
this.session = session;
this.queryType = type;
+ this.virtualResultNodeId = getVirtualResultNodeId(queryId);
}
public QueryId getQueryId() {
@@ -46,4 +52,12 @@ public class MPPQueryContext {
public QueryType getQueryType() {
return queryType;
}
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ private PlanNodeId getVirtualResultNodeId(QueryId queryId) {
+ return new PlanNodeId(String.format("%s_result_node", queryId.getId()));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index e90890648a..b077f08d1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -31,6 +31,8 @@ public class PlanFragmentId {
private final QueryId queryId;
private final int id;
+ private int nextFragmentInstanceId;
+
public static PlanFragmentId valueOf(String stageId) {
List<String> ids = QueryId.parseDottedId(stageId, 2, "stageId");
return valueOf(ids);
@@ -48,6 +50,11 @@ public class PlanFragmentId {
public PlanFragmentId(QueryId queryId, int id) {
this.queryId = requireNonNull(queryId, "queryId is null");
this.id = id;
+ this.nextFragmentInstanceId = 0;
+ }
+
+ public FragmentInstanceId genFragmentInstanceId() {
+ return new FragmentInstanceId(this, String.valueOf(nextFragmentInstanceId++));
}
public QueryId getQueryId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
index abdaf74269..c7959d661b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.common;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
@@ -32,6 +33,9 @@ public class QueryId {
private final String id;
+ private int nextPlanNodeIndex;
+ private int nextPlanFragmentIndex;
+
public static QueryId valueOf(String queryId) {
// ID is verified in the constructor
return new QueryId(queryId);
@@ -39,6 +43,16 @@ public class QueryId {
public QueryId(String id) {
this.id = validateId(id);
+ this.nextPlanNodeIndex = 0;
+ this.nextPlanFragmentIndex = 0;
+ }
+
+ public PlanNodeId genPlanNodeId() {
+ return new PlanNodeId(String.format("%s_%d", id, nextPlanNodeIndex++));
+ }
+
+ public PlanFragmentId genPlanFragmentId() {
+ return new PlanFragmentId(this, nextPlanFragmentIndex++);
}
public String getId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
similarity index 65%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
index 8176239b96..e40a4bac44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
@@ -17,20 +17,17 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+package org.apache.iotdb.db.mpp.common;
-import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-public class PlanNodeIdAllocator {
- public static int initialId = 0;
+public class ResultNodeContext {
+ private QueryId queryId;
+ private FragmentInstanceId virtualResultInstanceId;
+ private PlanNodeId virtualResultNodeId;
- public static synchronized PlanNodeId generateId() {
- initialId++;
- return new PlanNodeId(String.valueOf(initialId));
- }
-
- @TestOnly
- public static synchronized void reset() {
- initialId = 0;
+ public ResultNodeContext(QueryId queryId) {
+ this.queryId = queryId;
+ // this.virtualResultInstanceId = new FragmentInstanceId(new PlanFragmentId(queryId, ))
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4e3f8ad681..1cd95a0202 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -80,10 +80,15 @@ public class Coordinator {
COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
}
+ // Get the hostname of current coordinator
+ private String getHostname() {
+ // TODO: (xingtanzjr) how to get the hostname ?
+ return "";
+ }
+
public static Coordinator getInstance() {
return INSTANCE;
}
-
// private TQueryResponse executeQuery(TQueryRequest request) {
//
// }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index bb37be005b..0e2bc6c240 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution;
-import com.google.common.util.concurrent.SettableFuture;
import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
@@ -34,6 +33,8 @@ import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import com.google.common.util.concurrent.SettableFuture;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -136,11 +137,8 @@ public class QueryExecution {
releaseResource();
}
- /**
- * Release the resources that current QueryExecution hold.
- */
- private void releaseResource() {
- }
+ /** Release the resources that current QueryExecution hold. */
+ private void releaseResource() {}
/**
* This method will be called by the request thread from client connection. This method will block
@@ -163,11 +161,12 @@ public class QueryExecution {
// Although we monitor the state to transition to RUNNING, the future will return if any
// Terminated state is triggered
SettableFuture<QueryState> future = SettableFuture.create();
- stateMachine.addStateChangeListener(state -> {
- if (state == QueryState.RUNNING || state.isDone()) {
- future.set(state);
- }
- });
+ stateMachine.addStateChangeListener(
+ state -> {
+ if (state == QueryState.RUNNING || state.isDone()) {
+ future.set(state);
+ }
+ });
try {
QueryState state = future.get();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index d4d713f1fa..5b6a24bc4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.planner;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.planner.plan.*;
@@ -39,6 +40,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList;
public class DistributionPlanner {
private Analysis analysis;
+ private MPPQueryContext context;
private LogicalQueryPlan logicalPlan;
private int planFragmentIndex = 0;
@@ -46,30 +48,27 @@ public class DistributionPlanner {
public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
this.analysis = analysis;
this.logicalPlan = logicalPlan;
+ this.context = logicalPlan.getContext();
}
public PlanNode rewriteSource() {
SourceRewriter rewriter = new SourceRewriter();
- return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext());
+ return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext(context));
}
public PlanNode addExchangeNode(PlanNode root) {
ExchangeNodeAdder adder = new ExchangeNodeAdder();
- return adder.visit(root, new NodeGroupContext());
+ return adder.visit(root, new NodeGroupContext(context));
}
public SubPlan splitFragment(PlanNode root) {
- FragmentBuilder fragmentBuilder = new FragmentBuilder();
+ FragmentBuilder fragmentBuilder = new FragmentBuilder(context);
return fragmentBuilder.splitToSubPlan(root);
}
public DistributedQueryPlan planFragments() {
PlanNode rootAfterRewrite = rewriteSource();
- System.out.println("===== Step 2: Partition SourceNode =====");
- System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
- System.out.println("===== Step 3: Add ExchangeNode =====");
- System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
SubPlan subPlan = splitFragment(rootWithExchange);
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
return new DistributedQueryPlan(
@@ -83,6 +82,25 @@ public class DistributionPlanner {
return parallelPlaner.parallelPlan();
}
+ // TODO: (xingtanzjr) Maybe we should handle ResultNode in LogicalPlanner ?
+ public void SetSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) {
+ FragmentInstance rootInstance = null;
+ for (FragmentInstance instance : instances) {
+ if (instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
+ rootInstance = instance;
+ break;
+ }
+ }
+ // root should not be null during normal process
+ if (rootInstance == null) {
+ return;
+ }
+ FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
+ sinkNode.setChild(rootInstance.getFragment().getRoot());
+ rootInstance.getFragment().setRoot(sinkNode);
+ // sinkNode.setDownStream();
+ }
+
private PlanFragmentId getNextFragmentId() {
return new PlanFragmentId(this.logicalPlan.getContext().getQueryId(), this.planFragmentIndex++);
}
@@ -111,7 +129,7 @@ public class DistributionPlanner {
// SeriesScanNode.
for (RegionReplicaSet dataRegion : dataDistribution) {
SeriesScanNode split = (SeriesScanNode) handle.clone();
- split.setId(PlanNodeIdAllocator.generateId());
+ split.setId(context.queryContext.getQueryId().genPlanNodeId());
split.setDataRegionReplicaSet(dataRegion);
sources.add(split);
}
@@ -140,8 +158,10 @@ public class DistributionPlanner {
if (seriesScanNodes.size() == 1) {
root.addChild(seriesScanNodes.get(0));
} else {
- // We clone a TimeJoinNode from root to make the params to be consistent
+ // We clone a TimeJoinNode from root to make the params to be consistent.
+ // But we need to assign a new ID to it
TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+ root.setId(context.queryContext.getQueryId().genPlanNodeId());
seriesScanNodes.forEach(parentOfGroup::addChild);
root.addChild(parentOfGroup);
}
@@ -155,7 +175,13 @@ public class DistributionPlanner {
}
}
- private class DistributionPlanContext {}
+ private class DistributionPlanContext {
+ private MPPQueryContext queryContext;
+
+ public DistributionPlanContext(MPPQueryContext queryContext) {
+ this.queryContext = queryContext;
+ }
+ }
private class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
@Override
@@ -220,7 +246,8 @@ public class DistributionPlanner {
visitedChildren.forEach(
child -> {
if (!dataRegion.equals(context.getNodeDistribution(child.getId()).dataRegion)) {
- ExchangeNode exchangeNode = new ExchangeNode(PlanNodeIdAllocator.generateId());
+ ExchangeNode exchangeNode =
+ new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
newNode.addChild(exchangeNode);
} else {
@@ -255,10 +282,12 @@ public class DistributionPlanner {
}
private class NodeGroupContext {
- Map<PlanNodeId, NodeDistribution> nodeDistribution;
+ private MPPQueryContext queryContext;
+ private Map<PlanNodeId, NodeDistribution> nodeDistribution;
- public NodeGroupContext() {
- nodeDistribution = new HashMap<>();
+ public NodeGroupContext(MPPQueryContext queryContext) {
+ this.queryContext = queryContext;
+ this.nodeDistribution = new HashMap<>();
}
public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution distribution) {
@@ -288,6 +317,12 @@ public class DistributionPlanner {
}
private class FragmentBuilder {
+ private MPPQueryContext context;
+
+ public FragmentBuilder(MPPQueryContext context) {
+ this.context = context;
+ }
+
public SubPlan splitToSubPlan(PlanNode root) {
SubPlan rootSubPlan = createSubPlan(root);
splitToSubPlan(root, rootSubPlan);
@@ -298,7 +333,7 @@ public class DistributionPlanner {
if (root instanceof ExchangeNode) {
// We add a FragmentSinkNode for newly created PlanFragment
ExchangeNode exchangeNode = (ExchangeNode) root;
- FragmentSinkNode sinkNode = new FragmentSinkNode(PlanNodeIdAllocator.generateId());
+ FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
sinkNode.setChild(exchangeNode.getChild());
sinkNode.setDownStreamNode(exchangeNode);
// Record the source node info in the ExchangeNode so that we can keep the connection of
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index a753f4555c..f8c2a38d90 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
@@ -60,7 +59,7 @@ public class LogicalPlanner {
}
public LogicalQueryPlan plan(Analysis analysis) {
- PlanNode rootNode = new LogicalPlanVisitor(analysis).process(analysis.getStatement());
+ PlanNode rootNode = new LogicalPlanVisitor(analysis).process(analysis.getStatement(), context);
// optimize the query logical plan
if (analysis.getStatement() instanceof QueryStatement) {
@@ -127,7 +126,7 @@ public class LogicalPlanner {
}
if (queryStatement.isAlignByDevice()) {
- DeviceMergeNode deviceMergeNode = new DeviceMergeNode(PlanNodeIdAllocator.generateId());
+ DeviceMergeNode deviceMergeNode = new DeviceMergeNode(context.getQueryId().genPlanNodeId());
for (Map.Entry<String, Set<SourceNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
String deviceName = entry.getKey();
List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
@@ -136,7 +135,7 @@ public class LogicalPlanner {
} else {
TimeJoinNode timeJoinNode =
new TimeJoinNode(
- PlanNodeIdAllocator.generateId(),
+ context.getQueryId().genPlanNodeId(),
queryStatement.getResultOrder(),
null,
planNodes);
@@ -152,13 +151,18 @@ public class LogicalPlanner {
.collect(Collectors.toList());
TimeJoinNode timeJoinNode =
new TimeJoinNode(
- PlanNodeIdAllocator.generateId(), queryStatement.getResultOrder(), null, planNodes);
+ context.getQueryId().genPlanNodeId(),
+ queryStatement.getResultOrder(),
+ null,
+ planNodes);
return new PlanBuilder(timeJoinNode);
}
private Set<SourceNode> planResultColumn(ResultColumn resultColumn) {
Set<SourceNode> resultSourceNodeSet = new HashSet<>();
- resultColumn.getExpression().collectPlanNode(resultSourceNodeSet);
+ resultColumn
+ .getExpression()
+ .collectPlanNode(resultSourceNodeSet, context.getQueryId().genPlanNodeId());
return resultSourceNodeSet;
}
@@ -168,7 +172,7 @@ public class LogicalPlanner {
}
return planBuilder.withNewRoot(
- new FilterNode(PlanNodeIdAllocator.generateId(), planBuilder.getRoot(), queryFilter));
+ new FilterNode(context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), queryFilter));
}
private PlanBuilder planGroupByLevel(
@@ -179,7 +183,7 @@ public class LogicalPlanner {
return planBuilder.withNewRoot(
new GroupByLevelNode(
- PlanNodeIdAllocator.generateId(),
+ context.getQueryId().genPlanNodeId(),
planBuilder.getRoot(),
groupByLevelComponent.getLevels(),
groupByLevelComponent.getGroupedPathMap()));
@@ -198,7 +202,7 @@ public class LogicalPlanner {
return planBuilder.withNewRoot(
new FilterNullNode(
- PlanNodeIdAllocator.generateId(),
+ context.getQueryId().genPlanNodeId(),
planBuilder.getRoot(),
filterNullComponent.getWithoutPolicyType(),
filterNullComponent.getWithoutNullColumns().stream()
@@ -212,7 +216,8 @@ public class LogicalPlanner {
}
return planBuilder.withNewRoot(
- new SortNode(PlanNodeIdAllocator.generateId(), planBuilder.getRoot(), null, resultOrder));
+ new SortNode(
+ context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), null, resultOrder));
}
private PlanBuilder planLimit(PlanBuilder planBuilder, int rowLimit) {
@@ -221,7 +226,7 @@ public class LogicalPlanner {
}
return planBuilder.withNewRoot(
- new LimitNode(PlanNodeIdAllocator.generateId(), rowLimit, planBuilder.getRoot()));
+ new LimitNode(context.getQueryId().genPlanNodeId(), rowLimit, planBuilder.getRoot()));
}
private PlanBuilder planOffset(PlanBuilder planBuilder, int rowOffset) {
@@ -230,14 +235,14 @@ public class LogicalPlanner {
}
return planBuilder.withNewRoot(
- new OffsetNode(PlanNodeIdAllocator.generateId(), planBuilder.getRoot(), rowOffset));
+ new OffsetNode(context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), rowOffset));
}
@Override
public PlanNode visitCreateTimeseries(
CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
return new CreateTimeSeriesNode(
- PlanNodeIdAllocator.generateId(),
+ context.getQueryId().genPlanNodeId(),
createTimeSeriesStatement.getPath(),
createTimeSeriesStatement.getDataType(),
createTimeSeriesStatement.getEncoding(),
@@ -253,7 +258,7 @@ public class LogicalPlanner {
CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement,
MPPQueryContext context) {
return new CreateAlignedTimeSeriesNode(
- PlanNodeIdAllocator.generateId(),
+ context.getQueryId().genPlanNodeId(),
createAlignedTimeSeriesStatement.getDevicePath(),
createAlignedTimeSeriesStatement.getMeasurements(),
createAlignedTimeSeriesStatement.getDataTypes(),
@@ -269,7 +274,7 @@ public class LogicalPlanner {
public PlanNode visitAlterTimeseries(
AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
return new AlterTimeSeriesNode(
- PlanNodeIdAllocator.generateId(),
+ context.getQueryId().genPlanNodeId(),
alterTimeSeriesStatement.getPath(),
alterTimeSeriesStatement.getAlterType(),
alterTimeSeriesStatement.getAlterMap(),
@@ -290,7 +295,7 @@ public class LogicalPlanner {
insertTabletStatement.getDevicePath(),
Arrays.asList(insertTabletStatement.getMeasurements()));
return new InsertTabletNode(
- PlanNodeIdAllocator.generateId(),
+ context.getQueryId().genPlanNodeId(),
insertTabletStatement.getDevicePath(),
insertTabletStatement.isAligned(),
measurementSchemas.toArray(new MeasurementSchema[0]),
@@ -312,7 +317,7 @@ public class LogicalPlanner {
insertRowStatement.getDevicePath(),
Arrays.asList(insertRowStatement.getMeasurements()));
return new InsertRowNode(
- PlanNodeIdAllocator.generateId(),
+ context.getQueryId().genPlanNodeId(),
insertRowStatement.getDevicePath(),
insertRowStatement.isAligned(),
measurementSchemas.toArray(new MeasurementSchema[0]),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 5863bbfed3..4cf7551e13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -46,6 +46,10 @@ public class PlanFragment {
return root;
}
+ public void setRoot(PlanNode root) {
+ this.root = root;
+ }
+
public String toString() {
return String.format("PlanFragment-%s", getId());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 5b888d837e..837db3e20b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -60,7 +59,7 @@ public class LimitNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new LimitNode(PlanNodeIdAllocator.generateId(), this.limit);
+ return new LimitNode(getId(), this.limit);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 67e874e52e..7aa99e33c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
@@ -74,8 +73,7 @@ public class TimeJoinNode extends ProcessNode {
@Override
public PlanNode clone() {
- return new TimeJoinNode(
- PlanNodeIdAllocator.generateId(), this.mergeOrder, this.filterNullPolicy);
+ return new TimeJoinNode(getId(), this.mergeOrder, this.filterNullPolicy);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 3fd959c57d..cee9a7980c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -37,10 +37,6 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
*/
public abstract class StatementVisitor<R, C> {
- public R process(StatementNode node) {
- return process(node, null);
- }
-
public R process(StatementNode node, C context) {
return node.accept(this, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 47e80bab76..dfdd47fbd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -84,7 +85,7 @@ public abstract class Expression {
public abstract void constructUdfExecutors(
Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
- public abstract void collectPlanNode(Set<SourceNode> planNodeSet);
+ public abstract void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId);
public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 7d61695827..4e482905ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -200,7 +201,7 @@ public abstract class BinaryExpression extends Expression {
}
@Override
- public void collectPlanNode(Set<SourceNode> planNodeSet) {
+ public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
// TODO: support nested expressions
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 70fe842fe9..1cc9043ceb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.expression.unary;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -104,7 +105,7 @@ public class ConstantOperand extends Expression {
}
@Override
- public void collectPlanNode(Set<SourceNode> planNodeSet) {
+ public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
// Do nothing
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 3611cbee44..ef77e955a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
@@ -236,9 +236,9 @@ public class FunctionExpression extends Expression {
}
@Override
- public void collectPlanNode(Set<SourceNode> planNodeSet) {
+ public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
if (isBuiltInAggregationFunctionExpression) {
- planNodeSet.add(new SeriesAggregateScanNode(PlanNodeIdAllocator.generateId(), this));
+ planNodeSet.add(new SeriesAggregateScanNode(nodeId, this));
}
// TODO: support UDF
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
index 983f58c4c2..b1183df457 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
@@ -129,7 +130,7 @@ public class LogicNotExpression extends Expression {
}
@Override
- public void collectPlanNode(Set<SourceNode> planNodeSet) {
+ public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
// TODO: support LogicNotExpression
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index e5ecde9c38..fca985cf79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -128,7 +129,7 @@ public class NegationExpression extends Expression {
}
@Override
- public void collectPlanNode(Set<SourceNode> planNodeSet) {
+ public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
// TODO: support nested expressions
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 6294372399..f33d353ce1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
@@ -114,8 +114,8 @@ public class TimeSeriesOperand extends Expression {
}
@Override
- public void collectPlanNode(Set<SourceNode> planNodeSet) {
- planNodeSet.add(new SeriesScanNode(PlanNodeIdAllocator.generateId(), path));
+ public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
+ planNodeSet.add(new SeriesScanNode(nodeId, path));
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
index c183655ce5..3cc8db156a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
@@ -19,12 +19,13 @@
package org.apache.iotdb.db.mpp.execution;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.junit.Assert;
import org.junit.Test;
@@ -62,10 +63,10 @@ public class QueryStateMachineTest {
public void TestFragmentInstanceToFinished() {
List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
QueryStateMachine stateMachine = genQueryStateMachine();
- for(FragmentInstanceId id : instanceIds) {
+ for (FragmentInstanceId id : instanceIds) {
stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
}
- for(FragmentInstanceId id : instanceIds) {
+ for (FragmentInstanceId id : instanceIds) {
stateMachine.updateFragInstanceState(id, FragmentInstanceState.FINISHED);
}
Assert.assertEquals(stateMachine.getState(), QueryState.FINISHED);
@@ -75,7 +76,7 @@ public class QueryStateMachineTest {
public void TestFragmentInstanceToTerminalState() {
List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
QueryStateMachine stateMachine = genQueryStateMachine();
- for(FragmentInstanceId id : instanceIds) {
+ for (FragmentInstanceId id : instanceIds) {
stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
}
stateMachine.updateFragInstanceState(instanceIds.get(0), FragmentInstanceState.FAILED);
@@ -86,16 +87,18 @@ public class QueryStateMachineTest {
public void TestListener() throws ExecutionException, InterruptedException {
AtomicInteger stateChangeCounter = new AtomicInteger(0);
QueryStateMachine stateMachine = genQueryStateMachine();
- stateMachine.addStateChangeListener(state -> {
- stateChangeCounter.getAndIncrement();
- });
+ stateMachine.addStateChangeListener(
+ state -> {
+ stateChangeCounter.getAndIncrement();
+ });
stateMachine.transitionToFinished();
SettableFuture<QueryState> future = SettableFuture.create();
- stateMachine.addStateChangeListener(state -> {
- if (state == QueryState.FINISHED) {
- future.set(QueryState.FINISHED);
- }
- });
+ stateMachine.addStateChangeListener(
+ state -> {
+ if (state == QueryState.FINISHED) {
+ future.set(QueryState.FINISHED);
+ }
+ });
future.get();
Assert.assertEquals(stateChangeCounter.get(), 2);
}
@@ -113,7 +116,8 @@ public class QueryStateMachineTest {
}
private QueryStateMachine genQueryStateMachine() {
- return new QueryStateMachine(genQueryId(), IoTDBThreadPoolFactory.newSingleThreadExecutor("TestQueryStateMachine"));
+ return new QueryStateMachine(
+ genQueryId(), IoTDBThreadPoolFactory.newSingleThreadExecutor("TestQueryStateMachine"));
}
private List<FragmentInstanceId> genFragmentInstanceIdList() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
index 7583f0e673..d8d1880232 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.sql.analyze;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.junit.Assert;
@@ -47,7 +48,7 @@ public class AnalyzerTest {
private void assertAnalyzeSemanticException(String sql, String message) {
try {
- Analyzer analyzer = new Analyzer(new MPPQueryContext());
+ Analyzer analyzer = new Analyzer(new MPPQueryContext(new QueryId("test_query")));
analyzer.analyze(StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));
fail();
} catch (RuntimeException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 50ae6202ab..ee53241aed 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -38,8 +38,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
@@ -56,72 +54,74 @@ public class DistributionPlannerTest {
@Test
public void TestRewriteSourceNode() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
+
TimeJoinNode timeJoinNode =
new TimeJoinNode(
- PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+ queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
- LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
Analysis analysis = constructAnalysis();
DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
+ new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
PlanNode newRoot = planner.rewriteSource();
- System.out.println(PlanNodeUtil.nodeToString(newRoot));
assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
}
@Test
public void TestAddExchangeNode() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
TimeJoinNode timeJoinNode =
new TimeJoinNode(
- PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+ queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
- LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
Analysis analysis = constructAnalysis();
DistributionPlanner planner =
- new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
+ new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
PlanNode rootAfterRewrite = planner.rewriteSource();
PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
- // PlanNodeUtil.printPlanNode(rootWithExchange);
assertEquals(rootWithExchange.getChildren().get(0).getChildren().size(), 3);
}
@Test
public void TestSplitFragment() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
TimeJoinNode timeJoinNode =
new TimeJoinNode(
- PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+ queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
- LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null, QueryType.READ);
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ);
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -132,22 +132,23 @@ public class DistributionPlannerTest {
@Test
public void TestParallelPlan() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query");
TimeJoinNode timeJoinNode =
new TimeJoinNode(
- PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+ queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
timeJoinNode.addChild(
- new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d333.s1")));
+ new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d333.s1")));
- LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+ LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
Analysis analysis = constructAnalysis();
- MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null, QueryType.READ);
+ MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ);
DistributionPlanner planner =
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
DistributedQueryPlan plan = planner.planFragments();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 1f8ee23109..403af0d743 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -22,11 +22,11 @@ package org.apache.iotdb.db.mpp.sql.plan;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
@@ -53,9 +53,7 @@ public class LogicalPlannerTest {
LogicalPlanPrinter planPrinter = new LogicalPlanPrinter();
@Before
- public void setUp() {
- PlanNodeIdAllocator.reset();
- }
+ public void setUp() {}
@Test
@Ignore
@@ -357,7 +355,7 @@ public class LogicalPlannerTest {
try {
Statement statement =
StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
- MPPQueryContext context = new MPPQueryContext();
+ MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
// TODO: do analyze after implementing ISchemaFetcher and IPartitionFetcher
// Analyzer analyzer = new Analyzer(context);
// Analysis analysis = analyzer.analyze(statement);
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index 00b6b6cdeb..25ed861b14 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -23,7 +23,7 @@ namespace java org.apache.iotdb.mpp.rpc.thrift
struct TFragmentInstanceId {
1: required string queryId
2: required string fragmentId
- 3: required string instanceId
+ 3: required i64 instanceId
}
struct GetDataBlockRequest {