You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/29 06:32:35 UTC
[iotdb] 01/01: [IOTDB-4178] Stop StandaloneScheduler keep running while dispatching failed
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4178
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 418791a77f69779e3a1e91ed64c59e8e14a70467
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 29 14:32:20 2022 +0800
[IOTDB-4178] Stop StandaloneScheduler keep running while dispatching failed
---
.../mpp/FragmentInstanceDispatchException.java | 1 +
.../fragment/FragmentInstanceContext.java | 12 ++++++
.../fragment/FragmentInstanceExecution.java | 8 ----
.../fragment/FragmentInstanceManager.java | 43 +++++++++-------------
.../db/mpp/plan/scheduler/StandaloneScheduler.java | 19 ++++++++--
5 files changed, 45 insertions(+), 38 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
index d9e760e728..d9b9045729 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
@@ -30,6 +30,7 @@ public class FragmentInstanceDispatchException extends Exception {
}
public FragmentInstanceDispatchException(TSStatus failureStatus) {
+ super(failureStatus.getMessage());
this.failureStatus = failureStatus;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 4d0f9d9ffd..593947d080 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -176,10 +176,22 @@ public class FragmentInstanceContext extends QueryContext {
stateMachine.transitionToFlushing();
}
+ public void cancel() {
+ stateMachine.cancel();
+ }
+
+ public void abort() {
+ stateMachine.abort();
+ }
+
public long getEndTime() {
return executionEndTime.get();
}
+ public FragmentInstanceInfo getInstanceInfo() {
+ return new FragmentInstanceInfo(stateMachine.getState(), getEndTime(), getFailedCause());
+ }
+
public FragmentInstanceStateMachine getStateMachine() {
return stateMachine;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 767487fc67..df99f50d58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -89,14 +89,6 @@ public class FragmentInstanceExecution {
stateMachine.getState(), context.getEndTime(), context.getFailedCause());
}
- public void cancel() {
- stateMachine.cancel();
- }
-
- public void abort() {
- stateMachine.abort();
- }
-
// this is a separate method to ensure that the `this` reference is not leaked during construction
private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) {
requireNonNull(failedInstances, "failedInstances is null");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6506e7f619..f6ebb1b42e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -162,26 +162,27 @@ public class FragmentInstanceManager {
return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
}
- /** Aborts a FragmentInstance. */
+ /** Aborts a FragmentInstance, keeping its FragmentInstanceContext for later state tracking. */
public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
- FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId);
- if (execution != null) {
- instanceContext.remove(fragmentInstanceId);
- execution.abort();
- return execution.getInstanceInfo();
+ instanceExecution.remove(fragmentInstanceId);
+ FragmentInstanceContext context = instanceContext.get(fragmentInstanceId);
+ if (context != null) {
+ context.abort();
+ return context.getInstanceInfo();
}
return null;
}
/** Cancels a FragmentInstance. */
public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+ logger.error("cancelTask");
requireNonNull(instanceId, "taskId is null");
- FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
- if (execution != null) {
- instanceContext.remove(instanceId);
- execution.cancel();
- return execution.getInstanceInfo();
+ FragmentInstanceContext context = instanceContext.remove(instanceId);
+ if (context != null) {
+ instanceExecution.remove(instanceId);
+ context.cancel();
+ return context.getInstanceInfo();
}
return null;
}
@@ -194,11 +195,11 @@ public class FragmentInstanceManager {
*/
public FragmentInstanceInfo getInstanceInfo(FragmentInstanceId instanceId) {
requireNonNull(instanceId, "instanceId is null");
- FragmentInstanceExecution execution = instanceExecution.get(instanceId);
- if (execution == null) {
+ FragmentInstanceContext context = instanceContext.get(instanceId);
+ if (context == null) {
return null;
}
- return execution.getInstanceInfo();
+ return context.getInstanceInfo();
}
public CounterStat getFailedInstances() {
@@ -217,18 +218,8 @@ public class FragmentInstanceManager {
.entrySet()
.removeIf(
entry -> {
- FragmentInstanceId instanceId = entry.getKey();
- FragmentInstanceExecution execution = instanceExecution.get(instanceId);
- if (execution == null) {
- return true;
- }
- long endTime = execution.getInstanceInfo().getEndTime();
- if (endTime != -1 && endTime <= oldestAllowedInstance) {
- instanceContext.remove(instanceId);
- return true;
- } else {
- return false;
- }
+ long endTime = entry.getValue().getEndTime();
+ return endTime != -1 && endTime <= oldestAllowedInstance;
});
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index cf2ad5ca47..fb48c43e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -36,12 +36,14 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import io.airlift.units.Duration;
@@ -100,8 +102,15 @@ public class StandaloneScheduler implements IScheduler {
if (groupId instanceof DataRegionId) {
DataRegion region =
StorageEngineV2.getInstance().getDataRegion((DataRegionId) groupId);
- FragmentInstanceManager.getInstance()
- .execDataQueryFragmentInstance(fragmentInstance, region);
+ FragmentInstanceInfo info =
+ FragmentInstanceManager.getInstance()
+ .execDataQueryFragmentInstance(fragmentInstance, region);
+ // Dispatch failed: mark the query FAILED and return without starting the state tracker
+ if (info.getState().isFailed()) {
+ stateMachine.transitionToFailed(
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, info.getMessage()));
+ return;
+ }
} else {
ISchemaRegion region =
SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) groupId);
@@ -111,12 +120,14 @@ public class StandaloneScheduler implements IScheduler {
}
} catch (Exception e) {
stateMachine.transitionToFailed(e);
+ LOGGER.info("transit to FAILED");
+ return;
}
// The FragmentInstances has been dispatched successfully to corresponding host, we mark the
stateMachine.transitionToRunning();
- LOGGER.info("{} transit to RUNNING", getLogHeader());
+ LOGGER.info("transit to RUNNING");
this.stateTracker.start();
- LOGGER.info("{} state tracker starts", getLogHeader());
+ LOGGER.info("state tracker starts");
break;
case WRITE:
// reject non-query operations when system is read-only