You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/22 15:22:18 UTC

[doris] branch branch-1.2-lts updated (a986c181c2 -> 5e35e74682)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


    from a986c181c2 [improvement](vertical compaction) cache segment in vertical compaction (#16101)
     new 33376b71f0 [Feature](export) Support cancel export statement (#15128)
     new 5e35e74682 [Enhancement](export) cancel all running coordinators when execute cancel-export statement. (#15801)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 fe/fe-core/src/main/cup/sql_parser.cup             |   4 +
 .../{CancelLoadStmt.java => CancelExportStmt.java} |  40 ++---
 .../org/apache/doris/analysis/CancelLoadStmt.java  |   3 +
 .../main/java/org/apache/doris/load/ExportJob.java |  29 +++-
 .../main/java/org/apache/doris/load/ExportMgr.java |  61 ++++++-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   3 +
 .../org/apache/doris/task/ExportExportingTask.java |  26 ++-
 .../doris/analysis/CancelExportStmtTest.java       | 190 +++++++++++++++++++++
 8 files changed, 316 insertions(+), 40 deletions(-)
 copy fe/fe-core/src/main/java/org/apache/doris/analysis/{CancelLoadStmt.java => CancelExportStmt.java} (78%)
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 01/02: [Feature](export) Support cancel export statement (#15128)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 33376b71f00ecca0a1797324c054d0ff32a44c31
Author: wxy <du...@gmail.com>
AuthorDate: Wed Jan 4 14:08:25 2023 +0800

    [Feature](export) Support cancel export statement (#15128)
    
    
    
    Co-authored-by: wangxiangyu@360shuke.com <wa...@360shuke.com>
---
 fe/fe-core/src/main/cup/sql_parser.cup             |   4 +
 .../{CancelLoadStmt.java => CancelExportStmt.java} |  40 ++---
 .../org/apache/doris/analysis/CancelLoadStmt.java  |   3 +
 .../main/java/org/apache/doris/load/ExportJob.java |  14 +-
 .../main/java/org/apache/doris/load/ExportMgr.java |  61 ++++++-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   3 +
 .../org/apache/doris/task/ExportExportingTask.java |  26 ++-
 .../doris/analysis/CancelExportStmtTest.java       | 190 +++++++++++++++++++++
 8 files changed, 301 insertions(+), 40 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 84853c31c9..83d105c734 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -3965,6 +3965,10 @@ cancel_param ::=
     {:
         RESULT = new CancelLoadStmt(db, parser.where);
     :}
+    | KW_EXPORT opt_db:db opt_wild_where
+    {:
+        RESULT = new CancelExportStmt(db, parser.where);
+    :}
     | KW_ALTER KW_TABLE opt_alter_type:type KW_FROM table_name:table cancel_rollup_job_id_list:list
     {:
         RESULT = new CancelAlterTableStmt(type, table, list);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
similarity index 78%
copy from fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
copy to fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
index c08f6370a4..71998ba226 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
@@ -21,7 +21,7 @@ import org.apache.doris.analysis.BinaryPredicate.Operator;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.load.ExportJob;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
@@ -31,11 +31,11 @@ import java.util.Set;
 
 
 /**
- * CANCEL LOAD statement used to cancel load job.
+ * CANCEL EXPORT statement used to cancel export job.
  * syntax:
- *     CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
+ *     CANCEL EXPORT [FROM db] WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EXPORTING"]
  **/
-public class CancelLoadStmt extends DdlStmt {
+public class CancelExportStmt extends DdlStmt {
 
     private static final Set<String> SUPPORT_COLUMNS = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
 
@@ -53,7 +53,7 @@ public class CancelLoadStmt extends DdlStmt {
 
     private Expr whereClause;
 
-    public CancelLoadStmt(String dbName, Expr whereClause) {
+    public CancelExportStmt(String dbName, Expr whereClause) {
         this.dbName = dbName;
         this.whereClause = whereClause;
         this.SUPPORT_COLUMNS.add("label");
@@ -63,34 +63,33 @@ public class CancelLoadStmt extends DdlStmt {
     private void checkColumn(Expr expr, boolean like) throws AnalysisException {
         String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
         if (!SUPPORT_COLUMNS.contains(inputCol)) {
-            throw new AnalysisException("Current not support " + inputCol);
+            throw new AnalysisException("Current only support label and state, invalid column: " + inputCol);
         }
         if (!(expr.getChild(1) instanceof StringLiteral)) {
-            throw new AnalysisException("Value must is string");
+            throw new AnalysisException("Value must be a string");
         }
 
         String inputValue = expr.getChild(1).getStringValue();
         if (Strings.isNullOrEmpty(inputValue)) {
-            throw new AnalysisException("Value can't is null");
-        }
-        if (like && !inputValue.contains("%")) {
-            inputValue = "%" + inputValue + "%";
+            throw new AnalysisException("Value can't be null");
         }
+
         if (inputCol.equalsIgnoreCase("label")) {
             label = inputValue;
         }
+
         if (inputCol.equalsIgnoreCase("state")) {
             if (like) {
                 throw new AnalysisException("Only label can use like");
             }
             state = inputValue;
             try {
-                JobState jobState = JobState.valueOf(state);
-                if (jobState != JobState.PENDING && jobState != JobState.ETL && jobState != JobState.LOADING) {
-                    throw new AnalysisException("invalid state: " + state);
+                ExportJob.JobState jobState = ExportJob.JobState.valueOf(state);
+                if (jobState != ExportJob.JobState.PENDING && jobState != ExportJob.JobState.EXPORTING) {
+                    throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state);
                 }
             } catch (IllegalArgumentException e) {
-                throw new AnalysisException("invalid state: " + state);
+                throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state);
             }
         }
     }
@@ -118,15 +117,18 @@ public class CancelLoadStmt extends DdlStmt {
 
     private void compoundCheck(Expr expr) throws AnalysisException {
         if (expr == null) {
-            throw new AnalysisException("Where clause can't is null");
+            throw new AnalysisException("Where clause can't be null");
         }
         if (expr instanceof CompoundPredicate) {
             // current only support label and state
             CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+            if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) {
+                throw new AnalysisException("Current not support NOT operator");
+            }
             for (int i = 0; i < 2; i++) {
                 Expr child = compoundPredicate.getChild(i);
                 if (child instanceof CompoundPredicate) {
-                    throw new AnalysisException("Current only support label and state");
+                    throw new AnalysisException("Current not support nested clause");
                 }
                 likeCheck(child);
                 binaryCheck(child);
@@ -147,8 +149,6 @@ public class CancelLoadStmt extends DdlStmt {
             dbName = ClusterNamespace.getFullName(getClusterName(), dbName);
         }
 
-        // check auth after we get real load job
-        // analyze expr
         likeCheck(whereClause);
         binaryCheck(whereClause);
         compoundCheck(whereClause);
@@ -157,7 +157,7 @@ public class CancelLoadStmt extends DdlStmt {
     @Override
     public String toSql() {
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("CANCEL LOAD ");
+        stringBuilder.append("CANCEL EXPORT ");
         if (!Strings.isNullOrEmpty(dbName)) {
             stringBuilder.append("FROM ").append(dbName);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
index c08f6370a4..2ca9970677 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
@@ -123,6 +123,9 @@ public class CancelLoadStmt extends DdlStmt {
         if (expr instanceof CompoundPredicate) {
             // current only support label and state
             CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+            if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) {
+                throw new AnalysisException("Current not support NOT operator");
+            }
             for (int i = 0; i < 2; i++) {
                 Expr child = compoundPredicate.getChild(i);
                 if (child instanceof CompoundPredicate) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 00057dfd67..7f681548df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -521,7 +521,7 @@ public class ExportJob implements Writable {
         return whereExpr;
     }
 
-    public JobState getState() {
+    public synchronized JobState getState() {
         return state;
     }
 
@@ -651,11 +651,12 @@ public class ExportJob implements Writable {
     }
 
     public synchronized void cancel(ExportFailMsg.CancelType type, String msg) {
-        releaseSnapshotPaths();
         if (msg != null) {
             failMsg = new ExportFailMsg(type, msg);
         }
-        updateState(ExportJob.JobState.CANCELLED, false);
+        if (updateState(ExportJob.JobState.CANCELLED, false)) {
+            releaseSnapshotPaths();
+        }
     }
 
     public synchronized boolean updateState(ExportJob.JobState newState) {
@@ -663,6 +664,9 @@ public class ExportJob implements Writable {
     }
 
     public synchronized boolean updateState(ExportJob.JobState newState, boolean isReplay) {
+        if (isFinalState()) {
+            return false;
+        }
         state = newState;
         switch (newState) {
             case PENDING:
@@ -686,6 +690,10 @@ public class ExportJob implements Writable {
         return true;
     }
 
+    public synchronized boolean isFinalState() {
+        return this.state == ExportJob.JobState.CANCELLED || this.state == ExportJob.JobState.FINISHED;
+    }
+
     public Status releaseSnapshotPaths() {
         List<Pair<TNetworkAddress, String>> snapshotPaths = getSnapshotPaths();
         LOG.debug("snapshotPaths:{}", snapshotPaths);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index b332f7ed25..cdee254f8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.load;
 
+import org.apache.doris.analysis.CancelExportStmt;
+import org.apache.doris.analysis.CompoundPredicate;
 import org.apache.doris.analysis.ExportStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Database;
@@ -24,6 +26,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.PatternMatcher;
@@ -33,10 +36,12 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -48,6 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 public class ExportMgr {
@@ -99,11 +105,62 @@ public class ExportMgr {
         LOG.info("add export job. {}", job);
     }
 
+    public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException {
+        // List of export jobs waiting to be cancelled
+        List<ExportJob> matchExportJobs = getWaitingCancelJobs(stmt);
+        if (matchExportJobs.isEmpty()) {
+            throw new DdlException("Export job(s) do not exist");
+        }
+        matchExportJobs = matchExportJobs.stream()
+                .filter(job -> !job.isFinalState()).collect(Collectors.toList());
+        if (matchExportJobs.isEmpty()) {
+            throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)");
+        }
+        for (ExportJob exportJob : matchExportJobs) {
+            exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
+        }
+    }
+
     public void unprotectAddJob(ExportJob job) {
         idToJob.put(job.getId(), job);
         labelToJobId.putIfAbsent(job.getLabel(), job.getId());
     }
 
+    private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException {
+        Predicate<ExportJob> jobFilter = buildCancelJobFilter(stmt);
+        readLock();
+        try {
+            return getJobs().stream().filter(jobFilter).collect(Collectors.toList());
+        } finally {
+            readUnlock();
+        }
+    }
+
+    @VisibleForTesting
+    public static Predicate<ExportJob> buildCancelJobFilter(CancelExportStmt stmt) throws AnalysisException {
+        String label = stmt.getLabel();
+        String state = stmt.getState();
+        PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
+
+        return job -> {
+            boolean labelFilter = true;
+            boolean stateFilter = true;
+            if (StringUtils.isNotEmpty(label)) {
+                labelFilter = label.contains("%") ? matcher.match(job.getLabel()) :
+                        job.getLabel().equalsIgnoreCase(label);
+            }
+            if (StringUtils.isNotEmpty(state)) {
+                stateFilter = job.getState().name().equalsIgnoreCase(state);
+            }
+
+            if (stmt.getOperator() != null && CompoundPredicate.Operator.OR.equals(stmt.getOperator())) {
+                return labelFilter || stateFilter;
+            }
+
+            return labelFilter && stateFilter;
+        };
+    }
+
     private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception {
         ExportJob job = new ExportJob(jobId);
         job.setJob(stmt);
@@ -294,12 +351,12 @@ public class ExportMgr {
     }
 
     public void replayUpdateJobState(long jobId, ExportJob.JobState newState) {
-        writeLock();
+        readLock();
         try {
             ExportJob job = idToJob.get(jobId);
             job.updateState(newState, true);
         } finally {
-            writeUnlock();
+            readUnlock();
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index d48f8fdfdf..fa907027aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -48,6 +48,7 @@ import org.apache.doris.analysis.BackupStmt;
 import org.apache.doris.analysis.CancelAlterSystemStmt;
 import org.apache.doris.analysis.CancelAlterTableStmt;
 import org.apache.doris.analysis.CancelBackupStmt;
+import org.apache.doris.analysis.CancelExportStmt;
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.CleanLabelStmt;
 import org.apache.doris.analysis.CreateCatalogStmt;
@@ -188,6 +189,8 @@ public class DdlExecutor {
             } else {
                 env.getLoadManager().createLoadJobFromStmt(loadStmt);
             }
+        } else if (ddlStmt instanceof CancelExportStmt) {
+            env.getExportMgr().cancelExportJob((CancelExportStmt) ddlStmt);
         } else if (ddlStmt instanceof CancelLoadStmt) {
             env.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
         } else if (ddlStmt instanceof CreateRoutineLoadStmt) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
index 4f8084a272..8066e280a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -161,18 +161,17 @@ public class ExportExportingTask extends MasterTask {
             }
         }
 
-        // release snapshot
-        Status releaseSnapshotStatus = job.releaseSnapshotPaths();
-        if (!releaseSnapshotStatus.ok()) {
-            // even if release snapshot failed, do nothing cancel this job.
-            // snapshot will be removed by GC thread on BE, finally.
-            LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(),
-                    releaseSnapshotStatus.getErrorMsg());
-        }
-
         if (job.updateState(ExportJob.JobState.FINISHED)) {
             LOG.warn("export job success. job: {}", job);
             registerProfile();
+            // release snapshot
+            Status releaseSnapshotStatus = job.releaseSnapshotPaths();
+            if (!releaseSnapshotStatus.ok()) {
+                // even if release snapshot failed, do not cancel this job.
+                // snapshot will be removed by GC thread on BE, finally.
+                LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(),
+                        releaseSnapshotStatus.getErrorMsg());
+            }
         }
 
         synchronized (this) {
@@ -336,12 +335,9 @@ public class ExportExportingTask extends MasterTask {
             }
         }
 
-        if (!failed) {
-            exportedFiles.clear();
-            job.addExportedFiles(newFiles);
-            ClientPool.brokerPool.returnObject(address, client);
-        }
-
+        exportedFiles.clear();
+        job.addExportedFiles(newFiles);
+        ClientPool.brokerPool.returnObject(address, client);
         return Status.OK;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
new file mode 100644
index 0000000000..30be49e031
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.analysis.CompoundPredicate.Operator;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportMgr;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.wildfly.common.Assert;
+
+import java.util.List;
+import java.util.function.Predicate;
+
+public class CancelExportStmtTest extends TestWithFeService {
+
+    private Analyzer analyzer;
+    private String dbName = "testDb";
+    private String tblName = "table1";
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        FeConstants.runningUnitTest = true;
+        createDatabase(dbName);
+        useDatabase(dbName);
+        createTable("create table " + tblName + "\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+                + "properties(\"replication_num\" = \"1\");");
+        analyzer = new Analyzer(connectContext.getEnv(), connectContext);
+    }
+
+    @Test
+    public void testNormal() throws UserException {
+        SlotRef labelSlotRef = new SlotRef(null, "label");
+        StringLiteral labelStringLiteral = new StringLiteral("doris_test_label");
+
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+
+        BinaryPredicate labelBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef,
+                labelStringLiteral);
+        CancelExportStmt stmt = new CancelExportStmt(null, labelBinaryPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label'",
+                stmt.toString());
+
+        SlotRef labelSlotRefUpper = new SlotRef(null, "LABEL");
+        BinaryPredicate labelBinaryPredicateUpper = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRefUpper,
+                labelStringLiteral);
+        CancelExportStmt stmtUpper = new CancelExportStmt(null, labelBinaryPredicateUpper);
+        stmtUpper.analyze(analyzer);
+        Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `LABEL` = 'doris_test_label'",
+                stmtUpper.toString());
+
+        StringLiteral stateStringLiteral = new StringLiteral("PENDING");
+        BinaryPredicate stateBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef,
+                stateStringLiteral);
+        stmt = new CancelExportStmt(null, stateBinaryPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `state` = 'PENDING'", stmt.toString());
+
+        LikePredicate labelLikePredicate = new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef,
+                labelStringLiteral);
+        stmt = new CancelExportStmt(null, labelLikePredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'",
+                stmt.toString());
+
+        CompoundPredicate compoundAndPredicate = new CompoundPredicate(Operator.AND, labelBinaryPredicate,
+                stateBinaryPredicate);
+        stmt = new CancelExportStmt(null, compoundAndPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals(
+                "CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'PENDING'",
+                stmt.toString());
+
+        CompoundPredicate compoundOrPredicate = new CompoundPredicate(Operator.OR, labelBinaryPredicate,
+                stateBinaryPredicate);
+        stmt = new CancelExportStmt(null, compoundOrPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals(
+                "CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label' OR `state` = 'PENDING'",
+                stmt.toString());
+    }
+
+    @Test
+    public void testError1() {
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+        StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
+
+        LikePredicate stateLikePredicate =
+                new LikePredicate(LikePredicate.Operator.LIKE, stateSlotRef, stateStringLiteral);
+        CancelExportStmt stmt = new CancelExportStmt(null, stateLikePredicate);
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Only label can use like",
+                () -> stmt.analyze(analyzer));
+    }
+
+    @Test
+    public void testError2() {
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+        StringLiteral stateStringLiteral1 = new StringLiteral("EXPORTING");
+        BinaryPredicate stateEqPredicate1 =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral1);
+
+        StringLiteral stateStringLiteral2 = new StringLiteral("PENDING");
+        BinaryPredicate stateEqPredicate2 =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral2);
+
+        SlotRef labelSlotRef = new SlotRef(null, "label");
+        StringLiteral labelStringLiteral1 = new StringLiteral("test_label");
+        BinaryPredicate labelEqPredicate1 =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral1);
+
+        CompoundPredicate compoundAndPredicate1 = new CompoundPredicate(Operator.AND, stateEqPredicate1,
+                stateEqPredicate2);
+        CompoundPredicate compoundAndPredicate2 = new CompoundPredicate(Operator.AND, compoundAndPredicate1,
+                labelEqPredicate1);
+        CompoundPredicate compoundAndPredicate3 = new CompoundPredicate(Operator.NOT, stateEqPredicate1, null);
+
+
+        CancelExportStmt stmt1 = new CancelExportStmt(null, compoundAndPredicate2);
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current not support nested clause",
+                () -> stmt1.analyze(analyzer));
+
+
+        CancelExportStmt stmt2 = new CancelExportStmt(null, compoundAndPredicate3);
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current not support NOT operator",
+                () -> stmt2.analyze(analyzer));
+    }
+
+    @Test
+    public void testCancelJobFilter() throws UserException {
+        List<ExportJob> exportJobList1 = Lists.newLinkedList();
+        List<ExportJob> exportJobList2 = Lists.newLinkedList();
+        ExportJob job1 = new ExportJob();
+        ExportJob job2 = new ExportJob();
+        job2.updateState(ExportJob.JobState.CANCELLED, true);
+        ExportJob job3 = new ExportJob();
+        job3.updateState(ExportJob.JobState.EXPORTING, true);
+        ExportJob job4 = new ExportJob();
+        exportJobList1.add(job1);
+        exportJobList1.add(job2);
+        exportJobList1.add(job3);
+        exportJobList1.add(job4);
+        exportJobList2.add(job1);
+        exportJobList2.add(job2);
+
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+        StringLiteral stateStringLiteral = new StringLiteral("PENDING");
+        BinaryPredicate stateEqPredicate =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
+        CancelExportStmt stmt = new CancelExportStmt(null, stateEqPredicate);
+        stmt.analyze(analyzer);
+        Predicate<ExportJob> filter = ExportMgr.buildCancelJobFilter(stmt);
+
+        Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 2);
+        Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1);
+
+        stateStringLiteral = new StringLiteral("EXPORTING");
+        stateEqPredicate =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
+        stmt = new CancelExportStmt(null, stateEqPredicate);
+        stmt.analyze(analyzer);
+        filter = ExportMgr.buildCancelJobFilter(stmt);
+
+        Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1);
+
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 02/02: [Enhancement](export) cancel all running coordinators when execute cancel-export statement. (#15801)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5e35e7468261e7253b288d4cd6fbff2878c79018
Author: Xiangyu Wang <du...@gmail.com>
AuthorDate: Sun Jan 22 23:11:32 2023 +0800

    [Enhancement](export) cancel all running coordinators when execute cancel-export statement. (#15801)
---
 .../src/main/java/org/apache/doris/load/ExportJob.java  | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 7f681548df..7c68e24689 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -66,6 +66,7 @@ import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.SqlModeHelper;
 import org.apache.doris.rewrite.ExprRewriter;
@@ -655,7 +656,21 @@ public class ExportJob implements Writable {
             failMsg = new ExportFailMsg(type, msg);
         }
         if (updateState(ExportJob.JobState.CANCELLED, false)) {
-            releaseSnapshotPaths();
+            // cancel all running coordinators, so that the scheduler's worker thread will be released
+            for (Coordinator coordinator : coordList) {
+                Coordinator registeredCoordinator = QeProcessorImpl.INSTANCE.getCoordinator(coordinator.getQueryId());
+                if (registeredCoordinator != null) {
+                    registeredCoordinator.cancel();
+                }
+            }
+
+            // release snapshot
+            Status releaseSnapshotStatus = releaseSnapshotPaths();
+            if (!releaseSnapshotStatus.ok()) {
+                // snapshot will be removed by GC thread on BE, finally.
+                LOG.warn("failed to release snapshot for export job: {}. err: {}", id,
+                        releaseSnapshotStatus.getErrorMsg());
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org