You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/22 15:11:42 UTC
[doris] branch master updated: [Enhancement](export) cancel all running coordinators when execute cancel-export statement. (#15801)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab04a458aa [Enhancement](export) cancel all running coordinators when execute cancel-export statement. (#15801)
ab04a458aa is described below
commit ab04a458aa991b261f5778d220a8610bd724a1bc
Author: Xiangyu Wang <du...@gmail.com>
AuthorDate: Sun Jan 22 23:11:32 2023 +0800
[Enhancement](export) cancel all running coordinators when execute cancel-export statement. (#15801)
---
.../src/main/java/org/apache/doris/load/ExportJob.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 7a8c9cbe19..3b79ca70de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -67,6 +67,7 @@ import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.rewrite.ExprRewriter;
@@ -658,7 +659,21 @@ public class ExportJob implements Writable {
failMsg = new ExportFailMsg(type, msg);
}
if (updateState(ExportJob.JobState.CANCELLED, false)) {
- releaseSnapshotPaths();
+ // cancel all running coordinators, so that the scheduler's worker thread will be released
+ for (Coordinator coordinator : coordList) {
+ Coordinator registeredCoordinator = QeProcessorImpl.INSTANCE.getCoordinator(coordinator.getQueryId());
+ if (registeredCoordinator != null) {
+ registeredCoordinator.cancel();
+ }
+ }
+
+ // release snapshot
+ Status releaseSnapshotStatus = releaseSnapshotPaths();
+ if (!releaseSnapshotStatus.ok()) {
+ // snapshot will be removed by GC thread on BE, finally.
+ LOG.warn("failed to release snapshot for export job: {}. err: {}", id,
+ releaseSnapshotStatus.getErrorMsg());
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org