You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/22 15:11:42 UTC

[doris] branch master updated: [Enhancement](export) cancel all running coordinators when execute cancel-export statement. (#15801)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ab04a458aa [Enhancement](export) cancel all running coordinators when execute cancel-export statement. (#15801)
ab04a458aa is described below

commit ab04a458aa991b261f5778d220a8610bd724a1bc
Author: Xiangyu Wang <du...@gmail.com>
AuthorDate: Sun Jan 22 23:11:32 2023 +0800

    [Enhancement](export) cancel all running coordinators when execute cancel-export statement. (#15801)
---
 .../src/main/java/org/apache/doris/load/ExportJob.java  | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 7a8c9cbe19..3b79ca70de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -67,6 +67,7 @@ import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.SqlModeHelper;
 import org.apache.doris.rewrite.ExprRewriter;
@@ -658,7 +659,21 @@ public class ExportJob implements Writable {
             failMsg = new ExportFailMsg(type, msg);
         }
         if (updateState(ExportJob.JobState.CANCELLED, false)) {
-            releaseSnapshotPaths();
+            // cancel all running coordinators, so that the scheduler's worker thread will be released
+            for (Coordinator coordinator : coordList) {
+                Coordinator registeredCoordinator = QeProcessorImpl.INSTANCE.getCoordinator(coordinator.getQueryId());
+                if (registeredCoordinator != null) {
+                    registeredCoordinator.cancel();
+                }
+            }
+
+            // release snapshot
+            Status releaseSnapshotStatus = releaseSnapshotPaths();
+            if (!releaseSnapshotStatus.ok()) {
+                // the snapshot will eventually be removed by the GC thread on BE.
+                LOG.warn("failed to release snapshot for export job: {}. err: {}", id,
+                        releaseSnapshotStatus.getErrorMsg());
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org