You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/18 06:13:25 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] distinguish whether an exception has occurred when releasing the QueryExecution resource

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new c90d1047cf [To rel/1.1] distinguish whether an exception has occurred when releasing the QueryExecution resource
c90d1047cf is described below

commit c90d1047cf2b6c6e4580916a6fe63d692684fb5e
Author: Zhijia Cao <ca...@126.com>
AuthorDate: Tue Apr 18 14:13:19 2023 +0800

    [To rel/1.1] distinguish whether an exception has occurred when releasing the QueryExecution resource
---
 .../fragment/FragmentInstanceManager.java          |  8 +++--
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  8 +++--
 .../analyze/schema/ClusterSchemaFetchExecutor.java |  7 +++-
 .../db/mpp/plan/execution/IQueryExecution.java     |  4 ++-
 .../db/mpp/plan/execution/QueryExecution.java      | 22 +++++++-----
 .../mpp/plan/execution/config/ConfigExecution.java |  5 ++-
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  4 +--
 .../db/mpp/plan/scheduler/IQueryTerminator.java    |  2 +-
 .../iotdb/db/mpp/plan/scheduler/IScheduler.java    |  2 +-
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  | 35 ++++++++++++++++--
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  2 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 42 ++++++++++++++++++----
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 +-
 .../db/mpp/execution/ConfigExecutionTest.java      |  2 +-
 .../execution/operator/MergeSortOperatorTest.java  |  5 ++-
 thrift/src/main/thrift/datanode.thrift             |  1 +
 16 files changed, 119 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6e5390fdc1..d563f9cf24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -243,14 +243,18 @@ public class FragmentInstanceManager {
   }
 
   /** Cancels a FragmentInstance. */
