You are viewing a plain-text version of this content. The canonical link to the original archived message was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/29 06:32:35 UTC

[iotdb] 01/01: [IOTDB-4178] Stop StandaloneScheduler keep running while dispatching failed

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4178
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 418791a77f69779e3a1e91ed64c59e8e14a70467
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 29 14:32:20 2022 +0800

    [IOTDB-4178] Stop StandaloneScheduler keep running while dispatching failed
---
 .../mpp/FragmentInstanceDispatchException.java     |  1 +
 .../fragment/FragmentInstanceContext.java          | 12 ++++++
 .../fragment/FragmentInstanceExecution.java        |  8 ----
 .../fragment/FragmentInstanceManager.java          | 43 +++++++++-------------
 .../db/mpp/plan/scheduler/StandaloneScheduler.java | 19 ++++++++--
 5 files changed, 45 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
index d9e760e728..d9b9045729 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
@@ -30,6 +30,7 @@ public class FragmentInstanceDispatchException extends Exception {
   }
 
   public FragmentInstanceDispatchException(TSStatus failureStatus) {
+    super(failureStatus.getMessage());
     this.failureStatus = failureStatus;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 4d0f9d9ffd..593947d080 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -176,10 +176,22 @@ public class FragmentInstanceContext extends QueryContext {
     stateMachine.transitionToFlushing();
   }
 
+  public void cancel() {
+    stateMachine.cancel();
+  }
+
+  public void abort() {
+    stateMachine.abort();
+  }
+
   public long getEndTime() {
     return executionEndTime.get();
   }
 
+  public FragmentInstanceInfo getInstanceInfo() {
+    return new FragmentInstanceInfo(stateMachine.getState(), getEndTime(), getFailedCause());
+  }
+
   public FragmentInstanceStateMachine getStateMachine() {
     return stateMachine;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 767487fc67..df99f50d58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -89,14 +89,6 @@ public class FragmentInstanceExecution {
         stateMachine.getState(), context.getEndTime(), context.getFailedCause());
   }
 
-  public void cancel() {
-    stateMachine.cancel();
-  }
-
-  public void abort() {
-    stateMachine.abort();
-  }
-
   // this is a separate method to ensure that the `this` reference is not leaked during construction
   private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) {
     requireNonNull(failedInstances, "failedInstances is null");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6506e7f619..f6ebb1b42e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -162,26 +162,27 @@ public class FragmentInstanceManager {
     return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
   }
 
-  /** Aborts a FragmentInstance. */
+  /** Aborts a FragmentInstance. keep FragmentInstanceContext for later state tracking */
   public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
-    FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId);
-    if (execution != null) {
-      instanceContext.remove(fragmentInstanceId);
-      execution.abort();
-      return execution.getInstanceInfo();
+    instanceExecution.remove(fragmentInstanceId);
+    FragmentInstanceContext context = instanceContext.get(fragmentInstanceId);
+    if (context != null) {
+      context.abort();
+      return context.getInstanceInfo();
     }
     return null;
   }
 
   /** Cancels a FragmentInstance. */
   public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+    logger.error("cancelTask");
     requireNonNull(instanceId, "taskId is null");
 
-    FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
-    if (execution != null) {
-      instanceContext.remove(instanceId);
-      execution.cancel();
-      return execution.getInstanceInfo();
+    FragmentInstanceContext context = instanceContext.remove(instanceId);
+    if (context != null) {
+      instanceExecution.remove(instanceId);
+      context.cancel();
+      return context.getInstanceInfo();
     }
     return null;
   }
@@ -194,11 +195,11 @@ public class FragmentInstanceManager {
    */
   public FragmentInstanceInfo getInstanceInfo(FragmentInstanceId instanceId) {
     requireNonNull(instanceId, "instanceId is null");
-    FragmentInstanceExecution execution = instanceExecution.get(instanceId);
-    if (execution == null) {
+    FragmentInstanceContext context = instanceContext.get(instanceId);
+    if (context == null) {
       return null;
     }
-    return execution.getInstanceInfo();
+    return context.getInstanceInfo();
   }
 
   public CounterStat getFailedInstances() {
@@ -217,18 +218,8 @@ public class FragmentInstanceManager {
         .entrySet()
         .removeIf(
             entry -> {
-              FragmentInstanceId instanceId = entry.getKey();
-              FragmentInstanceExecution execution = instanceExecution.get(instanceId);
-              if (execution == null) {
-                return true;
-              }
-              long endTime = execution.getInstanceInfo().getEndTime();
-              if (endTime != -1 && endTime <= oldestAllowedInstance) {
-                instanceContext.remove(instanceId);
-                return true;
-              } else {
-                return false;
-              }
+              long endTime = entry.getValue().getEndTime();
+              return endTime != -1 && endTime <= oldestAllowedInstance;
             });
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index cf2ad5ca47..fb48c43e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -36,12 +36,14 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import io.airlift.units.Duration;
@@ -100,8 +102,15 @@ public class StandaloneScheduler implements IScheduler {
             if (groupId instanceof DataRegionId) {
               DataRegion region =
                   StorageEngineV2.getInstance().getDataRegion((DataRegionId) groupId);
-              FragmentInstanceManager.getInstance()
-                  .execDataQueryFragmentInstance(fragmentInstance, region);
+              FragmentInstanceInfo info =
+                  FragmentInstanceManager.getInstance()
+                      .execDataQueryFragmentInstance(fragmentInstance, region);
+              // query dispatch failed
+              if (info.getState().isFailed()) {
+                stateMachine.transitionToFailed(
+                    RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, info.getMessage()));
+                return;
+              }
             } else {
               ISchemaRegion region =
                   SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) groupId);
@@ -111,12 +120,14 @@ public class StandaloneScheduler implements IScheduler {
           }
         } catch (Exception e) {
           stateMachine.transitionToFailed(e);
+          LOGGER.info("transit to FAILED");
+          return;
         }
         // The FragmentInstances has been dispatched successfully to corresponding host, we mark the
         stateMachine.transitionToRunning();
-        LOGGER.info("{} transit to RUNNING", getLogHeader());
+        LOGGER.info("transit to RUNNING");
         this.stateTracker.start();
-        LOGGER.info("{} state tracker starts", getLogHeader());
+        LOGGER.info("state tracker starts");
         break;
       case WRITE:
         // reject non-query operations when system is read-only