You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/22 08:20:00 UTC

[iotdb] branch xingtanzjr/query_state updated: optimize the StateTracker both in print and judgement

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_state
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_state by this push:
     new cea6728101 optimize the StateTracker both in print and judgement
cea6728101 is described below

commit cea67281019deaeff29f4abe48cd7254fb517e94
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 22 16:19:45 2022 +0800

    optimize the StateTracker both in print and judgement
---
 .../iotdb/db/mpp/execution/QueryStateMachine.java  | 25 -------
 .../fragment/FragmentInstanceManager.java          |  4 +-
 .../planner/distribution/DistributionPlanner.java  |  6 +-
 .../SimpleFragmentParallelPlanner.java             |  9 +--
 .../distribution/WriteFragmentParallelPlanner.java |  2 +-
 .../db/mpp/plan/planner/plan/FragmentInstance.java | 22 +++++-
 .../db/mpp/plan/planner/plan/PlanFragment.java     | 42 ++++++++----
 .../iotdb/db/mpp/plan/planner/plan/SubPlan.java    |  2 +-
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  5 --
 .../scheduler/FixedRateFragInsStateTracker.java    | 33 +++++++--
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  4 +-
 .../db/mpp/plan/scheduler/StandaloneScheduler.java |  7 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 -
 .../db/mpp/execution/QueryStateMachineTest.java    | 25 -------
 .../distribution/AggregationDistributionTest.java  | 78 ++++++++++++++--------
 .../mpp/plan/plan/distribution/LastQueryTest.java  | 40 +++++++++--
 16 files changed, 177 insertions(+), 129 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 703255afe7..d5aaa093fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -19,14 +19,10 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 
