You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/04/18 00:55:11 UTC
[iotdb] branch master updated: Distinguish whether an exception has occurred when releasing the QueryExecution resource
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new df88dc93e4 Distinguish whether an exception has occurred when releasing the QueryExecution resource
df88dc93e4 is described below
commit df88dc93e4895202b2e0754ec8fe5cea32ea9d31
Author: Zhijia Cao <ca...@126.com>
AuthorDate: Tue Apr 18 08:55:05 2023 +0800
Distinguish whether an exception has occurred when releasing the QueryExecution resource
---
.../fragment/FragmentInstanceManager.java | 8 +++--
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 8 +++--
.../analyze/schema/ClusterSchemaFetchExecutor.java | 7 +++-
.../db/mpp/plan/execution/IQueryExecution.java | 4 ++-
.../db/mpp/plan/execution/QueryExecution.java | 24 +++++++------
.../mpp/plan/execution/config/ConfigExecution.java | 5 ++-
.../db/mpp/plan/scheduler/ClusterScheduler.java | 4 +--
.../db/mpp/plan/scheduler/IQueryTerminator.java | 2 +-
.../iotdb/db/mpp/plan/scheduler/IScheduler.java | 2 +-
.../mpp/plan/scheduler/SimpleQueryTerminator.java | 35 ++++++++++++++++--
.../plan/scheduler/load/LoadTsFileScheduler.java | 2 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 42 ++++++++++++++++++----
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../service/thrift/impl/MLNodeRPCServiceImpl.java | 15 ++++++--
.../db/mpp/execution/ConfigExecutionTest.java | 2 +-
.../execution/operator/MergeSortOperatorTest.java | 5 ++-
thrift/src/main/thrift/datanode.thrift | 1 +
17 files changed, 131 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6e5390fdc1..d563f9cf24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -243,14 +243,18 @@ public class FragmentInstanceManager {
}
/** Cancels a FragmentInstance. */
- public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
+ public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId, boolean hasThrowable) {
logger.debug("[CancelFI]");
requireNonNull(instanceId, "taskId is null");
FragmentInstanceContext context = instanceContext.remove(instanceId);
if (context != null) {
instanceExecution.remove(instanceId);
- context.cancel();
+ if (hasThrowable) {
+ context.cancel();
+ } else {
+ context.finished();
+ }
return context.getInstanceInfo();
}
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 2a2ae6f0d8..0d44473624 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -208,12 +208,12 @@ public class Coordinator {
return queryIdGenerator.createNextQueryId();
}
- public void cleanupQueryExecution(Long queryId) {
+ public void cleanupQueryExecution(Long queryId, Throwable t) {
IQueryExecution queryExecution = getQueryExecution(queryId);
if (queryExecution != null) {
try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
LOGGER.debug("[CleanUpQuery]]");
- queryExecution.stopAndCleanup();
+ queryExecution.stopAndCleanup(t);
queryExecutionMap.remove(queryId);
if (queryExecution.isQuery()) {
long costTime = queryExecution.getTotalExecutionTime();
@@ -228,6 +228,10 @@ public class Coordinator {
}
}
+ public void cleanupQueryExecution(Long queryId) {
+ cleanupQueryExecution(queryId, null);
+ }
+
public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
getInternalServiceClientManager() {
return SYNC_INTERNAL_SERVICE_CLIENT_MANAGER;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 9e8576a18c..6d6dcec353 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -152,6 +152,7 @@ class ClusterSchemaFetchExecutor {
private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
long queryId = queryIdProvider.get();
+ Throwable t = null;
try {
ExecutionResult executionResult = statementExecutor.apply(queryId, schemaFetchStatement);
if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -170,6 +171,7 @@ class ClusterSchemaFetchExecutor {
try {
tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
} catch (IoTDBException e) {
+ t = e;
throw new RuntimeException("Fetch Schema failed. ", e);
}
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
@@ -183,8 +185,11 @@ class ClusterSchemaFetchExecutor {
result.setDatabases(databaseSet);
return result;
}
+ } catch (Throwable throwable) {
+ t = throwable;
+ throw throwable;
} finally {
- coordinator.cleanupQueryExecution(queryId);
+ coordinator.cleanupQueryExecution(queryId, t);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index 1dae431f10..e1c9a13895 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -31,10 +31,12 @@ public interface IQueryExecution {
void start();
- void stop();
+ void stop(Throwable t);
void stopAndCleanup();
+ void stopAndCleanup(Throwable t);
+
void cancel();
ExecutionResult getStatus();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index e75b8e39c0..94fb69067e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -129,7 +129,7 @@ public class QueryExecution implements IQueryExecution {
private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
asyncInternalServiceClientManager;
- private AtomicBoolean stopped;
+ private final AtomicBoolean stopped;
private long totalExecutionTime;
@@ -179,7 +179,7 @@ public class QueryExecution implements IQueryExecution {
Throwable cause = stateMachine.getFailureException();
releaseResource(cause);
}
- this.stop();
+ this.stop(null);
}
});
this.stopped = new AtomicBoolean(false);
@@ -254,10 +254,10 @@ public class QueryExecution implements IQueryExecution {
partitionFetcher.invalidAllCache();
// clear runtime variables in MPPQueryContext
context.prepareForRetry();
+ // re-stop
+ this.stopped.compareAndSet(true, false);
// re-analyze the query
this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher);
- // re-stop
- this.stopped = new AtomicBoolean(false);
// re-start the QueryExecution
this.start();
return getStatus();
@@ -359,16 +359,16 @@ public class QueryExecution implements IQueryExecution {
}
// Stop the workers for this query
- public void stop() {
+ public void stop(Throwable t) {
// only stop once
if (stopped.compareAndSet(false, true) && this.scheduler != null) {
- this.scheduler.stop();
+ this.scheduler.stop(t);
}
}
// Stop the query and clean up all the resources this query occupied
public void stopAndCleanup() {
- stop();
+ stop(null);
releaseResource();
}
@@ -390,13 +390,13 @@ public class QueryExecution implements IQueryExecution {
// If the QueryExecution's state is abnormal, we should also abort the resultHandle without
// waiting it to be finished.
if (resultHandle != null) {
- resultHandle.abort();
+ resultHandle.close();
}
}
// Stop the query and clean up all the resources this query occupied
public void stopAndCleanup(Throwable t) {
- stop();
+ stop(t);
releaseResource(t);
}
@@ -410,7 +410,11 @@ public class QueryExecution implements IQueryExecution {
// If the QueryExecution's state is abnormal, we should also abort the resultHandle without
// waiting it to be finished.
if (resultHandle != null) {
- resultHandle.abort(t);
+ if (t != null) {
+ resultHandle.abort(t);
+ } else {
+ resultHandle.close();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index 6aca2d96fc..20a4aa9f82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -145,11 +145,14 @@ public class ConfigExecution implements IQueryExecution {
}
@Override
- public void stop() {}
+ public void stop(Throwable t) {}
@Override
public void stopAndCleanup() {}
+ @Override
+ public void stopAndCleanup(Throwable t) {}
+
@Override
public void cancel() {
throw new UnsupportedOperationException(getClass().getName());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index d704726266..86b63408d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -154,7 +154,7 @@ public class ClusterScheduler implements IScheduler {
}
@Override
- public void stop() {
+ public void stop(Throwable t) {
// TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
// practice ?
dispatcher.abort();
@@ -163,7 +163,7 @@ public class ClusterScheduler implements IScheduler {
}
// TODO: (xingtanzjr) handle the exception when the termination cannot succeed
if (queryTerminator != null) {
- queryTerminator.terminate();
+ queryTerminator.terminate(t);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IQueryTerminator.java
index 8726de7dcd..2a3ccdf509 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IQueryTerminator.java
@@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import java.util.concurrent.Future;
public interface IQueryTerminator {
- Future<Boolean> terminate();
+ Future<Boolean> terminate(Throwable t);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IScheduler.java
index 40b9ba8703..9bd8381324 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IScheduler.java
@@ -28,7 +28,7 @@ public interface IScheduler {
void start();
- void stop();
+ void stop(Throwable t);
Duration getTotalCpuTime();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 14bace8dc2..841abf4dea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -78,14 +78,18 @@ public class SimpleQueryTerminator implements IQueryTerminator {
}
@Override
- public Future<Boolean> terminate() {
+ public Future<Boolean> terminate(Throwable t) {
// For the failure dispatch, the termination should not be triggered because of connection issue
this.relatedHost =
this.relatedHost.stream()
.filter(endPoint -> !queryContext.getEndPointBlackList().contains(endPoint))
.collect(Collectors.toList());
+ if (t == null) {
+ return scheduledExecutor.schedule(
+ this::syncTerminate, TERMINATION_GRACE_PERIOD_IN_MS, TimeUnit.MILLISECONDS);
+ }
return scheduledExecutor.schedule(
- this::syncTerminate, TERMINATION_GRACE_PERIOD_IN_MS, TimeUnit.MILLISECONDS);
+ this::syncTerminateThrowable, TERMINATION_GRACE_PERIOD_IN_MS, TimeUnit.MILLISECONDS);
}
public Boolean syncTerminate() {
@@ -99,7 +103,32 @@ public class SimpleQueryTerminator implements IQueryTerminator {
}
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs));
+ client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, false));
+ } catch (ClientManagerException e) {
+ logger.warn("can't connect to node {}", endPoint, e);
+ // we shouldn't return here and need to cancel queryTasks in other nodes
+ succeed = false;
+ } catch (TException t) {
+ logger.warn("cancel query {} on node {} failed.", queryId.getId(), endPoint, t);
+ // we shouldn't return here and need to cancel queryTasks in other nodes
+ succeed = false;
+ }
+ }
+ return succeed;
+ }
+
+ public Boolean syncTerminateThrowable() {
+ boolean succeed = true;
+ for (TEndPoint endPoint : relatedHost) {
+ // we only send cancel query request if there is remaining unfinished FI in that node
+ List<TFragmentInstanceId> unfinishedFIs =
+ stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint));
+ if (unfinishedFIs.isEmpty()) {
+ continue;
+ }
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs, true));
} catch (ClientManagerException e) {
logger.warn("can't connect to node {}", endPoint, e);
// we shouldn't return here and need to cancel queryTasks in other nodes
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index 64784bee4c..e37449055b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -287,7 +287,7 @@ public class LoadTsFileScheduler implements IScheduler {
}
@Override
- public void stop() {}
+ public void stop(Throwable t) {}
@Override
public Duration getTotalCpuTime() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 6c1f310195..e1c31f3231 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -199,6 +199,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
long startTime = System.currentTimeMillis();
StatementType statementType = null;
+ Throwable t = null;
try {
Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId());
@@ -256,8 +257,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
} catch (Exception e) {
finished = true;
+ t = e;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT));
+ } catch (Error error) {
+ t = error;
+ throw error;
} finally {
COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime);
if (finished) {
@@ -268,7 +273,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
statementType,
executionTime > 0 ? executionTime : System.currentTimeMillis() - startTime);
}
- COORDINATOR.cleanupQueryExecution(queryId);
+ COORDINATOR.cleanupQueryExecution(queryId, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
@@ -287,6 +292,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
long startTime = System.currentTimeMillis();
+ Throwable t = null;
try {
Statement s = StatementGenerator.createStatement(req, clientSession.getZoneId());
@@ -336,8 +342,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
} catch (Exception e) {
finished = true;
+ t = e;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY));
+ } catch (Error error) {
+ t = error;
+ throw error;
} finally {
COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime);
if (finished) {
@@ -345,7 +355,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
OperationType.EXECUTE_RAW_DATA_QUERY,
StatementType.QUERY,
COORDINATOR.getTotalExecutionTime(queryId));
- COORDINATOR.cleanupQueryExecution(queryId);
+ COORDINATOR.cleanupQueryExecution(queryId, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
@@ -364,6 +374,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
long startTime = System.currentTimeMillis();
+ Throwable t = null;
try {
Statement s = StatementGenerator.createStatement(req, clientSession.getZoneId());
// permission check
@@ -413,8 +424,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
} catch (Exception e) {
finished = true;
+ t = e;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+ } catch (Error error) {
+ t = error;
+ throw error;
} finally {
COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime);
if (finished) {
@@ -422,7 +437,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
OperationType.EXECUTE_LAST_DATA_QUERY,
StatementType.QUERY,
COORDINATOR.getTotalExecutionTime(queryId));
- COORDINATOR.cleanupQueryExecution(queryId);
+ COORDINATOR.cleanupQueryExecution(queryId, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
@@ -441,6 +456,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
}
long startTime = System.currentTimeMillis();
+ Throwable t = null;
try {
Statement s = StatementGenerator.createStatement(req, clientSession.getZoneId());
// permission check
@@ -487,8 +503,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
} catch (Exception e) {
finished = true;
+ t = e;
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
+ } catch (Error error) {
+ t = error;
+ throw error;
} finally {
COORDINATOR.recordExecutionTime(queryId, System.currentTimeMillis() - startTime);
if (finished) {
@@ -496,7 +516,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
OperationType.EXECUTE_LAST_DATA_QUERY,
StatementType.QUERY,
COORDINATOR.getTotalExecutionTime(queryId));
- COORDINATOR.cleanupQueryExecution(queryId);
+ COORDINATOR.cleanupQueryExecution(queryId, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
@@ -540,6 +560,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
long startTime = System.currentTimeMillis();
boolean finished = false;
StatementType statementType = null;
+ Throwable t = null;
try {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -570,7 +591,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
} catch (Exception e) {
finished = true;
+ t = e;
return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+ } catch (Error error) {
+ t = error;
+ throw error;
} finally {
COORDINATOR.recordExecutionTime(req.queryId, System.currentTimeMillis() - startTime);
if (finished) {
@@ -580,7 +605,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
statementType,
COORDINATOR.getTotalExecutionTime(req.queryId));
}
- COORDINATOR.cleanupQueryExecution(req.queryId);
+ COORDINATOR.cleanupQueryExecution(req.queryId, t);
}
SESSION_MANAGER.updateIdleTime();
}
@@ -1059,6 +1084,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
boolean finished = false;
long startTime = System.currentTimeMillis();
StatementType statementType = null;
+ Throwable t = null;
try {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -1089,7 +1115,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
}
} catch (Exception e) {
finished = true;
+ t = e;
return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS));
+ } catch (Error error) {
+ t = error;
+ throw error;
} finally {
COORDINATOR.recordExecutionTime(req.queryId, System.currentTimeMillis() - startTime);
if (finished) {
@@ -1099,7 +1129,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
statementType,
COORDINATOR.getTotalExecutionTime(req.queryId));
}
- COORDINATOR.cleanupQueryExecution(req.queryId);
+ COORDINATOR.cleanupQueryExecution(req.queryId, t);
}
SESSION_MANAGER.updateIdleTime();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b575bc7543..56b172bfa0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -330,7 +330,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
.map(FragmentInstanceId::fromThrift)
.collect(Collectors.toList());
for (FragmentInstanceId taskId : taskIds) {
- FragmentInstanceManager.getInstance().cancelTask(taskId);
+ FragmentInstanceManager.getInstance().cancelTask(taskId, req.hasThrowable);
}
return new TCancelResp(true);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java
index 74e14f9f3f..cca9d07e97 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/MLNodeRPCServiceImpl.java
@@ -88,7 +88,7 @@ public class MLNodeRPCServiceImpl implements IMLNodeRPCServiceWithHandler {
public TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req) throws TException {
boolean finished = false;
TFetchTimeseriesResp resp = new TFetchTimeseriesResp();
-
+ Throwable t = null;
try {
QueryStatement s =
(QueryStatement) StatementGenerator.createStatement(req, session.getZoneId());
@@ -131,11 +131,15 @@ public class MLNodeRPCServiceImpl implements IMLNodeRPCServiceWithHandler {
}
} catch (Exception e) {
finished = true;
+ t = e;
resp.setStatus(onQueryException(e, OperationType.EXECUTE_STATEMENT));
return resp;
+ } catch (Error error) {
+ t = error;
+ throw error;
} finally {
if (finished) {
- COORDINATOR.cleanupQueryExecution(resp.queryId);
+ COORDINATOR.cleanupQueryExecution(resp.queryId, t);
}
}
}
@@ -144,6 +148,7 @@ public class MLNodeRPCServiceImpl implements IMLNodeRPCServiceWithHandler {
public TFetchMoreDataResp fetchMoreData(TFetchMoreDataReq req) throws TException {
TFetchMoreDataResp resp = new TFetchMoreDataResp();
boolean finished = false;
+ Throwable t = null;
try {
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId);
resp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
@@ -164,11 +169,15 @@ public class MLNodeRPCServiceImpl implements IMLNodeRPCServiceWithHandler {
}
} catch (Exception e) {
finished = true;
+ t = e;
resp.setStatus(onQueryException(e, OperationType.FETCH_RESULTS));
return resp;
+ } catch (Error error) {
+ t = error;
+ throw error;
} finally {
if (finished) {
- COORDINATOR.cleanupQueryExecution(req.queryId);
+ COORDINATOR.cleanupQueryExecution(req.queryId, t);
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index 5da8688345..020207d54d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -153,7 +153,7 @@ public class ConfigExecutionTest {
} catch (InterruptedException e) {
ExecutionResult result = execution.getStatus();
assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
- execution.stop();
+ execution.stop(e);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index 13ce985ae1..c4e1dae0d3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -1656,11 +1656,14 @@ public class MergeSortOperatorTest {
public void start() {}
@Override
- public void stop() {}
+ public void stop(Throwable t) {}
@Override
public void stopAndCleanup() {}
+ @Override
+ public void stopAndCleanup(Throwable t) {}
+
@Override
public void cancel() {}
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 23cc0fa032..c9558af83d 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -144,6 +144,7 @@ struct TFragmentInstanceInfoResp {
struct TCancelQueryReq {
1: required string queryId
2: required list<TFragmentInstanceId> fragmentInstanceIds
+ 3: required bool hasThrowable
}
struct TCancelPlanFragmentReq {