-  public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+  public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId, boolean hasThrowable) {
     logger.debug("[CancelFI]");
     requireNonNull(instanceId, "taskId is null");
 
     FragmentInstanceContext context = instanceContext.remove(instanceId);
     if (context != null) {
       instanceExecution.remove(instanceId);
-      context.cancel();
+      if (hasThrowable) {
+        context.cancel();
+      } else {
+        context.finished();
+      }
       return context.getInstanceInfo();
     }
     return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 2a2ae6f0d8..0d44473624 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -208,12 +208,12 @@ public class Coordinator {
     return queryIdGenerator.createNextQueryId();
   }
 
-  public void cleanupQueryExecution(Long queryId) {
+  public void cleanupQueryExecution(Long queryId, Throwable t) {
     IQueryExecution queryExecution = getQueryExecution(queryId);
     if (queryExecution != null) {
       try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
         LOGGER.debug("[CleanUpQuery]]");
-        queryExecution.stopAndCleanup();
+        queryExecution.stopAndCleanup(t);
         queryExecutionMap.remove(queryId);
         if (queryExecution.isQuery()) {
           long costTime = queryExecution.getTotalExecutionTime();
@@ -228,6 +228,10 @@ public class Coordinator {
     }
   }
 
+  public void cleanupQueryExecution(Long queryId) {
+    cleanupQueryExecution(queryId, null);
+  }
+
   public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       getInternalServiceClientManager() {
     return SYNC_INTERNAL_SERVICE_CLIENT_MANAGER;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 9e8576a18c..6d6dcec353 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -152,6 +152,7 @@ class ClusterSchemaFetchExecutor {
 
   private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
     long queryId = queryIdProvider.get();
+    Throwable t = null;
     try {
       ExecutionResult executionResult = statementExecutor.apply(queryId, schemaFetchStatement);
       if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -170,6 +171,7 @@ class ClusterSchemaFetchExecutor {
           try {
             tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
           } catch (IoTDBException e) {
+            t = e;
             throw new RuntimeException("Fetch Schema failed. ", e);
           }
           if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
@@ -183,8 +185,11 @@ class ClusterSchemaFetchExecutor {
         result.setDatabases(databaseSet);
         return result;
       }
+    } catch (Throwable throwable) {
+      t = throwable;
+      throw throwable;
     } finally {
-      coordinator.cleanupQueryExecution(queryId);
+      coordinator.cleanupQueryExecution(queryId, t);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index 1dae431f10..e1c9a13895 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -31,10 +31,12 @@ public interface IQueryExecution {
 
   void start();
 
-  void stop();
+  void stop(Throwable t);
 
   void stopAndCleanup();
 
+  void stopAndCleanup(Throwable t);
+
   void cancel();
 
   ExecutionResult getStatus();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 3588cc4462..94fb69067e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -129,7 +129,7 @@ public class QueryExecution implements IQueryExecution {
   private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
       asyncInternalServiceClientManager;
 
-  private AtomicBoolean stopped;
+  private final AtomicBoolean stopped;
 
   private long totalExecutionTime;
 
@@ -179,7 +179,7 @@ public class QueryExecution implements IQueryExecution {
               Throwable cause = stateMachine.getFailureException();
               releaseResource(cause);
             }
-            this.stop();
+            this.stop(null);
           }
         });
     this.stopped = new AtomicBoolean(false);
@@ -254,6 +254,8 @@ public class QueryExecution implements IQueryExecution {
     partitionFetcher.invalidAllCache();
     // clear runtime variables in MPPQueryContext
     context.prepareForRetry();
+    // reset the stopped flag so that stop() can take effect again for the retried execution
+    this.stopped.compareAndSet(true, false);
     // re-analyze the query
     this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
     // re-start the QueryExecution
@@ -357,16 +359,16 @@ public class QueryExecution implements IQueryExecution {
   }
 
   // Stop the workers for this query
-  public void stop() {
+  public void stop(Throwable t) {
     // only stop once
     if (stopped.compareAndSet(false, true) && this.scheduler != null) {
-      this.scheduler.stop();
+      this.scheduler.stop(t);
     }
   }
 
   // Stop the query and clean up all the resources this query occupied
   public void stopAndCleanup() {
-    stop();
+    stop(null);
     releaseResource();
   }
 
@@ -388,13 +390,13 @@ public class QueryExecution implements IQueryExecution {
     // If the QueryExecution's state is abnormal, we should also abort the resultHandle without
     // waiting it to be finished.
     if (resultHandle != null) {
-      resultHandle.abort();
+      resultHandle.close();
     }
   }
 
   // Stop the query and clean up all the resources this query occupied
   public void stopAndCleanup(Throwable t) {
-    stop();
+    stop(t);
     releaseResource(t);
   }
 
@@ -408,7 +410,11 @@ public class QueryExecution implements IQueryExecution {
     // If the QueryExecution's state is abnormal, we should also abort the resultHandle without
     // waiting it to be finished.
     if (resultHandle != null) {
-      resultHandle.abort(t);
+      if (t != null) {
+        resultHandle.abort(t);
+      } else {
+        resultHandle.close();
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index 6aca2d96fc..20a4aa9f82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -145,11 +145,14 @@ public class ConfigExecution implements IQueryExecution {
   }
 
   @Override
-  public void stop() {}
+  public void stop(Throwable t) {}
 
   @Override
   public void stopAndCleanup() {}
 
+  @Override
+  public void stopAndCleanup(Throwable t) {}
+
   @Override
   public void cancel() {
     throw new UnsupportedOperationException(getClass().getName());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index d704726266..86b63408d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -154,7 +154,7 @@ public class ClusterScheduler implements IScheduler {
   }
 
   @Override
-  public void stop() {
+  public void stop(Throwable t) {
     // TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
     // practice ?
     dispatcher.abort();
@@ -163,7 +163,7 @@ public class ClusterScheduler implements IScheduler {
     }
     // TODO: (xingtanzjr) handle the exception when the termination cannot succeed
     if (queryTerminator != null) {
-      queryTerminator.terminate();
+      queryTerminator.terminate(t);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IQueryTerminator.java
index 8726de7dcd..2a3ccdf509 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IQueryTerminator.java
@@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import java.util.concurrent.Future;
 
 public interface IQueryTerminator {
-  Future<Boolean> terminate();
+  Future<Boolean> terminate(Throwable t);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IScheduler.java
index 40b9ba8703..9bd8381324 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IScheduler.java
@@ -28,7 +28,7 @@ public interface IScheduler {
 
   void start();
 
-  void stop();
+  void stop(Throwable t);
 
   Duration getTotalCpuTime();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 14bace8dc2..841abf4dea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -78,14 +78,18 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   }
 
   @Override
-  public Future<Boolean> terminate() {
+  public Future<Boolean> terminate(Throwable t) {
     // For the failure dispatch, the termination should not be triggered because of connection issue
     this.relatedHost =
         this.relatedHost.stream()
             .filter(endPoint -> !queryContext.getEndPointBlackList().contains(endPoint))
             .collect(Collectors.toList());
+    if (t == null) {
+      return scheduledExecutor.schedule(
+          this::syncTerminate, TERMINATION_GRACE_PERIOD_IN_MS, TimeUnit.MILLISECONDS);
+    }
     return scheduledExecutor.schedule(
-        this::syncTerminate, TERMINATION_GRACE_PERIOD_IN_MS, TimeUnit.MILLISECONDS);
+        this::syncTerminateThrowable, TERMINATION_GRACE_PERIOD_IN_MS, TimeUnit.MILLISECONDS);
   }
 
   public Boolean syncTerminate() {
@@ -99,7 +103,32 @@ public class SimpleQueryTerminator implements IQueryTerminator {
       }
       try (SyncDataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
-        client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs));
+        client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, false));
+      } catch (ClientManagerException e) {
+        logger.warn("can't connect to node {}", endPoint, e);
+        // we shouldn't return here and need to cancel queryTasks in other nodes
+        succeed = false;
+      } catch (TException t) {
+        logger.warn("cancel query {} on node {} failed.", queryId.getId(), endPoint, t);
+        // we shouldn't return here and need to cancel queryTasks in other nodes
+        succeed = false;
+      }
+    }
+    return succeed;
+  }
+
+  public Boolean syncTerminateThrowable() {
+    boolean succeed = true;
+    for (TEndPoint endPoint : relatedHost) {
+      // we only send cancel query request if there is remaining unfinished FI in that node
+      List<TFragmentInstanceId> unfinishedFIs =
+          stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint));
+      if (unfinishedFIs.isEmpty()) {
+        continue;
+      }
+      try (SyncDataNodeInternalServiceClient client =
+          internalServiceClientManager.borrowClient(endPoint)) {
+        client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, true));
       } catch (ClientManagerException e) {
         logger.warn("can't connect to node {}", endPoint, e);
         // we shouldn't return here and need to cancel queryTasks in other nodes
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index 64784bee4c..e37449055b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -287,7 +287,7 @@ public class LoadTsFileScheduler implements IScheduler {
   }
 
   @Override
-  public void stop() {}
+  public void stop(Throwable t) {}
 
   @Override
   public Duration getTotalCpuTime() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 2375416f2f..9e68514b6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -195,6 +195,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
     long startTime = System.currentTimeMillis();
     StatementType statementType = null;
+    Throwable t = null;
     try {
       Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId());
 
@@ -248,8 +249,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
     } catch (Exception e) {
       finished = true;
+      t = e;
       return RpcUtils.getTSExecuteStatementResp(
           onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+    } catch (Error error) {
+      t = error;
+      throw error;
     } finally {
       COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime);
       if (finished) {
@@ -260,7 +265,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               statementType,
               executionTime > 0 ? executionTime : System.currentTimeMillis() - startTime);
         }
-        COORDINATOR.cleanupQueryExecution(queryId);
+        COORDINATOR.cleanupQueryExecution(queryId, t);
       }
       SESSION_MANAGER.updateIdleTime();
     }
@@ -275,6 +280,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
     long startTime = System.currentTimeMillis();
+    Throwable t = null;
     try {
       Statement s = StatementGenerator.createStatement(req, clientSession.getZoneId());
 
@@ -319,8 +325,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
     } catch (Exception e) {
       finished = true;
+      t = e;
       return RpcUtils.getTSExecuteStatementResp(
           onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+    } catch (Error error) {
+      t = error;
+      throw error;
     } finally {
       COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime);
       if (finished) {
@@ -328,7 +338,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
             OperationType.EXECUTE_RAW_DATA_QUERY,
             StatementType.QUERY,
             COORDINATOR.getTotalExecutionTime(queryId));
-        COORDINATOR.cleanupQueryExecution(queryId);
+        COORDINATOR.cleanupQueryExecution(queryId, t);
       }
       SESSION_MANAGER.updateIdleTime();
     }
@@ -343,6 +353,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
     long startTime = System.currentTimeMillis();
+    Throwable t = null;
     try {
       Statement s = StatementGenerator.createStatement(req, clientSession.getZoneId());
       // permission check
@@ -387,8 +398,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
     } catch (Exception e) {
       finished = true;
+      t = e;
       return RpcUtils.getTSExecuteStatementResp(
           onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+    } catch (Error error) {
+      t = error;
+      throw error;
     } finally {
       COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime);
       if (finished) {
@@ -396,7 +411,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
             OperationType.EXECUTE_LAST_DATA_QUERY,
             StatementType.QUERY,
             COORDINATOR.getTotalExecutionTime(queryId));
-        COORDINATOR.cleanupQueryExecution(queryId);
+        COORDINATOR.cleanupQueryExecution(queryId, t);
       }
       SESSION_MANAGER.updateIdleTime();
     }
@@ -411,6 +426,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
     }
     long startTime = System.currentTimeMillis();
+    Throwable t = null;
     try {
       Statement s = StatementGenerator.createStatement(req, clientSession.getZoneId());
       // permission check
@@ -452,8 +468,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
     } catch (Exception e) {
       finished = true;
+      t = e;
       return RpcUtils.getTSExecuteStatementResp(
           onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+    } catch (Error error) {
+      t = error;
+      throw error;
     } finally {
       COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime);
       if (finished) {
@@ -461,7 +481,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
             OperationType.EXECUTE_LAST_DATA_QUERY,
             StatementType.QUERY,
             COORDINATOR.getTotalExecutionTime(queryId));
-        COORDINATOR.cleanupQueryExecution(queryId);
+        COORDINATOR.cleanupQueryExecution(queryId, t);
       }
       SESSION_MANAGER.updateIdleTime();
     }
@@ -502,6 +522,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     long startTime = System.currentTimeMillis();
     boolean finished = false;
     StatementType statementType = null;
+    Throwable t = null;
     try {
       IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
       if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -532,7 +553,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
     } catch (Exception e) {
       finished = true;
+      t = e;
       return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+    } catch (Error error) {
+      t = error;
+      throw error;
     } finally {
       COORDINATOR.recordExecutionTime(req.queryId, System.currentTimeMillis() - startTime);
       if (finished) {
@@ -542,7 +567,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               statementType,
               COORDINATOR.getTotalExecutionTime(req.queryId));
         }
-        COORDINATOR.cleanupQueryExecution(req.queryId);
+        COORDINATOR.cleanupQueryExecution(req.queryId, t);
       }
       SESSION_MANAGER.updateIdleTime();
     }
@@ -1012,6 +1037,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
     boolean finished = false;
     long startTime = System.currentTimeMillis();
     StatementType statementType = null;
+    Throwable t = null;
     try {
       IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
       if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -1042,7 +1068,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
       }
     } catch (Exception e) {
       finished = true;
+      t = e;
       return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+    } catch (Error error) {
+      t = error;
+      throw error;
     } finally {
       COORDINATOR.recordExecutionTime(req.queryId, System.currentTimeMillis() - startTime);
       if (finished) {
@@ -1052,7 +1082,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               statementType,
               COORDINATOR.getTotalExecutionTime(req.queryId));
         }
-        COORDINATOR.cleanupQueryExecution(req.queryId);
+        COORDINATOR.cleanupQueryExecution(req.queryId, t);
       }
       SESSION_MANAGER.updateIdleTime();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index efe659f6e8..eee0c2c69a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -316,7 +316,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
               .map(FragmentInstanceId::fromThrift)
               .collect(Collectors.toList());
       for (FragmentInstanceId taskId : taskIds) {
-        FragmentInstanceManager.getInstance().cancelTask(taskId);
+        FragmentInstanceManager.getInstance().cancelTask(taskId, req.hasThrowable);
       }
       return new TCancelResp(true);
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index 5da8688345..020207d54d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -153,7 +153,7 @@ public class ConfigExecutionTest {
     } catch (InterruptedException e) {
       ExecutionResult result = execution.getStatus();
       assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
-      execution.stop();
+      execution.stop(e);
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index f91bc21956..2e9aa31929 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -1651,11 +1651,14 @@ public class MergeSortOperatorTest {
     public void start() {}
 
     @Override
-    public void stop() {}
+    public void stop(Throwable t) {}
 
     @Override
     public void stopAndCleanup() {}
 
+    @Override
+    public void stopAndCleanup(Throwable t) {}
+
     @Override
     public void cancel() {}
 
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index c80ae4452c..aa4cdc030f 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -144,6 +144,7 @@ struct TFragmentInstanceInfoResp {
 struct TCancelQueryReq {
   1: required string queryId
   2: required list<TFragmentInstanceId> fragmentInstanceIds
+  3: required bool hasThrowable
 }
 
 struct TCancelPlanFragmentReq {