@@ -37,7 +33,6 @@ import java.util.concurrent.ExecutorService;
 public class QueryStateMachine {
   private final String name;
   private final StateMachine<QueryState> queryState;
-  private final Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
 
   // The executor will be used in all the state machines belonged to this query.
   private Executor stateMachineExecutor;
@@ -47,7 +42,6 @@ public class QueryStateMachine {
   public QueryStateMachine(QueryId queryId, ExecutorService executor) {
     this.name = String.format("QueryStateMachine[%s]", queryId);
     this.stateMachineExecutor = executor;
-    this.fragInstanceStateMap = new ConcurrentHashMap<>();
     this.queryState =
         new StateMachine<>(
             queryId.toString(),
@@ -56,25 +50,6 @@ public class QueryStateMachine {
             QueryState.TERMINAL_INSTANCE_STATES);
   }
 
-  public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
-    this.fragInstanceStateMap.put(id, state);
-  }
-
-  public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
-    this.fragInstanceStateMap.put(id, state);
-    // TODO: (xingtanzjr) we need to distinguish the Timeout situation
-    if (state.isFailed()) {
-      transitionToFailed(
-          new RuntimeException(String.format("FragmentInstance[%s] is failed.", id)));
-    }
-    boolean allFinished =
-        fragInstanceStateMap.values().stream()
-            .allMatch(currentState -> currentState == FragmentInstanceState.FINISHED);
-    if (allFinished) {
-      transitionToFinished();
-    }
-  }
-
   public void addStateChangeListener(
       StateMachine.StateChangeListener<QueryState> stateChangeListener) {
     queryState.addStateChangeListener(stateChangeListener);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index ca164e9e51..6506e7f619 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -102,7 +102,7 @@ public class FragmentInstanceManager {
                 try {
                   DataDriver driver =
                       planner.plan(
-                          instance.getFragment().getRoot(),
+                          instance.getFragment().getPlanNodeTree(),
                           instance.getFragment().getTypeProvider(),
                           context,
                           instance.getTimeFilter(),
@@ -144,7 +144,7 @@ public class FragmentInstanceManager {
 
               try {
                 SchemaDriver driver =
-                    planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
+                    planner.plan(instance.getFragment().getPlanNodeTree(), context, schemaRegion);
                 return createFragmentInstanceExecution(
                     scheduler,
                     instanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 2aad1c8a85..60882f5cdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -73,6 +73,8 @@ public class DistributionPlanner {
           .setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
     }
     SubPlan subPlan = splitFragment(rootWithExchange);
+    // Mark the root Fragment of root SubPlan as `root`
+    subPlan.getPlanFragment().setRoot(true);
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
     // Only execute this step for READ operation
     if (context.getQueryType() == QueryType.READ) {
@@ -111,14 +113,14 @@ public class DistributionPlanner {
         context.getLocalDataBlockEndpoint(),
         context.getResultNodeContext().getVirtualFragmentInstanceId(),
         context.getResultNodeContext().getVirtualResultNodeId());
-    sinkNode.setChild(rootInstance.getFragment().getRoot());
+    sinkNode.setChild(rootInstance.getFragment().getPlanNodeTree());
     context
         .getResultNodeContext()
         .setUpStream(
             rootInstance.getHostDataNode().mPPDataExchangeEndPoint,
             rootInstance.getId(),
             sinkNode.getPlanNodeId());
-    rootInstance.getFragment().setRoot(sinkNode);
+    rootInstance.getFragment().setPlanNodeTree(sinkNode);
   }
 
   private PlanFragmentId getNextFragmentId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index e49b8b90e8..d5c73f6822 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -82,7 +82,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   private void prepare() {
     List<PlanFragment> fragments = subPlan.getPlanFragmentList();
     for (PlanFragment fragment : fragments) {
-      recordPlanNodeRelation(fragment.getRoot(), fragment.getId());
+      recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
       produceFragmentInstance(fragment);
     }
   }
@@ -90,7 +90,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   private void produceFragmentInstance(PlanFragment fragment) {
     // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
     // one by one
-    PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
+    PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getPlanNodeTree());
     Filter timeFilter = analysis.getGlobalTimeFilter();
     FragmentInstance fragmentInstance =
         new FragmentInstance(
@@ -98,7 +98,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
             fragment.getId().genFragmentInstanceId(),
             timeFilter,
             queryContext.getQueryType(),
-            queryContext.getTimeOut());
+            queryContext.getTimeOut(),
+            fragment.isRoot());
 
     // Get the target region for origin PlanFragment, then its instance will be distributed one
     // of them.
@@ -174,7 +175,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
 
   private void calculateNodeTopologyBetweenInstance() {
     for (FragmentInstance instance : fragmentInstanceList) {
-      PlanNode rootNode = instance.getFragment().getRoot();
+      PlanNode rootNode = instance.getFragment().getPlanNodeTree();
       if (rootNode instanceof FragmentSinkNode) {
         // Set target Endpoint for FragmentSinkNode
         FragmentSinkNode sinkNode = (FragmentSinkNode) rootNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 4f043a05b9..0bbf402818 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -49,7 +49,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
   public List<FragmentInstance> parallelPlan() {
     PlanFragment fragment = subPlan.getPlanFragment();
     Filter timeFilter = analysis.getGlobalTimeFilter();
-    PlanNode node = fragment.getRoot();
+    PlanNode node = fragment.getPlanNodeTree();
     if (!(node instanceof WritePlanNode)) {
       throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation");
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index bcc54931de..3f43e8245c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -63,6 +63,8 @@ public class FragmentInstance implements IConsensusRequest {
 
   private final long timeOut;
 
+  private boolean isRoot;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
@@ -77,6 +79,18 @@ public class FragmentInstance implements IConsensusRequest {
     this.id = id;
     this.type = type;
     this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold();
+    this.isRoot = false;
+  }
+
+  public FragmentInstance(
+      PlanFragment fragment,
+      FragmentInstanceId id,
+      Filter timeFilter,
+      QueryType type,
+      long timeOut,
+      boolean isRoot) {
+    this(fragment, id, timeFilter, type, timeOut);
+    this.isRoot = isRoot;
   }
 
   public TRegionReplicaSet getDataRegionId() {
@@ -119,8 +133,12 @@ public class FragmentInstance implements IConsensusRequest {
     return id;
   }
 
+  public boolean isRoot() {
+    return isRoot;
+  }
+
   public String getDownstreamInfo() {
-    PlanNode root = getFragment().getRoot();
+    PlanNode root = getFragment().getPlanNodeTree();
     if (root instanceof FragmentSinkNode) {
       FragmentSinkNode sink = (FragmentSinkNode) root;
       return String.format(
@@ -158,7 +176,7 @@ public class FragmentInstance implements IConsensusRequest {
             "Region: %s ",
             getRegionReplicaSet() == null ? "Not set" : getRegionReplicaSet().getRegionId()));
     ret.append("\n---- Plan Node Tree ----\n");
-    ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
+    ret.append(PlanNodeUtil.nodeToString(getFragment().getPlanNodeTree()));
     ret.append(String.format("timeOut-%s:", getTimeOut()));
     return ret.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index f7a91edcbe..bbf3231695 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -37,24 +37,28 @@ import java.util.Objects;
 public class PlanFragment {
   // TODO once you add field for this class you need to change the serialize and deserialize methods
   private PlanFragmentId id;
-  private PlanNode root;
+  private PlanNode planNodeTree;
   private TypeProvider typeProvider;
 
-  public PlanFragment(PlanFragmentId id, PlanNode root) {
+  // Indicates whether this PlanFragment is the root of the whole Fragment-Plan-Tree.
+  private boolean isRoot;
+
+  public PlanFragment(PlanFragmentId id, PlanNode planNodeTree) {
     this.id = id;
-    this.root = root;
+    this.planNodeTree = planNodeTree;
+    this.isRoot = false;
   }
 
   public PlanFragmentId getId() {
     return id;
   }
 
-  public PlanNode getRoot() {
-    return root;
+  public PlanNode getPlanNodeTree() {
+    return planNodeTree;
   }
 
-  public void setRoot(PlanNode root) {
-    this.root = root;
+  public void setPlanNodeTree(PlanNode planNodeTree) {
+    this.planNodeTree = planNodeTree;
   }
 
   public TypeProvider getTypeProvider() {
@@ -65,6 +69,18 @@ public class PlanFragment {
     this.typeProvider = typeProvider;
   }
 
+  public void setId(PlanFragmentId id) {
+    this.id = id;
+  }
+
+  public boolean isRoot() {
+    return isRoot;
+  }
+
+  public void setRoot(boolean root) {
+    isRoot = root;
+  }
+
   @Override
   public String toString() {
     return String.format("PlanFragment-%s", getId());
@@ -76,7 +92,7 @@ public class PlanFragment {
   // and the DataRegions of all SourceNodes should be same in one PlanFragment.
   // So we can use the DataRegion of one SourceNode as the PlanFragment's DataRegion.
   public TRegionReplicaSet getTargetRegion() {
-    return getNodeRegion(root);
+    return getNodeRegion(planNodeTree);
   }
 
   private TRegionReplicaSet getNodeRegion(PlanNode root) {
@@ -93,7 +109,7 @@ public class PlanFragment {
   }
 
   public PlanNode getPlanNodeById(PlanNodeId nodeId) {
-    return getPlanNodeById(root, nodeId);
+    return getPlanNodeById(planNodeTree, nodeId);
   }
 
   private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
@@ -111,7 +127,7 @@ public class PlanFragment {
 
   public void serialize(ByteBuffer byteBuffer) {
     id.serialize(byteBuffer);
-    root.serialize(byteBuffer);
+    planNodeTree.serialize(byteBuffer);
     if (typeProvider == null) {
       ReadWriteIOUtils.write((byte) 0, byteBuffer);
     } else {
@@ -122,7 +138,7 @@ public class PlanFragment {
 
   public void serialize(DataOutputStream stream) throws IOException {
     id.serialize(stream);
-    root.serialize(stream);
+    planNodeTree.serialize(stream);
     if (typeProvider == null) {
       ReadWriteIOUtils.write((byte) 0, stream);
     } else {
@@ -160,11 +176,11 @@ public class PlanFragment {
       return false;
     }
     PlanFragment that = (PlanFragment) o;
-    return Objects.equals(id, that.id) && Objects.equals(root, that.root);
+    return Objects.equals(id, that.id) && Objects.equals(planNodeTree, that.planNodeTree);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(id, root);
+    return Objects.hash(id, planNodeTree);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
index 7ac6f92d0a..7b177618e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
@@ -44,7 +44,7 @@ public class SubPlan {
     result.append(
         String.format(
             "SubPlan-%s. RootNodeId: %s\n",
-            planFragment.getId(), planFragment.getRoot().getPlanNodeId()));
+            planFragment.getId(), planFragment.getPlanNodeTree().getPlanNodeId()));
     children.forEach(result::append);
     return result.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 1df6c79d61..115774169d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -131,10 +130,6 @@ public class ClusterScheduler implements IScheduler {
     // The FragmentInstances has been dispatched successfully to corresponding host, we mark the
     // QueryState to Running
     stateMachine.transitionToRunning();
-    instances.forEach(
-        instance -> {
-          stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
-        });
 
     // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
     this.stateTracker.start();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index fb91167a6d..08563d9361 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -34,8 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -49,7 +50,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
   // TODO: (xingtanzjr) consider how much Interval is OK for state tracker
   private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
   private ScheduledFuture<?> trackTask;
-  private final ConcurrentHashMap<FragmentInstanceId, InstanceStateMetrics> lastState;
+  private final Map<FragmentInstanceId, InstanceStateMetrics> instanceStateMap;
   private volatile boolean aborted;
 
   public FixedRateFragInsStateTracker(
@@ -59,7 +60,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     super(stateMachine, scheduledExecutor, instances, internalServiceClientManager);
     this.aborted = false;
-    this.lastState = new ConcurrentHashMap<>();
+    this.instanceStateMap = new HashMap<>();
   }
 
   @Override
@@ -95,7 +96,9 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
     for (FragmentInstance instance : instances) {
       try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
         FragmentInstanceState state = fetchState(instance);
-        InstanceStateMetrics metrics = lastState.computeIfAbsent(instance.getId(), k -> new InstanceStateMetrics());
+        InstanceStateMetrics metrics =
+            instanceStateMap.computeIfAbsent(
+                instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
         if (needPrintState(metrics.lastState, state, metrics.durationToLastPrintInMS)) {
           logger.info("State is {}", state);
           metrics.reset(state);
@@ -104,7 +107,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
         }
 
         if (state != null) {
-          stateMachine.updateFragInstanceState(instance.getId(), state);
+          updateQueryState(instance.getId(), state);
         }
       } catch (TException | IOException e) {
         // TODO: do nothing ?
@@ -113,6 +116,22 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
     }
   }
 
+  private void updateQueryState(FragmentInstanceId instanceId, FragmentInstanceState state) {
+    if (state.isFailed()) {
+      stateMachine.transitionToFailed(
+          new RuntimeException(String.format("FragmentInstance[%s] is failed.", instanceId)));
+    }
+    boolean queryFinished =
+        instanceStateMap.values().stream()
+            .filter(instanceStateMetrics -> instanceStateMetrics.isRootInstance)
+            .allMatch(
+                instanceStateMetrics ->
+                    instanceStateMetrics.lastState == FragmentInstanceState.FINISHED);
+    if (queryFinished) {
+      stateMachine.transitionToFinished();
+    }
+  }
+
   private boolean needPrintState(
       FragmentInstanceState previous, FragmentInstanceState current, long durationToLastPrintInMS) {
     if (current != previous) {
@@ -122,10 +141,12 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
   }
 
   private static class InstanceStateMetrics {
+    private final boolean isRootInstance;
     private FragmentInstanceState lastState;
     private long durationToLastPrintInMS;
 
-    private InstanceStateMetrics() {
+    private InstanceStateMetrics(boolean isRootInstance) {
+      this.isRootInstance = isRootInstance;
       this.lastState = null;
       this.durationToLastPrintInMS = 0L;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index ab5b2e8909..e2b5a88591 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -172,7 +172,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
         case WRITE:
           TSendPlanNodeReq sendPlanNodeReq =
               new TSendPlanNodeReq(
-                  new TPlanNode(instance.getFragment().getRoot().serializeToByteBuffer()),
+                  new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
                   instance.getRegionReplicaSet().getRegionId());
           TSendPlanNodeResp sendPlanNodeResp = client.sendPlanNode(sendPlanNodeReq);
           if (!sendPlanNodeResp.accepted) {
@@ -256,7 +256,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
         }
         break;
       case WRITE:
-        PlanNode planNode = instance.getFragment().getRoot();
+        PlanNode planNode = instance.getFragment().getPlanNodeTree();
         boolean hasFailedMeasurement = false;
         String partialInsertMessage = null;
         if (planNode instanceof InsertNode) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index 5a1f6ee329..cf2ad5ca47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -116,10 +115,6 @@ public class StandaloneScheduler implements IScheduler {
         // The FragmentInstances has been dispatched successfully to corresponding host, we mark the
         stateMachine.transitionToRunning();
         LOGGER.info("{} transit to RUNNING", getLogHeader());
-        instances.forEach(
-            instance ->
-                stateMachine.initialFragInstanceState(
-                    instance.getId(), FragmentInstanceState.RUNNING));
         this.stateTracker.start();
         LOGGER.info("{} state tracker starts", getLogHeader());
         break;
@@ -133,7 +128,7 @@ public class StandaloneScheduler implements IScheduler {
         }
         try {
           for (FragmentInstance fragmentInstance : instances) {
-            PlanNode planNode = fragmentInstance.getFragment().getRoot();
+            PlanNode planNode = fragmentInstance.getFragment().getPlanNodeTree();
             ConsensusGroupId groupId =
                 ConsensusGroupId.Factory.createFromTConsensusGroupId(
                     fragmentInstance.getRegionReplicaSet().getRegionId());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 1042cb81ab..0a14bda8e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -271,7 +271,6 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   @Override
   public TCancelResp cancelQuery(TCancelQueryReq req) {
     try (SetThreadName threadName = new SetThreadName(req.getQueryId())) {
-      LOGGER.info("start cancelling query.");
       List<FragmentInstanceId> taskIds =
           req.getFragmentInstanceIds().stream()
               .map(FragmentInstanceId::fromThrift)
@@ -279,7 +278,6 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       for (FragmentInstanceId taskId : taskIds) {
         FragmentInstanceManager.getInstance().cancelTask(taskId);
       }
-      LOGGER.info("finish cancelling query.");
       return new TCancelResp(true);
     }
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
index 8553d11be5..339a79b45b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -60,30 +59,6 @@ public class QueryStateMachineTest {
     Assert.assertEquals(stateMachine.getState(), QueryState.FINISHED);
   }
 
-  @Test
-  public void TestFragmentInstanceToFinished() {
-    List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
-    QueryStateMachine stateMachine = genQueryStateMachine();
-    for (FragmentInstanceId id : instanceIds) {
-      stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
-    }
-    for (FragmentInstanceId id : instanceIds) {
-      stateMachine.updateFragInstanceState(id, FragmentInstanceState.FINISHED);
-    }
-    Assert.assertEquals(stateMachine.getState(), QueryState.FINISHED);
-  }
-
-  @Test
-  public void TestFragmentInstanceToTerminalState() {
-    List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
-    QueryStateMachine stateMachine = genQueryStateMachine();
-    for (FragmentInstanceId id : instanceIds) {
-      stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
-    }
-    stateMachine.updateFragInstanceState(instanceIds.get(0), FragmentInstanceState.FAILED);
-    Assert.assertEquals(stateMachine.getState(), QueryState.FAILED);
-  }
-
   @Test
   public void TestListener() throws ExecutionException, InterruptedException {
     AtomicInteger stateChangeCounter = new AtomicInteger(0);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 82d20e210b..05dd7a5740 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -88,7 +88,8 @@ public class AggregationDistributionTest {
     expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
     expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
   }
 
   private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
@@ -138,13 +139,14 @@ public class AggregationDistributionTest {
     expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
     expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
     AggregationNode aggregationNode =
         (AggregationNode)
             fragmentInstances
                 .get(0)
                 .getFragment()
-                .getRoot()
+                .getPlanNodeTree()
                 .getChildren()
                 .get(0)
                 .getChildren()
@@ -175,7 +177,8 @@ public class AggregationDistributionTest {
     expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
     expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
   }
 
   @Test
@@ -198,7 +201,8 @@ public class AggregationDistributionTest {
     expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
     expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
   }
 
   @Test
@@ -235,7 +239,8 @@ public class AggregationDistributionTest {
     expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
     expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
   }
 
   @Test
@@ -272,19 +277,22 @@ public class AggregationDistributionTest {
     expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
     expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
 
     Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
     expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, d3s1Path, d4s1Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue,
-        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
 
     Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
     expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue2,
-        (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(1).getFragment().getPlanNodeTree().getChildren().get(0));
   }
 
   @Test
@@ -332,19 +340,22 @@ public class AggregationDistributionTest {
     expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
     expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
 
     Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
     expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, d3s1Path, d4s1Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue,
-        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
 
     Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
     expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue2,
-        (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(1).getFragment().getPlanNodeTree().getChildren().get(0));
 
     verifySlidingWindowDescriptor(
         Arrays.asList(d3s1Path, d4s1Path),
@@ -352,7 +363,7 @@ public class AggregationDistributionTest {
             fragmentInstances
                 .get(0)
                 .getFragment()
-                .getRoot()
+                .getPlanNodeTree()
                 .getChildren()
                 .get(0)
                 .getChildren()
@@ -363,7 +374,7 @@ public class AggregationDistributionTest {
             fragmentInstances
                 .get(1)
                 .getFragment()
-                .getRoot()
+                .getPlanNodeTree()
                 .getChildren()
                 .get(0)
                 .getChildren()
@@ -408,21 +419,24 @@ public class AggregationDistributionTest {
     expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
     expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
 
     Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
     expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path));
     expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue,
-        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
 
     Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
     expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
     expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue2,
-        (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(1).getFragment().getPlanNodeTree().getChildren().get(0));
   }
 
   @Test
@@ -468,21 +482,24 @@ public class AggregationDistributionTest {
     expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
     expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
 
     Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
     expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path, d2s1Path));
     expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue,
-        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
 
     Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
     expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
     expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue2,
-        (GroupByLevelNode) fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(2).getFragment().getPlanNodeTree().getChildren().get(0));
   }
 
   @Test
@@ -540,27 +557,31 @@ public class AggregationDistributionTest {
     expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
     expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
     List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
 
     Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
     expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path));
     expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue,
-        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
 
     Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
     expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d2s1Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue2,
-        (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(1).getFragment().getPlanNodeTree().getChildren().get(0));
 
     Map<String, List<String>> expectedDescriptorValue3 = new HashMap<>();
     expectedDescriptorValue3.put(groupedPathS1, Collections.singletonList(d1s1Path));
     expectedDescriptorValue3.put(groupedPathS2, Collections.singletonList(d1s2Path));
     verifyGroupByLevelDescriptor(
         expectedDescriptorValue3,
-        (GroupByLevelNode) fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
+        (GroupByLevelNode)
+            fragmentInstances.get(2).getFragment().getPlanNodeTree().getChildren().get(0));
 
     verifySlidingWindowDescriptor(
         Arrays.asList(d1s1Path, d1s2Path),
@@ -568,7 +589,7 @@ public class AggregationDistributionTest {
             fragmentInstances
                 .get(0)
                 .getFragment()
-                .getRoot()
+                .getPlanNodeTree()
                 .getChildren()
                 .get(0)
                 .getChildren()
@@ -579,7 +600,7 @@ public class AggregationDistributionTest {
             fragmentInstances
                 .get(1)
                 .getFragment()
-                .getRoot()
+                .getPlanNodeTree()
                 .getChildren()
                 .get(0)
                 .getChildren()
@@ -590,7 +611,7 @@ public class AggregationDistributionTest {
             fragmentInstances
                 .get(2)
                 .getFragment()
-                .getRoot()
+                .getPlanNodeTree()
                 .getChildren()
                 .get(0)
                 .getChildren()
@@ -610,7 +631,8 @@ public class AggregationDistributionTest {
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
     DistributedQueryPlan plan = planner.planFragments();
     assertEquals(1, plan.getInstances().size());
-    assertEquals(root, plan.getInstances().get(0).getFragment().getRoot().getChildren().get(0));
+    assertEquals(
+        root, plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0));
   }
 
   private void verifyGroupByLevelDescriptor(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
index c321bb67bb..b150d8d6f8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
@@ -62,7 +62,13 @@ public class LastQueryTest {
     DistributedQueryPlan distributedQueryPlan = planner.planFragments();
     Assert.assertEquals(1, distributedQueryPlan.getInstances().size());
     Assert.assertTrue(
-        distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0)
+        distributedQueryPlan
+                .getInstances()
+                .get(0)
+                .getFragment()
+                .getPlanNodeTree()
+                .getChildren()
+                .get(0)
             instanceof LastQueryNode);
   }
 
@@ -80,7 +86,13 @@ public class LastQueryTest {
     DistributedQueryPlan distributedQueryPlan = planner.planFragments();
     Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
     PlanNode rootNode =
-        distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0);
+        distributedQueryPlan
+            .getInstances()
+            .get(0)
+            .getFragment()
+            .getPlanNodeTree()
+            .getChildren()
+            .get(0);
     Assert.assertTrue(rootNode instanceof LastQueryMergeNode);
     rootNode
         .getChildren()
@@ -104,7 +116,13 @@ public class LastQueryTest {
     DistributedQueryPlan distributedQueryPlan = planner.planFragments();
     Assert.assertEquals(3, distributedQueryPlan.getInstances().size());
     PlanNode rootNode =
-        distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0);
+        distributedQueryPlan
+            .getInstances()
+            .get(0)
+            .getFragment()
+            .getPlanNodeTree()
+            .getChildren()
+            .get(0);
     Assert.assertTrue(rootNode instanceof LastQueryMergeNode);
     rootNode
         .getChildren()
@@ -128,7 +146,13 @@ public class LastQueryTest {
     DistributedQueryPlan distributedQueryPlan = planner.planFragments();
     Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
     PlanNode rootNode =
-        distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0);
+        distributedQueryPlan
+            .getInstances()
+            .get(0)
+            .getFragment()
+            .getPlanNodeTree()
+            .getChildren()
+            .get(0);
     Assert.assertTrue(rootNode instanceof LastQueryMergeNode);
     rootNode
         .getChildren()
@@ -156,7 +180,13 @@ public class LastQueryTest {
     DistributedQueryPlan distributedQueryPlan = planner.planFragments();
     Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
     PlanNode rootNode =
-        distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0);
+        distributedQueryPlan
+            .getInstances()
+            .get(0)
+            .getFragment()
+            .getPlanNodeTree()
+            .getChildren()
+            .get(0);
     Assert.assertTrue(rootNode instanceof LastQueryCollectNode);
     rootNode
         .getChildren()