You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/22 08:20:00 UTC
[iotdb] branch xingtanzjr/query_state updated: optimize the StateTracker both in print and judgement
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_state
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_state by this push:
new cea6728101 optimize the StateTracker both in print and judgement
cea6728101 is described below
commit cea67281019deaeff29f4abe48cd7254fb517e94
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 22 16:19:45 2022 +0800
optimize the StateTracker both in print and judgement
---
.../iotdb/db/mpp/execution/QueryStateMachine.java | 25 -------
.../fragment/FragmentInstanceManager.java | 4 +-
.../planner/distribution/DistributionPlanner.java | 6 +-
.../SimpleFragmentParallelPlanner.java | 9 +--
.../distribution/WriteFragmentParallelPlanner.java | 2 +-
.../db/mpp/plan/planner/plan/FragmentInstance.java | 22 +++++-
.../db/mpp/plan/planner/plan/PlanFragment.java | 42 ++++++++----
.../iotdb/db/mpp/plan/planner/plan/SubPlan.java | 2 +-
.../db/mpp/plan/scheduler/ClusterScheduler.java | 5 --
.../scheduler/FixedRateFragInsStateTracker.java | 33 +++++++--
.../scheduler/FragmentInstanceDispatcherImpl.java | 4 +-
.../db/mpp/plan/scheduler/StandaloneScheduler.java | 7 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 2 -
.../db/mpp/execution/QueryStateMachineTest.java | 25 -------
.../distribution/AggregationDistributionTest.java | 78 ++++++++++++++--------
.../mpp/plan/plan/distribution/LastQueryTest.java | 40 +++++++++--
16 files changed, 177 insertions(+), 129 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 703255afe7..d5aaa093fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -19,14 +19,10 @@
package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -37,7 +33,6 @@ import java.util.concurrent.ExecutorService;
public class QueryStateMachine {
private final String name;
private final StateMachine<QueryState> queryState;
- private final Map<FragmentInstanceId, FragmentInstanceState> fragInstanceStateMap;
// The executor will be used in all the state machines belonged to this query.
private Executor stateMachineExecutor;
@@ -47,7 +42,6 @@ public class QueryStateMachine {
public QueryStateMachine(QueryId queryId, ExecutorService executor) {
this.name = String.format("QueryStateMachine[%s]", queryId);
this.stateMachineExecutor = executor;
- this.fragInstanceStateMap = new ConcurrentHashMap<>();
this.queryState =
new StateMachine<>(
queryId.toString(),
@@ -56,25 +50,6 @@ public class QueryStateMachine {
QueryState.TERMINAL_INSTANCE_STATES);
}
- public void initialFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
- this.fragInstanceStateMap.put(id, state);
- }
-
- public void updateFragInstanceState(FragmentInstanceId id, FragmentInstanceState state) {
- this.fragInstanceStateMap.put(id, state);
- // TODO: (xingtanzjr) we need to distinguish the Timeout situation
- if (state.isFailed()) {
- transitionToFailed(
- new RuntimeException(String.format("FragmentInstance[%s] is failed.", id)));
- }
- boolean allFinished =
- fragInstanceStateMap.values().stream()
- .allMatch(currentState -> currentState == FragmentInstanceState.FINISHED);
- if (allFinished) {
- transitionToFinished();
- }
- }
-
public void addStateChangeListener(
StateMachine.StateChangeListener<QueryState> stateChangeListener) {
queryState.addStateChangeListener(stateChangeListener);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index ca164e9e51..6506e7f619 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -102,7 +102,7 @@ public class FragmentInstanceManager {
try {
DataDriver driver =
planner.plan(
- instance.getFragment().getRoot(),
+ instance.getFragment().getPlanNodeTree(),
instance.getFragment().getTypeProvider(),
context,
instance.getTimeFilter(),
@@ -144,7 +144,7 @@ public class FragmentInstanceManager {
try {
SchemaDriver driver =
- planner.plan(instance.getFragment().getRoot(), context, schemaRegion);
+ planner.plan(instance.getFragment().getPlanNodeTree(), context, schemaRegion);
return createFragmentInstanceExecution(
scheduler,
instanceId,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 2aad1c8a85..60882f5cdf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -73,6 +73,8 @@ public class DistributionPlanner {
.setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
}
SubPlan subPlan = splitFragment(rootWithExchange);
+ // Mark the root Fragment of root SubPlan as `root`
+ subPlan.getPlanFragment().setRoot(true);
List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
// Only execute this step for READ operation
if (context.getQueryType() == QueryType.READ) {
@@ -111,14 +113,14 @@ public class DistributionPlanner {
context.getLocalDataBlockEndpoint(),
context.getResultNodeContext().getVirtualFragmentInstanceId(),
context.getResultNodeContext().getVirtualResultNodeId());
- sinkNode.setChild(rootInstance.getFragment().getRoot());
+ sinkNode.setChild(rootInstance.getFragment().getPlanNodeTree());
context
.getResultNodeContext()
.setUpStream(
rootInstance.getHostDataNode().mPPDataExchangeEndPoint,
rootInstance.getId(),
sinkNode.getPlanNodeId());
- rootInstance.getFragment().setRoot(sinkNode);
+ rootInstance.getFragment().setPlanNodeTree(sinkNode);
}
private PlanFragmentId getNextFragmentId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index e49b8b90e8..d5c73f6822 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -82,7 +82,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private void prepare() {
List<PlanFragment> fragments = subPlan.getPlanFragmentList();
for (PlanFragment fragment : fragments) {
- recordPlanNodeRelation(fragment.getRoot(), fragment.getId());
+ recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
produceFragmentInstance(fragment);
}
}
@@ -90,7 +90,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private void produceFragmentInstance(PlanFragment fragment) {
// If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
// one by one
- PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
+ PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getPlanNodeTree());
Filter timeFilter = analysis.getGlobalTimeFilter();
FragmentInstance fragmentInstance =
new FragmentInstance(
@@ -98,7 +98,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
fragment.getId().genFragmentInstanceId(),
timeFilter,
queryContext.getQueryType(),
- queryContext.getTimeOut());
+ queryContext.getTimeOut(),
+ fragment.isRoot());
// Get the target region for origin PlanFragment, then its instance will be distributed one
// of them.
@@ -174,7 +175,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private void calculateNodeTopologyBetweenInstance() {
for (FragmentInstance instance : fragmentInstanceList) {
- PlanNode rootNode = instance.getFragment().getRoot();
+ PlanNode rootNode = instance.getFragment().getPlanNodeTree();
if (rootNode instanceof FragmentSinkNode) {
// Set target Endpoint for FragmentSinkNode
FragmentSinkNode sinkNode = (FragmentSinkNode) rootNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 4f043a05b9..0bbf402818 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -49,7 +49,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
public List<FragmentInstance> parallelPlan() {
PlanFragment fragment = subPlan.getPlanFragment();
Filter timeFilter = analysis.getGlobalTimeFilter();
- PlanNode node = fragment.getRoot();
+ PlanNode node = fragment.getPlanNodeTree();
if (!(node instanceof WritePlanNode)) {
throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index bcc54931de..3f43e8245c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -63,6 +63,8 @@ public class FragmentInstance implements IConsensusRequest {
private final long timeOut;
+ private boolean isRoot;
+
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
@@ -77,6 +79,18 @@ public class FragmentInstance implements IConsensusRequest {
this.id = id;
this.type = type;
this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold();
+ this.isRoot = false;
+ }
+
+ public FragmentInstance(
+ PlanFragment fragment,
+ FragmentInstanceId id,
+ Filter timeFilter,
+ QueryType type,
+ long timeOut,
+ boolean isRoot) {
+ this(fragment, id, timeFilter, type, timeOut);
+ this.isRoot = isRoot;
}
public TRegionReplicaSet getDataRegionId() {
@@ -119,8 +133,12 @@ public class FragmentInstance implements IConsensusRequest {
return id;
}
+ public boolean isRoot() {
+ return isRoot;
+ }
+
public String getDownstreamInfo() {
- PlanNode root = getFragment().getRoot();
+ PlanNode root = getFragment().getPlanNodeTree();
if (root instanceof FragmentSinkNode) {
FragmentSinkNode sink = (FragmentSinkNode) root;
return String.format(
@@ -158,7 +176,7 @@ public class FragmentInstance implements IConsensusRequest {
"Region: %s ",
getRegionReplicaSet() == null ? "Not set" : getRegionReplicaSet().getRegionId()));
ret.append("\n---- Plan Node Tree ----\n");
- ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
+ ret.append(PlanNodeUtil.nodeToString(getFragment().getPlanNodeTree()));
ret.append(String.format("timeOut-%s:", getTimeOut()));
return ret.toString();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index f7a91edcbe..bbf3231695 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -37,24 +37,28 @@ import java.util.Objects;
public class PlanFragment {
// TODO once you add field for this class you need to change the serialize and deserialize methods
private PlanFragmentId id;
- private PlanNode root;
+ private PlanNode planNodeTree;
private TypeProvider typeProvider;
- public PlanFragment(PlanFragmentId id, PlanNode root) {
+ // indicate whether this PlanFragment is the root of the whole Fragment-Plan-Tree or not
+ private boolean isRoot;
+
+ public PlanFragment(PlanFragmentId id, PlanNode planNodeTree) {
this.id = id;
- this.root = root;
+ this.planNodeTree = planNodeTree;
+ this.isRoot = false;
}
public PlanFragmentId getId() {
return id;
}
- public PlanNode getRoot() {
- return root;
+ public PlanNode getPlanNodeTree() {
+ return planNodeTree;
}
- public void setRoot(PlanNode root) {
- this.root = root;
+ public void setPlanNodeTree(PlanNode planNodeTree) {
+ this.planNodeTree = planNodeTree;
}
public TypeProvider getTypeProvider() {
@@ -65,6 +69,18 @@ public class PlanFragment {
this.typeProvider = typeProvider;
}
+ public void setId(PlanFragmentId id) {
+ this.id = id;
+ }
+
+ public boolean isRoot() {
+ return isRoot;
+ }
+
+ public void setRoot(boolean root) {
+ isRoot = root;
+ }
+
@Override
public String toString() {
return String.format("PlanFragment-%s", getId());
@@ -76,7 +92,7 @@ public class PlanFragment {
// and the DataRegions of all SourceNodes should be same in one PlanFragment.
// So we can use the DataRegion of one SourceNode as the PlanFragment's DataRegion.
public TRegionReplicaSet getTargetRegion() {
- return getNodeRegion(root);
+ return getNodeRegion(planNodeTree);
}
private TRegionReplicaSet getNodeRegion(PlanNode root) {
@@ -93,7 +109,7 @@ public class PlanFragment {
}
public PlanNode getPlanNodeById(PlanNodeId nodeId) {
- return getPlanNodeById(root, nodeId);
+ return getPlanNodeById(planNodeTree, nodeId);
}
private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
@@ -111,7 +127,7 @@ public class PlanFragment {
public void serialize(ByteBuffer byteBuffer) {
id.serialize(byteBuffer);
- root.serialize(byteBuffer);
+ planNodeTree.serialize(byteBuffer);
if (typeProvider == null) {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
} else {
@@ -122,7 +138,7 @@ public class PlanFragment {
public void serialize(DataOutputStream stream) throws IOException {
id.serialize(stream);
- root.serialize(stream);
+ planNodeTree.serialize(stream);
if (typeProvider == null) {
ReadWriteIOUtils.write((byte) 0, stream);
} else {
@@ -160,11 +176,11 @@ public class PlanFragment {
return false;
}
PlanFragment that = (PlanFragment) o;
- return Objects.equals(id, that.id) && Objects.equals(root, that.root);
+ return Objects.equals(id, that.id) && Objects.equals(planNodeTree, that.planNodeTree);
}
@Override
public int hashCode() {
- return Objects.hash(id, root);
+ return Objects.hash(id, planNodeTree);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
index 7ac6f92d0a..7b177618e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
@@ -44,7 +44,7 @@ public class SubPlan {
result.append(
String.format(
"SubPlan-%s. RootNodeId: %s\n",
- planFragment.getId(), planFragment.getRoot().getPlanNodeId()));
+ planFragment.getId(), planFragment.getPlanNodeTree().getPlanNodeId()));
children.forEach(result::append);
return result.toString();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 1df6c79d61..115774169d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -131,10 +130,6 @@ public class ClusterScheduler implements IScheduler {
// The FragmentInstances has been dispatched successfully to corresponding host, we mark the
// QueryState to Running
stateMachine.transitionToRunning();
- instances.forEach(
- instance -> {
- stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
- });
// TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
this.stateTracker.start();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index fb91167a6d..08563d9361 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -34,8 +34,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -49,7 +50,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
// TODO: (xingtanzjr) consider how much Interval is OK for state tracker
private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
private ScheduledFuture<?> trackTask;
- private final ConcurrentHashMap<FragmentInstanceId, InstanceStateMetrics> lastState;
+ private final Map<FragmentInstanceId, InstanceStateMetrics> instanceStateMap;
private volatile boolean aborted;
public FixedRateFragInsStateTracker(
@@ -59,7 +60,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
super(stateMachine, scheduledExecutor, instances, internalServiceClientManager);
this.aborted = false;
- this.lastState = new ConcurrentHashMap<>();
+ this.instanceStateMap = new HashMap<>();
}
@Override
@@ -95,7 +96,9 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
for (FragmentInstance instance : instances) {
try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
FragmentInstanceState state = fetchState(instance);
- InstanceStateMetrics metrics = lastState.computeIfAbsent(instance.getId(), k -> new InstanceStateMetrics());
+ InstanceStateMetrics metrics =
+ instanceStateMap.computeIfAbsent(
+ instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
if (needPrintState(metrics.lastState, state, metrics.durationToLastPrintInMS)) {
logger.info("State is {}", state);
metrics.reset(state);
@@ -104,7 +107,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
}
if (state != null) {
- stateMachine.updateFragInstanceState(instance.getId(), state);
+ updateQueryState(instance.getId(), state);
}
} catch (TException | IOException e) {
// TODO: do nothing ?
@@ -113,6 +116,22 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
}
}
+ private void updateQueryState(FragmentInstanceId instanceId, FragmentInstanceState state) {
+ if (state.isFailed()) {
+ stateMachine.transitionToFailed(
+ new RuntimeException(String.format("FragmentInstance[%s] is failed.", instanceId)));
+ }
+ boolean queryFinished =
+ instanceStateMap.values().stream()
+ .filter(instanceStateMetrics -> instanceStateMetrics.isRootInstance)
+ .allMatch(
+ instanceStateMetrics ->
+ instanceStateMetrics.lastState == FragmentInstanceState.FINISHED);
+ if (queryFinished) {
+ stateMachine.transitionToFinished();
+ }
+ }
+
private boolean needPrintState(
FragmentInstanceState previous, FragmentInstanceState current, long durationToLastPrintInMS) {
if (current != previous) {
@@ -122,10 +141,12 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
}
private static class InstanceStateMetrics {
+ private final boolean isRootInstance;
private FragmentInstanceState lastState;
private long durationToLastPrintInMS;
- private InstanceStateMetrics() {
+ private InstanceStateMetrics(boolean isRootInstance) {
+ this.isRootInstance = isRootInstance;
this.lastState = null;
this.durationToLastPrintInMS = 0L;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index ab5b2e8909..e2b5a88591 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -172,7 +172,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
case WRITE:
TSendPlanNodeReq sendPlanNodeReq =
new TSendPlanNodeReq(
- new TPlanNode(instance.getFragment().getRoot().serializeToByteBuffer()),
+ new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
instance.getRegionReplicaSet().getRegionId());
TSendPlanNodeResp sendPlanNodeResp = client.sendPlanNode(sendPlanNodeReq);
if (!sendPlanNodeResp.accepted) {
@@ -256,7 +256,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
}
break;
case WRITE:
- PlanNode planNode = instance.getFragment().getRoot();
+ PlanNode planNode = instance.getFragment().getPlanNodeTree();
boolean hasFailedMeasurement = false;
String partialInsertMessage = null;
if (planNode instanceof InsertNode) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index 5a1f6ee329..cf2ad5ca47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -116,10 +115,6 @@ public class StandaloneScheduler implements IScheduler {
// The FragmentInstances has been dispatched successfully to corresponding host, we mark the
stateMachine.transitionToRunning();
LOGGER.info("{} transit to RUNNING", getLogHeader());
- instances.forEach(
- instance ->
- stateMachine.initialFragInstanceState(
- instance.getId(), FragmentInstanceState.RUNNING));
this.stateTracker.start();
LOGGER.info("{} state tracker starts", getLogHeader());
break;
@@ -133,7 +128,7 @@ public class StandaloneScheduler implements IScheduler {
}
try {
for (FragmentInstance fragmentInstance : instances) {
- PlanNode planNode = fragmentInstance.getFragment().getRoot();
+ PlanNode planNode = fragmentInstance.getFragment().getPlanNodeTree();
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(
fragmentInstance.getRegionReplicaSet().getRegionId());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 1042cb81ab..0a14bda8e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -271,7 +271,6 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
@Override
public TCancelResp cancelQuery(TCancelQueryReq req) {
try (SetThreadName threadName = new SetThreadName(req.getQueryId())) {
- LOGGER.info("start cancelling query.");
List<FragmentInstanceId> taskIds =
req.getFragmentInstanceIds().stream()
.map(FragmentInstanceId::fromThrift)
@@ -279,7 +278,6 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
for (FragmentInstanceId taskId : taskIds) {
FragmentInstanceManager.getInstance().cancelTask(taskId);
}
- LOGGER.info("finish cancelling query.");
return new TCancelResp(true);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
index 8553d11be5..339a79b45b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/QueryStateMachineTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -60,30 +59,6 @@ public class QueryStateMachineTest {
Assert.assertEquals(stateMachine.getState(), QueryState.FINISHED);
}
- @Test
- public void TestFragmentInstanceToFinished() {
- List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
- QueryStateMachine stateMachine = genQueryStateMachine();
- for (FragmentInstanceId id : instanceIds) {
- stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
- }
- for (FragmentInstanceId id : instanceIds) {
- stateMachine.updateFragInstanceState(id, FragmentInstanceState.FINISHED);
- }
- Assert.assertEquals(stateMachine.getState(), QueryState.FINISHED);
- }
-
- @Test
- public void TestFragmentInstanceToTerminalState() {
- List<FragmentInstanceId> instanceIds = genFragmentInstanceIdList();
- QueryStateMachine stateMachine = genQueryStateMachine();
- for (FragmentInstanceId id : instanceIds) {
- stateMachine.initialFragInstanceState(id, FragmentInstanceState.RUNNING);
- }
- stateMachine.updateFragInstanceState(instanceIds.get(0), FragmentInstanceState.FAILED);
- Assert.assertEquals(stateMachine.getState(), QueryState.FAILED);
- }
-
@Test
public void TestListener() throws ExecutionException, InterruptedException {
AtomicInteger stateChangeCounter = new AtomicInteger(0);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 82d20e210b..05dd7a5740 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -88,7 +88,8 @@ public class AggregationDistributionTest {
expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
}
private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
@@ -138,13 +139,14 @@ public class AggregationDistributionTest {
expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
AggregationNode aggregationNode =
(AggregationNode)
fragmentInstances
.get(0)
.getFragment()
- .getRoot()
+ .getPlanNodeTree()
.getChildren()
.get(0)
.getChildren()
@@ -175,7 +177,8 @@ public class AggregationDistributionTest {
expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
}
@Test
@@ -198,7 +201,8 @@ public class AggregationDistributionTest {
expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
}
@Test
@@ -235,7 +239,8 @@ public class AggregationDistributionTest {
expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
}
@Test
@@ -272,19 +277,22 @@ public class AggregationDistributionTest {
expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, d3s1Path, d4s1Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue,
- (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue2,
- (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(1).getFragment().getPlanNodeTree().getChildren().get(0));
}
@Test
@@ -332,19 +340,22 @@ public class AggregationDistributionTest {
expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, d3s1Path, d4s1Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue,
- (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue2,
- (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(1).getFragment().getPlanNodeTree().getChildren().get(0));
verifySlidingWindowDescriptor(
Arrays.asList(d3s1Path, d4s1Path),
@@ -352,7 +363,7 @@ public class AggregationDistributionTest {
fragmentInstances
.get(0)
.getFragment()
- .getRoot()
+ .getPlanNodeTree()
.getChildren()
.get(0)
.getChildren()
@@ -363,7 +374,7 @@ public class AggregationDistributionTest {
fragmentInstances
.get(1)
.getFragment()
- .getRoot()
+ .getPlanNodeTree()
.getChildren()
.get(0)
.getChildren()
@@ -408,21 +419,24 @@ public class AggregationDistributionTest {
expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path));
expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue,
- (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue2,
- (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(1).getFragment().getPlanNodeTree().getChildren().get(0));
}
@Test
@@ -468,21 +482,24 @@ public class AggregationDistributionTest {
expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path, d2s1Path));
expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue,
- (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue2,
- (GroupByLevelNode) fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(2).getFragment().getPlanNodeTree().getChildren().get(0));
}
@Test
@@ -540,27 +557,31 @@ public class AggregationDistributionTest {
expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
- fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path));
expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue,
- (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0));
Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d2s1Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue2,
- (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(1).getFragment().getPlanNodeTree().getChildren().get(0));
Map<String, List<String>> expectedDescriptorValue3 = new HashMap<>();
expectedDescriptorValue3.put(groupedPathS1, Collections.singletonList(d1s1Path));
expectedDescriptorValue3.put(groupedPathS2, Collections.singletonList(d1s2Path));
verifyGroupByLevelDescriptor(
expectedDescriptorValue3,
- (GroupByLevelNode) fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
+ (GroupByLevelNode)
+ fragmentInstances.get(2).getFragment().getPlanNodeTree().getChildren().get(0));
verifySlidingWindowDescriptor(
Arrays.asList(d1s1Path, d1s2Path),
@@ -568,7 +589,7 @@ public class AggregationDistributionTest {
fragmentInstances
.get(0)
.getFragment()
- .getRoot()
+ .getPlanNodeTree()
.getChildren()
.get(0)
.getChildren()
@@ -579,7 +600,7 @@ public class AggregationDistributionTest {
fragmentInstances
.get(1)
.getFragment()
- .getRoot()
+ .getPlanNodeTree()
.getChildren()
.get(0)
.getChildren()
@@ -590,7 +611,7 @@ public class AggregationDistributionTest {
fragmentInstances
.get(2)
.getFragment()
- .getRoot()
+ .getPlanNodeTree()
.getChildren()
.get(0)
.getChildren()
@@ -610,7 +631,8 @@ public class AggregationDistributionTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
DistributedQueryPlan plan = planner.planFragments();
assertEquals(1, plan.getInstances().size());
- assertEquals(root, plan.getInstances().get(0).getFragment().getRoot().getChildren().get(0));
+ assertEquals(
+ root, plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0));
}
private void verifyGroupByLevelDescriptor(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
index c321bb67bb..b150d8d6f8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
@@ -62,7 +62,13 @@ public class LastQueryTest {
DistributedQueryPlan distributedQueryPlan = planner.planFragments();
Assert.assertEquals(1, distributedQueryPlan.getInstances().size());
Assert.assertTrue(
- distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0)
+ distributedQueryPlan
+ .getInstances()
+ .get(0)
+ .getFragment()
+ .getPlanNodeTree()
+ .getChildren()
+ .get(0)
instanceof LastQueryNode);
}
@@ -80,7 +86,13 @@ public class LastQueryTest {
DistributedQueryPlan distributedQueryPlan = planner.planFragments();
Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
PlanNode rootNode =
- distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0);
+ distributedQueryPlan
+ .getInstances()
+ .get(0)
+ .getFragment()
+ .getPlanNodeTree()
+ .getChildren()
+ .get(0);
Assert.assertTrue(rootNode instanceof LastQueryMergeNode);
rootNode
.getChildren()
@@ -104,7 +116,13 @@ public class LastQueryTest {
DistributedQueryPlan distributedQueryPlan = planner.planFragments();
Assert.assertEquals(3, distributedQueryPlan.getInstances().size());
PlanNode rootNode =
- distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0);
+ distributedQueryPlan
+ .getInstances()
+ .get(0)
+ .getFragment()
+ .getPlanNodeTree()
+ .getChildren()
+ .get(0);
Assert.assertTrue(rootNode instanceof LastQueryMergeNode);
rootNode
.getChildren()
@@ -128,7 +146,13 @@ public class LastQueryTest {
DistributedQueryPlan distributedQueryPlan = planner.planFragments();
Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
PlanNode rootNode =
- distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0);
+ distributedQueryPlan
+ .getInstances()
+ .get(0)
+ .getFragment()
+ .getPlanNodeTree()
+ .getChildren()
+ .get(0);
Assert.assertTrue(rootNode instanceof LastQueryMergeNode);
rootNode
.getChildren()
@@ -156,7 +180,13 @@ public class LastQueryTest {
DistributedQueryPlan distributedQueryPlan = planner.planFragments();
Assert.assertEquals(2, distributedQueryPlan.getInstances().size());
PlanNode rootNode =
- distributedQueryPlan.getInstances().get(0).getFragment().getRoot().getChildren().get(0);
+ distributedQueryPlan
+ .getInstances()
+ .get(0)
+ .getFragment()
+ .getPlanNodeTree()
+ .getChildren()
+ .get(0);
Assert.assertTrue(rootNode instanceof LastQueryCollectNode);
rootNode
.getChildren()