You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/06 08:37:19 UTC

[iotdb] branch xingtanzjr/query_execution updated: rewrite the Id related logic

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_execution
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_execution by this push:
     new f0f7eab8c1 rewrite the Id related logic
f0f7eab8c1 is described below

commit f0f7eab8c1a5ee2a14981f2df39327d481a846b5
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 6 16:37:08 2022 +0800

    rewrite the Id related logic
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       | 16 +++++-
 .../apache/iotdb/db/mpp/common/PlanFragmentId.java |  7 +++
 .../org/apache/iotdb/db/mpp/common/QueryId.java    | 14 +++++
 .../ResultNodeContext.java}                        | 21 +++----
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  7 ++-
 .../iotdb/db/mpp/execution/QueryExecution.java     | 21 ++++---
 .../db/mpp/sql/planner/DistributionPlanner.java    | 65 +++++++++++++++++-----
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   | 39 +++++++------
 .../db/mpp/sql/planner/plan/PlanFragment.java      |  4 ++
 .../sql/planner/plan/node/process/LimitNode.java   |  3 +-
 .../planner/plan/node/process/TimeJoinNode.java    |  4 +-
 .../db/mpp/sql/statement/StatementVisitor.java     |  4 --
 .../iotdb/db/query/expression/Expression.java      |  3 +-
 .../query/expression/binary/BinaryExpression.java  |  3 +-
 .../db/query/expression/unary/ConstantOperand.java |  3 +-
 .../query/expression/unary/FunctionExpression.java |  6 +-
 .../query/expression/unary/LogicNotExpression.java |  3 +-
 .../query/expression/unary/NegationExpression.java |  3 +-
 .../query/expression/unary/TimeSeriesOperand.java  |  6 +-
 .../db/mpp/execution/QueryStateMachineTest.java    | 32 ++++++-----
 .../iotdb/db/mpp/sql/analyze/AnalyzerTest.java     |  3 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 57 +++++++++----------
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |  8 +--
 thrift/src/main/thrift/mpp.thrift                  |  2 +-
 24 files changed, 208 insertions(+), 126 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index ee866653fd..b58e94e91a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.common;
 
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
 /**
  * This class is used to record the context of a query including QueryId, query statement, session
@@ -29,14 +30,19 @@ public class MPPQueryContext {
   private QueryId queryId;
   private SessionInfo session;
   private QueryType queryType;
+  private String hostname;
+  private PlanNodeId virtualResultNodeId;
 
-  public MPPQueryContext() {}
+  public MPPQueryContext(QueryId queryId) {
+    this.queryId = queryId;
+  }
 
   public MPPQueryContext(String sql, QueryId queryId, SessionInfo session, QueryType type) {
     this.sql = sql;
     this.queryId = queryId;
     this.session = session;
     this.queryType = type;
+    this.virtualResultNodeId = getVirtualResultNodeId(queryId);
   }
 
   public QueryId getQueryId() {
@@ -46,4 +52,12 @@ public class MPPQueryContext {
   public QueryType getQueryType() {
     return queryType;
   }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  private PlanNodeId getVirtualResultNodeId(QueryId queryId) {
+    return new PlanNodeId(String.format("%s_result_node", queryId.getId()));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index e90890648a..b077f08d1d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -31,6 +31,8 @@ public class PlanFragmentId {
   private final QueryId queryId;
   private final int id;
 
+  private int nextFragmentInstanceId;
+
   public static PlanFragmentId valueOf(String stageId) {
     List<String> ids = QueryId.parseDottedId(stageId, 2, "stageId");
     return valueOf(ids);
@@ -48,6 +50,11 @@ public class PlanFragmentId {
   public PlanFragmentId(QueryId queryId, int id) {
     this.queryId = requireNonNull(queryId, "queryId is null");
     this.id = id;
+    this.nextFragmentInstanceId = 0;
+  }
+
+  public FragmentInstanceId genFragmentInstanceId() {
+    return new FragmentInstanceId(this, String.valueOf(nextFragmentInstanceId++));
   }
 
   public QueryId getQueryId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
index abdaf74269..c7959d661b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.common;
 
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -32,6 +33,9 @@ public class QueryId {
 
   private final String id;
 
+  private int nextPlanNodeIndex;
+  private int nextPlanFragmentIndex;
+
   public static QueryId valueOf(String queryId) {
     // ID is verified in the constructor
     return new QueryId(queryId);
@@ -39,6 +43,16 @@ public class QueryId {
 
   public QueryId(String id) {
     this.id = validateId(id);
+    this.nextPlanNodeIndex = 0;
+    this.nextPlanFragmentIndex = 0;
+  }
+
+  public PlanNodeId genPlanNodeId() {
+    return new PlanNodeId(String.format("%s_%d", id, nextPlanNodeIndex++));
+  }
+
+  public PlanFragmentId genPlanFragmentId() {
+    return new PlanFragmentId(this, nextPlanFragmentIndex++);
   }
 
   public String getId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
similarity index 65%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
index 8176239b96..e40a4bac44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/ResultNodeContext.java
@@ -17,20 +17,17 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+package org.apache.iotdb.db.mpp.common;
 
-import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-public class PlanNodeIdAllocator {
-  public static int initialId = 0;
+public class ResultNodeContext {
+  private QueryId queryId;
+  private FragmentInstanceId virtualResultInstanceId;
+  private PlanNodeId virtualResultNodeId;
 
-  public static synchronized PlanNodeId generateId() {
-    initialId++;
-    return new PlanNodeId(String.valueOf(initialId));
-  }
-
-  @TestOnly
-  public static synchronized void reset() {
-    initialId = 0;
+  public ResultNodeContext(QueryId queryId) {
+    this.queryId = queryId;
+    //    this.virtualResultInstanceId = new FragmentInstanceId(new PlanFragmentId(queryId, ))
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 4e3f8ad681..1cd95a0202 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -80,10 +80,15 @@ public class Coordinator {
         COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
   }
 
+  // Get the hostname of current coordinator
+  private String getHostname() {
+    // TODO: (xingtanzjr) how to get the hostname ?
+    return "";
+  }
+
   public static Coordinator getInstance() {
     return INSTANCE;
   }
-
   //    private TQueryResponse executeQuery(TQueryRequest request) {
   //
   //    }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index bb37be005b..0e2bc6c240 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import com.google.common.util.concurrent.SettableFuture;
 import org.apache.iotdb.db.mpp.buffer.ISourceHandle;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
@@ -34,6 +33,8 @@ import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import com.google.common.util.concurrent.SettableFuture;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -136,11 +137,8 @@ public class QueryExecution {
     releaseResource();
   }
 
-  /**
-   * Release the resources that current QueryExecution hold.
-   */
-  private void releaseResource() {
-  }
+  /** Release the resources that current QueryExecution hold. */
+  private void releaseResource() {}
 
   /**
    * This method will be called by the request thread from client connection. This method will block
@@ -163,11 +161,12 @@ public class QueryExecution {
     // Although we monitor the state to transition to RUNNING, the future will return if any
     // Terminated state is triggered
     SettableFuture<QueryState> future = SettableFuture.create();
-    stateMachine.addStateChangeListener(state -> {
-      if (state == QueryState.RUNNING || state.isDone()) {
-        future.set(state);
-      }
-    });
+    stateMachine.addStateChangeListener(
+        state -> {
+          if (state == QueryState.RUNNING || state.isDone()) {
+            future.set(state);
+          }
+        });
 
     try {
       QueryState state = future.get();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index d4d713f1fa..5b6a24bc4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.*;
@@ -39,6 +40,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList;
 
 public class DistributionPlanner {
   private Analysis analysis;
+  private MPPQueryContext context;
   private LogicalQueryPlan logicalPlan;
 
   private int planFragmentIndex = 0;
@@ -46,30 +48,27 @@ public class DistributionPlanner {
   public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
     this.analysis = analysis;
     this.logicalPlan = logicalPlan;
+    this.context = logicalPlan.getContext();
   }
 
   public PlanNode rewriteSource() {
     SourceRewriter rewriter = new SourceRewriter();
-    return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext());
+    return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext(context));
   }
 
   public PlanNode addExchangeNode(PlanNode root) {
     ExchangeNodeAdder adder = new ExchangeNodeAdder();
-    return adder.visit(root, new NodeGroupContext());
+    return adder.visit(root, new NodeGroupContext(context));
   }
 
   public SubPlan splitFragment(PlanNode root) {
-    FragmentBuilder fragmentBuilder = new FragmentBuilder();
+    FragmentBuilder fragmentBuilder = new FragmentBuilder(context);
     return fragmentBuilder.splitToSubPlan(root);
   }
 
   public DistributedQueryPlan planFragments() {
     PlanNode rootAfterRewrite = rewriteSource();
-    System.out.println("===== Step 2: Partition SourceNode =====");
-    System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
-    System.out.println("===== Step 3: Add ExchangeNode =====");
-    System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
     SubPlan subPlan = splitFragment(rootWithExchange);
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
     return new DistributedQueryPlan(
@@ -83,6 +82,25 @@ public class DistributionPlanner {
     return parallelPlaner.parallelPlan();
   }
 
+  // TODO: (xingtanzjr) Maybe we should handle ResultNode in LogicalPlanner ?
+  public void SetSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) {
+    FragmentInstance rootInstance = null;
+    for (FragmentInstance instance : instances) {
+      if (instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
+        rootInstance = instance;
+        break;
+      }
+    }
+    // root should not be null during normal process
+    if (rootInstance == null) {
+      return;
+    }
+    FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
+    sinkNode.setChild(rootInstance.getFragment().getRoot());
+    rootInstance.getFragment().setRoot(sinkNode);
+    //    sinkNode.setDownStream();
+  }
+
   private PlanFragmentId getNextFragmentId() {
     return new PlanFragmentId(this.logicalPlan.getContext().getQueryId(), this.planFragmentIndex++);
   }
@@ -111,7 +129,7 @@ public class DistributionPlanner {
           // SeriesScanNode.
           for (RegionReplicaSet dataRegion : dataDistribution) {
             SeriesScanNode split = (SeriesScanNode) handle.clone();
-            split.setId(PlanNodeIdAllocator.generateId());
+            split.setId(context.queryContext.getQueryId().genPlanNodeId());
             split.setDataRegionReplicaSet(dataRegion);
             sources.add(split);
           }
@@ -140,8 +158,10 @@ public class DistributionPlanner {
             if (seriesScanNodes.size() == 1) {
               root.addChild(seriesScanNodes.get(0));
             } else {
-              // We clone a TimeJoinNode from root to make the params to be consistent
+              // We clone a TimeJoinNode from root to make the params to be consistent.
+              // But we need to assign a new ID to it
               TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+              root.setId(context.queryContext.getQueryId().genPlanNodeId());
               seriesScanNodes.forEach(parentOfGroup::addChild);
               root.addChild(parentOfGroup);
             }
@@ -155,7 +175,13 @@ public class DistributionPlanner {
     }
   }
 
-  private class DistributionPlanContext {}
+  private class DistributionPlanContext {
+    private MPPQueryContext queryContext;
+
+    public DistributionPlanContext(MPPQueryContext queryContext) {
+      this.queryContext = queryContext;
+    }
+  }
 
   private class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     @Override
@@ -220,7 +246,8 @@ public class DistributionPlanner {
       visitedChildren.forEach(
           child -> {
             if (!dataRegion.equals(context.getNodeDistribution(child.getId()).dataRegion)) {
-              ExchangeNode exchangeNode = new ExchangeNode(PlanNodeIdAllocator.generateId());
+              ExchangeNode exchangeNode =
+                  new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
               exchangeNode.setChild(child);
               newNode.addChild(exchangeNode);
             } else {
@@ -255,10 +282,12 @@ public class DistributionPlanner {
   }
 
   private class NodeGroupContext {
-    Map<PlanNodeId, NodeDistribution> nodeDistribution;
+    private MPPQueryContext queryContext;
+    private Map<PlanNodeId, NodeDistribution> nodeDistribution;
 
-    public NodeGroupContext() {
-      nodeDistribution = new HashMap<>();
+    public NodeGroupContext(MPPQueryContext queryContext) {
+      this.queryContext = queryContext;
+      this.nodeDistribution = new HashMap<>();
     }
 
     public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution distribution) {
@@ -288,6 +317,12 @@ public class DistributionPlanner {
   }
 
   private class FragmentBuilder {
+    private MPPQueryContext context;
+
+    public FragmentBuilder(MPPQueryContext context) {
+      this.context = context;
+    }
+
     public SubPlan splitToSubPlan(PlanNode root) {
       SubPlan rootSubPlan = createSubPlan(root);
       splitToSubPlan(root, rootSubPlan);
@@ -298,7 +333,7 @@ public class DistributionPlanner {
       if (root instanceof ExchangeNode) {
         // We add a FragmentSinkNode for newly created PlanFragment
         ExchangeNode exchangeNode = (ExchangeNode) root;
-        FragmentSinkNode sinkNode = new FragmentSinkNode(PlanNodeIdAllocator.generateId());
+        FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
         sinkNode.setChild(exchangeNode.getChild());
         sinkNode.setDownStreamNode(exchangeNode);
         // Record the source node info in the ExchangeNode so that we can keep the connection of
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index a753f4555c..f8c2a38d90 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
@@ -60,7 +59,7 @@ public class LogicalPlanner {
   }
 
   public LogicalQueryPlan plan(Analysis analysis) {
-    PlanNode rootNode = new LogicalPlanVisitor(analysis).process(analysis.getStatement());
+    PlanNode rootNode = new LogicalPlanVisitor(analysis).process(analysis.getStatement(), context);
 
     // optimize the query logical plan
     if (analysis.getStatement() instanceof QueryStatement) {
@@ -127,7 +126,7 @@ public class LogicalPlanner {
       }
 
       if (queryStatement.isAlignByDevice()) {
-        DeviceMergeNode deviceMergeNode = new DeviceMergeNode(PlanNodeIdAllocator.generateId());
+        DeviceMergeNode deviceMergeNode = new DeviceMergeNode(context.getQueryId().genPlanNodeId());
         for (Map.Entry<String, Set<SourceNode>> entry : deviceNameToSourceNodesMap.entrySet()) {
           String deviceName = entry.getKey();
           List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
@@ -136,7 +135,7 @@ public class LogicalPlanner {
           } else {
             TimeJoinNode timeJoinNode =
                 new TimeJoinNode(
-                    PlanNodeIdAllocator.generateId(),
+                    context.getQueryId().genPlanNodeId(),
                     queryStatement.getResultOrder(),
                     null,
                     planNodes);
@@ -152,13 +151,18 @@ public class LogicalPlanner {
               .collect(Collectors.toList());
       TimeJoinNode timeJoinNode =
           new TimeJoinNode(
-              PlanNodeIdAllocator.generateId(), queryStatement.getResultOrder(), null, planNodes);
+              context.getQueryId().genPlanNodeId(),
+              queryStatement.getResultOrder(),
+              null,
+              planNodes);
       return new PlanBuilder(timeJoinNode);
     }
 
     private Set<SourceNode> planResultColumn(ResultColumn resultColumn) {
       Set<SourceNode> resultSourceNodeSet = new HashSet<>();
-      resultColumn.getExpression().collectPlanNode(resultSourceNodeSet);
+      resultColumn
+          .getExpression()
+          .collectPlanNode(resultSourceNodeSet, context.getQueryId().genPlanNodeId());
       return resultSourceNodeSet;
     }
 
@@ -168,7 +172,7 @@ public class LogicalPlanner {
       }
 
       return planBuilder.withNewRoot(
-          new FilterNode(PlanNodeIdAllocator.generateId(), planBuilder.getRoot(), queryFilter));
+          new FilterNode(context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), queryFilter));
     }
 
     private PlanBuilder planGroupByLevel(
@@ -179,7 +183,7 @@ public class LogicalPlanner {
 
       return planBuilder.withNewRoot(
           new GroupByLevelNode(
-              PlanNodeIdAllocator.generateId(),
+              context.getQueryId().genPlanNodeId(),
               planBuilder.getRoot(),
               groupByLevelComponent.getLevels(),
               groupByLevelComponent.getGroupedPathMap()));
@@ -198,7 +202,7 @@ public class LogicalPlanner {
 
       return planBuilder.withNewRoot(
           new FilterNullNode(
-              PlanNodeIdAllocator.generateId(),
+              context.getQueryId().genPlanNodeId(),
               planBuilder.getRoot(),
               filterNullComponent.getWithoutPolicyType(),
               filterNullComponent.getWithoutNullColumns().stream()
@@ -212,7 +216,8 @@ public class LogicalPlanner {
       }
 
       return planBuilder.withNewRoot(
-          new SortNode(PlanNodeIdAllocator.generateId(), planBuilder.getRoot(), null, resultOrder));
+          new SortNode(
+              context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), null, resultOrder));
     }
 
     private PlanBuilder planLimit(PlanBuilder planBuilder, int rowLimit) {
@@ -221,7 +226,7 @@ public class LogicalPlanner {
       }
 
       return planBuilder.withNewRoot(
-          new LimitNode(PlanNodeIdAllocator.generateId(), rowLimit, planBuilder.getRoot()));
+          new LimitNode(context.getQueryId().genPlanNodeId(), rowLimit, planBuilder.getRoot()));
     }
 
     private PlanBuilder planOffset(PlanBuilder planBuilder, int rowOffset) {
@@ -230,14 +235,14 @@ public class LogicalPlanner {
       }
 
       return planBuilder.withNewRoot(
-          new OffsetNode(PlanNodeIdAllocator.generateId(), planBuilder.getRoot(), rowOffset));
+          new OffsetNode(context.getQueryId().genPlanNodeId(), planBuilder.getRoot(), rowOffset));
     }
 
     @Override
     public PlanNode visitCreateTimeseries(
         CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
       return new CreateTimeSeriesNode(
-          PlanNodeIdAllocator.generateId(),
+          context.getQueryId().genPlanNodeId(),
           createTimeSeriesStatement.getPath(),
           createTimeSeriesStatement.getDataType(),
           createTimeSeriesStatement.getEncoding(),
@@ -253,7 +258,7 @@ public class LogicalPlanner {
         CreateAlignedTimeSeriesStatement createAlignedTimeSeriesStatement,
         MPPQueryContext context) {
       return new CreateAlignedTimeSeriesNode(
-          PlanNodeIdAllocator.generateId(),
+          context.getQueryId().genPlanNodeId(),
           createAlignedTimeSeriesStatement.getDevicePath(),
           createAlignedTimeSeriesStatement.getMeasurements(),
           createAlignedTimeSeriesStatement.getDataTypes(),
@@ -269,7 +274,7 @@ public class LogicalPlanner {
     public PlanNode visitAlterTimeseries(
         AlterTimeSeriesStatement alterTimeSeriesStatement, MPPQueryContext context) {
       return new AlterTimeSeriesNode(
-          PlanNodeIdAllocator.generateId(),
+          context.getQueryId().genPlanNodeId(),
           alterTimeSeriesStatement.getPath(),
           alterTimeSeriesStatement.getAlterType(),
           alterTimeSeriesStatement.getAlterMap(),
@@ -290,7 +295,7 @@ public class LogicalPlanner {
                   insertTabletStatement.getDevicePath(),
                   Arrays.asList(insertTabletStatement.getMeasurements()));
       return new InsertTabletNode(
-          PlanNodeIdAllocator.generateId(),
+          context.getQueryId().genPlanNodeId(),
           insertTabletStatement.getDevicePath(),
           insertTabletStatement.isAligned(),
           measurementSchemas.toArray(new MeasurementSchema[0]),
@@ -312,7 +317,7 @@ public class LogicalPlanner {
                   insertRowStatement.getDevicePath(),
                   Arrays.asList(insertRowStatement.getMeasurements()));
       return new InsertRowNode(
-          PlanNodeIdAllocator.generateId(),
+          context.getQueryId().genPlanNodeId(),
           insertRowStatement.getDevicePath(),
           insertRowStatement.isAligned(),
           measurementSchemas.toArray(new MeasurementSchema[0]),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 5863bbfed3..4cf7551e13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -46,6 +46,10 @@ public class PlanFragment {
     return root;
   }
 
+  public void setRoot(PlanNode root) {
+    this.root = root;
+  }
+
   public String toString() {
     return String.format("PlanFragment-%s", getId());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 5b888d837e..837db3e20b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -60,7 +59,7 @@ public class LimitNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new LimitNode(PlanNodeIdAllocator.generateId(), this.limit);
+    return new LimitNode(getId(), this.limit);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 67e874e52e..7aa99e33c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
@@ -74,8 +73,7 @@ public class TimeJoinNode extends ProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new TimeJoinNode(
-        PlanNodeIdAllocator.generateId(), this.mergeOrder, this.filterNullPolicy);
+    return new TimeJoinNode(getId(), this.mergeOrder, this.filterNullPolicy);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 3fd959c57d..cee9a7980c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -37,10 +37,6 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
  */
 public abstract class StatementVisitor<R, C> {
 
-  public R process(StatementNode node) {
-    return process(node, null);
-  }
-
   public R process(StatementNode node, C context) {
     return node.accept(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 47e80bab76..dfdd47fbd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -84,7 +85,7 @@ public abstract class Expression {
   public abstract void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
 
-  public abstract void collectPlanNode(Set<SourceNode> planNodeSet);
+  public abstract void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId);
 
   public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 7d61695827..4e482905ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -200,7 +201,7 @@ public abstract class BinaryExpression extends Expression {
   }
 
   @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
     // TODO: support nested expressions
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 70fe842fe9..1cc9043ceb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.expression.unary;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -104,7 +105,7 @@ public class ConstantOperand extends Expression {
   }
 
   @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
     // Do nothing
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 3611cbee44..ef77e955a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
@@ -236,9 +236,9 @@ public class FunctionExpression extends Expression {
   }
 
   @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
     if (isBuiltInAggregationFunctionExpression) {
-      planNodeSet.add(new SeriesAggregateScanNode(PlanNodeIdAllocator.generateId(), this));
+      planNodeSet.add(new SeriesAggregateScanNode(nodeId, this));
     }
     // TODO: support UDF
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
index 983f58c4c2..b1183df457 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
@@ -129,7 +130,7 @@ public class LogicNotExpression extends Expression {
   }
 
   @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
     // TODO: support LogicNotExpression
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index e5ecde9c38..fca985cf79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -128,7 +129,7 @@ public class NegationExpression extends Expression {
   }
 
   @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
     // TODO: support nested expressions
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 6294372399..f33d353ce1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
@@ -114,8 +114,8 @@ public class TimeSeriesOperand extends Expression {
   }
 
   @Override
-  public void collectPlanNode(Set<SourceNode> planNodeSet) {
-    planNodeSet.add(new SeriesScanNode(PlanNodeIdAllocator.generateId(), path));
+  public void collectPlanNode(Set<SourceNode> planNodeSet, PlanNodeId nodeId) {
+    planNodeSet.add(new SeriesScanNode(nodeId, path));
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
index c183655ce5..3cc8db156a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
@@ -19,12 +19,13 @@
 
 package org.apache.iotdb.db.mpp.execution;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -62,10 +63,10 @@ public class QueryStateMachineTest {
   public void TestFragmentInstanceToFinished() {
     List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
     QueryStateMachine stateMachine = genQueryStateMachine();
-    for(FragmentInstanceId id : instanceIds) {
+    for (FragmentInstanceId id : instanceIds) {
       stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
     }
-    for(FragmentInstanceId id : instanceIds) {
+    for (FragmentInstanceId id : instanceIds) {
       stateMachine.updateFragInstanceState(id, FragmentInstanceState.FINISHED);
     }
     Assert.assertEquals(stateMachine.getState(), QueryState.FINISHED);
@@ -75,7 +76,7 @@ public class QueryStateMachineTest {
   public void TestFragmentInstanceToTerminalState() {
     List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
     QueryStateMachine stateMachine = genQueryStateMachine();
-    for(FragmentInstanceId id : instanceIds) {
+    for (FragmentInstanceId id : instanceIds) {
       stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
     }
     stateMachine.updateFragInstanceState(instanceIds.get(0), FragmentInstanceState.FAILED);
@@ -86,16 +87,18 @@ public class QueryStateMachineTest {
   public void TestListener() throws ExecutionException, InterruptedException {
     AtomicInteger stateChangeCounter = new AtomicInteger(0);
     QueryStateMachine stateMachine = genQueryStateMachine();
-    stateMachine.addStateChangeListener(state -> {
-      stateChangeCounter.getAndIncrement();
-    });
+    stateMachine.addStateChangeListener(
+        state -> {
+          stateChangeCounter.getAndIncrement();
+        });
     stateMachine.transitionToFinished();
     SettableFuture<QueryState> future = SettableFuture.create();
-    stateMachine.addStateChangeListener(state -> {
-      if (state == QueryState.FINISHED) {
-        future.set(QueryState.FINISHED);
-      }
-    });
+    stateMachine.addStateChangeListener(
+        state -> {
+          if (state == QueryState.FINISHED) {
+            future.set(QueryState.FINISHED);
+          }
+        });
     future.get();
     Assert.assertEquals(stateChangeCounter.get(), 2);
   }
@@ -113,7 +116,8 @@ public class QueryStateMachineTest {
   }
 
   private QueryStateMachine genQueryStateMachine() {
-    return new QueryStateMachine(genQueryId(), IoTDBThreadPoolFactory.newSingleThreadExecutor("TestQueryStateMachine"));
+    return new QueryStateMachine(
+        genQueryId(), IoTDBThreadPoolFactory.newSingleThreadExecutor("TestQueryStateMachine"));
   }
 
   private List<FragmentInstanceId> genFragmentInstanceIdList() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
index 7583f0e673..d8d1880232 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.sql.analyze;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 
 import org.junit.Assert;
@@ -47,7 +48,7 @@ public class AnalyzerTest {
 
   private void assertAnalyzeSemanticException(String sql, String message) {
     try {
-      Analyzer analyzer = new Analyzer(new MPPQueryContext());
+      Analyzer analyzer = new Analyzer(new MPPQueryContext(new QueryId("test_query")));
       analyzer.analyze(StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));
       fail();
     } catch (RuntimeException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 50ae6202ab..ee53241aed 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -38,8 +38,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
@@ -56,72 +54,74 @@ public class DistributionPlannerTest {
 
   @Test
   public void TestRewriteSourceNode() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(
-            PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+            queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
 
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
 
-    LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
 
     Analysis analysis = constructAnalysis();
 
     DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
+        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
     PlanNode newRoot = planner.rewriteSource();
 
-    System.out.println(PlanNodeUtil.nodeToString(newRoot));
     assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
   }
 
   @Test
   public void TestAddExchangeNode() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(
-            PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+            queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
 
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
 
-    LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
 
     Analysis analysis = constructAnalysis();
 
     DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
+        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
     PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
-    //    PlanNodeUtil.printPlanNode(rootWithExchange);
     assertEquals(rootWithExchange.getChildren().get(0).getChildren().size(), 3);
   }
 
   @Test
   public void TestSplitFragment() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(
-            PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+            queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
 
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d22.s1")));
 
-    LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null, QueryType.READ);
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ);
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     PlanNode rootAfterRewrite = planner.rewriteSource();
@@ -132,22 +132,23 @@ public class DistributionPlannerTest {
 
   @Test
   public void TestParallelPlan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(
-            PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+            queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
 
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s1")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d333.s1")));
+        new SeriesScanNode(queryId.genPlanNodeId(), new PartialPath("root.sg.d333.s1")));
 
-    LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), 10, timeJoinNode);
 
     Analysis analysis = constructAnalysis();
 
-    MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null, QueryType.READ);
+    MPPQueryContext context = new MPPQueryContext("", queryId, null, QueryType.READ);
     DistributionPlanner planner =
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 1f8ee23109..403af0d743 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -22,11 +22,11 @@ package org.apache.iotdb.db.mpp.sql.plan;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
@@ -53,9 +53,7 @@ public class LogicalPlannerTest {
   LogicalPlanPrinter planPrinter = new LogicalPlanPrinter();
 
   @Before
-  public void setUp() {
-    PlanNodeIdAllocator.reset();
-  }
+  public void setUp() {}
 
   @Test
   @Ignore
@@ -357,7 +355,7 @@ public class LogicalPlannerTest {
     try {
       Statement statement =
           StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
-      MPPQueryContext context = new MPPQueryContext();
+      MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
       // TODO: do analyze after implementing ISchemaFetcher and IPartitionFetcher
       //      Analyzer analyzer = new Analyzer(context);
       //      Analysis analysis = analyzer.analyze(statement);
diff --git a/thrift/src/main/thrift/mpp.thrift b/thrift/src/main/thrift/mpp.thrift
index 00b6b6cdeb..25ed861b14 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -23,7 +23,7 @@ namespace java org.apache.iotdb.mpp.rpc.thrift
 struct TFragmentInstanceId {
   1: required string queryId
   2: required string fragmentId
-  3: required string instanceId
+  3: required i64 instanceId
 }
 
 struct GetDataBlockRequest {