You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this rendering.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/29 06:32:34 UTC

[iotdb] branch IOTDB-4178 created (now 418791a77f)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch IOTDB-4178
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 418791a77f [IOTDB-4178] Stop StandaloneScheduler keep running while dispatching failed

This branch includes the following new commits:

     new 418791a77f [IOTDB-4178] Stop StandaloneScheduler keep running while dispatching failed

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-4178] Stop StandaloneScheduler keep running while dispatching failed

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4178
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 418791a77f69779e3a1e91ed64c59e8e14a70467
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 29 14:32:20 2022 +0800

    [IOTDB-4178] Stop StandaloneScheduler keep running while dispatching failed
---
 .../mpp/FragmentInstanceDispatchException.java     |  1 +
 .../fragment/FragmentInstanceContext.java          | 12 ++++++
 .../fragment/FragmentInstanceExecution.java        |  8 ----
 .../fragment/FragmentInstanceManager.java          | 43 +++++++++-------------
 .../db/mpp/plan/scheduler/StandaloneScheduler.java | 19 ++++++++--
 5 files changed, 45 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
index d9e760e728..d9b9045729 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/mpp/FragmentInstanceDispatchException.java
@@ -30,6 +30,7 @@ public class FragmentInstanceDispatchException extends Exception {
   }
 
   public FragmentInstanceDispatchException(TSStatus failureStatus) {
+    super(failureStatus.getMessage());
     this.failureStatus = failureStatus;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 4d0f9d9ffd..593947d080 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -176,10 +176,22 @@ public class FragmentInstanceContext extends QueryContext {
     stateMachine.transitionToFlushing();
   }
 
+  public void cancel() {
+    stateMachine.cancel();
+  }
+
+  public void abort() {
+    stateMachine.abort();
+  }
+
   public long getEndTime() {
     return executionEndTime.get();
   }
 
+  public FragmentInstanceInfo getInstanceInfo() {
+    return new FragmentInstanceInfo(stateMachine.getState(), getEndTime(), getFailedCause());
+  }
+
   public FragmentInstanceStateMachine getStateMachine() {
     return stateMachine;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 767487fc67..df99f50d58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -89,14 +89,6 @@ public class FragmentInstanceExecution {
         stateMachine.getState(), context.getEndTime(), context.getFailedCause());
   }
 
-  public void cancel() {
-    stateMachine.cancel();
-  }
-
-  public void abort() {
-    stateMachine.abort();
-  }
-
   // this is a separate method to ensure that the `this` reference is not leaked during construction
   private void initialize(CounterStat failedInstances, IDriverScheduler scheduler) {
     requireNonNull(failedInstances, "failedInstances is null");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6506e7f619..f6ebb1b42e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -162,26 +162,27 @@ public class FragmentInstanceManager {
     return execution != null ? execution.getInstanceInfo() : createFailedInstanceInfo(instanceId);
   }
 
-  /** Aborts a FragmentInstance. */
+  /** Aborts a FragmentInstance. keep FragmentInstanceContext for later state tracking */
   public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
-    FragmentInstanceExecution execution = instanceExecution.remove(fragmentInstanceId);
-    if (execution != null) {
-      instanceContext.remove(fragmentInstanceId);
-      execution.abort();
-      return execution.getInstanceInfo();
+    instanceExecution.remove(fragmentInstanceId);
+    FragmentInstanceContext context = instanceContext.get(fragmentInstanceId);
+    if (context != null) {
+      context.abort();
+      return context.getInstanceInfo();
     }
     return null;
   }
 
   /** Cancels a FragmentInstance. */
   public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+    logger.error("cancelTask");
     requireNonNull(instanceId, "taskId is null");
 
-    FragmentInstanceExecution execution = instanceExecution.remove(instanceId);
-    if (execution != null) {
-      instanceContext.remove(instanceId);
-      execution.cancel();
-      return execution.getInstanceInfo();
+    FragmentInstanceContext context = instanceContext.remove(instanceId);
+    if (context != null) {
+      instanceExecution.remove(instanceId);
+      context.cancel();
+      return context.getInstanceInfo();
     }
     return null;
   }
@@ -194,11 +195,11 @@ public class FragmentInstanceManager {
    */
   public FragmentInstanceInfo getInstanceInfo(FragmentInstanceId instanceId) {
     requireNonNull(instanceId, "instanceId is null");
-    FragmentInstanceExecution execution = instanceExecution.get(instanceId);
-    if (execution == null) {
+    FragmentInstanceContext context = instanceContext.get(instanceId);
+    if (context == null) {
       return null;
     }
-    return execution.getInstanceInfo();
+    return context.getInstanceInfo();
   }
 
   public CounterStat getFailedInstances() {
@@ -217,18 +218,8 @@ public class FragmentInstanceManager {
         .entrySet()
         .removeIf(
             entry -> {
-              FragmentInstanceId instanceId = entry.getKey();
-              FragmentInstanceExecution execution = instanceExecution.get(instanceId);
-              if (execution == null) {
-                return true;
-              }
-              long endTime = execution.getInstanceInfo().getEndTime();
-              if (endTime != -1 && endTime <= oldestAllowedInstance) {
-                instanceContext.remove(instanceId);
-                return true;
-              } else {
-                return false;
-              }
+              long endTime = entry.getValue().getEndTime();
+              return endTime != -1 && endTime <= oldestAllowedInstance;
             });
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index cf2ad5ca47..fb48c43e04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -36,12 +36,14 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import io.airlift.units.Duration;
@@ -100,8 +102,15 @@ public class StandaloneScheduler implements IScheduler {
             if (groupId instanceof DataRegionId) {
               DataRegion region =
                   StorageEngineV2.getInstance().getDataRegion((DataRegionId) groupId);
-              FragmentInstanceManager.getInstance()
-                  .execDataQueryFragmentInstance(fragmentInstance, region);
+              FragmentInstanceInfo info =
+                  FragmentInstanceManager.getInstance()
+                      .execDataQueryFragmentInstance(fragmentInstance, region);
+              // query dispatch failed
+              if (info.getState().isFailed()) {
+                stateMachine.transitionToFailed(
+                    RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, info.getMessage()));
+                return;
+              }
             } else {
               ISchemaRegion region =
                   SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) groupId);
@@ -111,12 +120,14 @@ public class StandaloneScheduler implements IScheduler {
           }
         } catch (Exception e) {
           stateMachine.transitionToFailed(e);
+          LOGGER.info("transit to FAILED");
+          return;
         }
         // The FragmentInstances has been dispatched successfully to corresponding host, we mark the
         stateMachine.transitionToRunning();
-        LOGGER.info("{} transit to RUNNING", getLogHeader());
+        LOGGER.info("transit to RUNNING");
         this.stateTracker.start();
-        LOGGER.info("{} state tracker starts", getLogHeader());
+        LOGGER.info("state tracker starts");
         break;
       case WRITE:
         // reject non-query operations when system is read-only