Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/19 08:38:03 UTC

[incubator-doris] branch master updated: [Feature] cancel load support state (#9537)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cbc7b167b1 [Feature] cancel load support state (#9537)
cbc7b167b1 is described below

commit cbc7b167b1712bd10d94b550e86aeaa358663ccc
Author: Stalary <st...@163.com>
AuthorDate: Thu May 19 16:37:56 2022 +0800

    [Feature] cancel load support state (#9537)
---
 .../org/apache/doris/analysis/CancelLoadStmt.java  | 165 +++++++-------
 .../org/apache/doris/common/CaseSensibility.java   |   3 +
 .../src/main/java/org/apache/doris/load/Load.java  | 178 ---------------
 .../org/apache/doris/load/loadv2/LoadManager.java  | 246 +++++++++++++--------
 .../main/java/org/apache/doris/qe/DdlExecutor.java |  13 +-
 .../apache/doris/analysis/CancelLoadStmtTest.java  | 165 ++++++++------
 6 files changed, 343 insertions(+), 427 deletions(-)
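
In short, after this change the WHERE clause of CANCEL LOAD can filter on state as well as label: label accepts = and LIKE, state accepts = only, and the two predicates can be combined with AND or OR (see CancelLoadStmt below). The old v1 cancel path in Load.java is removed, and DdlExecutor now always routes to LoadManager.cancelLoadJob. The following sketch is not part of the commit; it mirrors CancelLoadStmtTest further down, and the analyzer is assumed to come from a prepared ConnectContext:

    import org.apache.doris.analysis.Analyzer;
    import org.apache.doris.analysis.BinaryPredicate;
    import org.apache.doris.analysis.CancelLoadStmt;
    import org.apache.doris.analysis.CompoundPredicate;
    import org.apache.doris.analysis.SlotRef;
    import org.apache.doris.analysis.StringLiteral;

    public class CancelLoadExample {
        // Builds the equivalent of:
        //   CANCEL LOAD WHERE LABEL = "my_label" AND STATE = "PENDING"
        static CancelLoadStmt buildAndAnalyze(Analyzer analyzer) throws Exception {
            BinaryPredicate labelEq = new BinaryPredicate(BinaryPredicate.Operator.EQ,
                    new SlotRef(null, "label"), new StringLiteral("my_label"));
            BinaryPredicate stateEq = new BinaryPredicate(BinaryPredicate.Operator.EQ,
                    new SlotRef(null, "state"), new StringLiteral("PENDING"));
            CompoundPredicate where =
                    new CompoundPredicate(CompoundPredicate.Operator.AND, labelEq, stateEq);
            // null db name: the session's default database is used during analyze()
            CancelLoadStmt stmt = new CancelLoadStmt(null, where);
            stmt.analyze(analyzer);
            return stmt;
        }
    }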

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
index 545ed9d187..7af21948b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
@@ -23,35 +23,105 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import lombok.Getter;
 
-// CANCEL LOAD statement used to cancel load job.
-//
-// syntax:
-//      CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
+import java.util.List;
+
+
+/**
+ * CANCEL LOAD statement, used to cancel a load job.
+ * syntax:
+ *     CANCEL LOAD [FROM db] WHERE [LABEL = "xxx" | LABEL LIKE "xxx"] [AND | OR] [STATE = "xxx"]
+ **/
 public class CancelLoadStmt extends DdlStmt {
 
+    private static final List<String> SUPPORT_COLUMNS = Lists.newArrayList("label", "state");
+
+    @Getter
     private String dbName;
+
+    @Getter
+    private CompoundPredicate.Operator operator;
+
+    @Getter
     private String label;
 
+    @Getter
+    private String state;
+
     private Expr whereClause;
-    private boolean isAccurateMatch;
 
-    public String getDbName() {
-        return dbName;
+    public CancelLoadStmt(String dbName, Expr whereClause) {
+        this.dbName = dbName;
+        this.whereClause = whereClause;
     }
 
-    public String getLabel() {
-        return label;
+    private void checkColumn(Expr expr, boolean like) throws AnalysisException {
+        String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
+        if (!SUPPORT_COLUMNS.contains(inputCol)) {
+            throw new AnalysisException("Current not support " + inputCol);
+        }
+        if (!(expr.getChild(1) instanceof StringLiteral)) {
+            throw new AnalysisException("Value must is string");
+        }
+
+        String inputValue = expr.getChild(1).getStringValue();
+        if (Strings.isNullOrEmpty(inputValue)) {
+            throw new AnalysisException("Value can't is null");
+        }
+        if (like && !inputValue.contains("%")) {
+            inputValue = "%" + inputValue + "%";
+        }
+        if (inputCol.equalsIgnoreCase("label")) {
+            label = inputValue;
+        }
+        if (inputCol.equalsIgnoreCase("state")) {
+            if (like) {
+                throw new AnalysisException("Only label can use like");
+            }
+            state = inputValue;
+        }
     }
 
-    public CancelLoadStmt(String dbName, Expr whereClause) {
-        this.dbName = dbName;
-        this.whereClause = whereClause;
-        this.isAccurateMatch = false;
+    private void likeCheck(Expr expr) throws AnalysisException {
+        if (expr instanceof LikePredicate) {
+            LikePredicate likePredicate = (LikePredicate) expr;
+            boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp());
+            if (!like) {
+                throw new AnalysisException("Not support REGEXP");
+            }
+            checkColumn(expr, true);
+        }
     }
 
-    public boolean isAccurateMatch() {
-        return isAccurateMatch;
+    private void binaryCheck(Expr expr) throws AnalysisException {
+        if (expr instanceof BinaryPredicate) {
+            BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
+            if (!Operator.EQ.equals(binaryPredicate.getOp())) {
+                throw new AnalysisException("Only support equal or like");
+            }
+            checkColumn(expr, false);
+        }
+    }
+
+    private void compoundCheck(Expr expr) throws AnalysisException {
+        if (expr == null) {
+            throw new AnalysisException("Where clause can't is null");
+        }
+        if (expr instanceof CompoundPredicate) {
+            // currently only label and state columns are supported
+            CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+            for (int i = 0; i < 2; i++) {
+                Expr child = compoundPredicate.getChild(i);
+                if (child instanceof CompoundPredicate) {
+                    throw new AnalysisException("Current only support label and state");
+                }
+                likeCheck(child);
+                binaryCheck(child);
+            }
+            operator = compoundPredicate.getOp();
+        }
     }
 
     @Override
@@ -67,63 +137,10 @@ public class CancelLoadStmt extends DdlStmt {
         }
 
         // check auth after we get real load job
-
-        // analyze expr if not null
-        boolean valid = true;
-        do {
-            if (whereClause == null) {
-                valid = false;
-                break;
-            }
-
-            if (whereClause instanceof BinaryPredicate) {
-                BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause;
-                isAccurateMatch = true;
-                if (binaryPredicate.getOp() != Operator.EQ) {
-                    valid = false;
-                    break;
-                }
-            } else if (whereClause instanceof LikePredicate) {
-                LikePredicate likePredicate = (LikePredicate) whereClause;
-                if (likePredicate.getOp() != LikePredicate.Operator.LIKE) {
-                    valid = false;
-                    break;
-                }
-            } else {
-                valid = false;
-                break;
-            }
-
-            // left child
-            if (!(whereClause.getChild(0) instanceof SlotRef)) {
-                valid = false;
-                break;
-            }
-            if (!((SlotRef) whereClause.getChild(0)).getColumnName().equalsIgnoreCase("label")) {
-                valid = false;
-                break;
-            }
-
-            // right child
-            if (!(whereClause.getChild(1) instanceof StringLiteral)) {
-                valid = false;
-                break;
-            }
-
-            label = ((StringLiteral) whereClause.getChild(1)).getStringValue();
-            if (Strings.isNullOrEmpty(label)) {
-                valid = false;
-                break;
-            }
-            if (!isAccurateMatch && !label.contains("%")) {
-                label = "%" + label + "%";
-            }
-        } while (false);
-
-        if (!valid) {
-            throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"," +
-                    " or LABEL LIKE \"matcher\"");
-        }
+        // analyze expr
+        likeCheck(whereClause);
+        binaryCheck(whereClause);
+        compoundCheck(whereClause);
     }
 
     @Override
@@ -131,11 +148,11 @@ public class CancelLoadStmt extends DdlStmt {
         StringBuilder stringBuilder = new StringBuilder();
         stringBuilder.append("CANCEL LOAD ");
         if (!Strings.isNullOrEmpty(dbName)) {
-            stringBuilder.append("FROM " + dbName);
+            stringBuilder.append("FROM ").append(dbName);
         }
 
         if (whereClause != null) {
-            stringBuilder.append(" WHERE " + whereClause.toSql());
+            stringBuilder.append(" WHERE ").append(whereClause.toSql());
         }
         return stringBuilder.toString();
     }
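
For reference, the reworked analyze() above enforces: the WHERE clause must be non-null; a bare BinaryPredicate must use =; a bare LikePredicate must use LIKE (REGEXP is rejected) and may only target label; and a CompoundPredicate may combine exactly those two kinds of children, with nested compounds rejected. A hedged sketch of shapes that pass and fail, not part of the commit (the analyzer is again assumed to come from a prepared ConnectContext):

    import org.apache.doris.analysis.Analyzer;
    import org.apache.doris.analysis.CancelLoadStmt;
    import org.apache.doris.analysis.LikePredicate;
    import org.apache.doris.analysis.SlotRef;
    import org.apache.doris.analysis.StringLiteral;
    import org.apache.doris.common.AnalysisException;

    public class CancelLoadWhereShapes {
        static void demo(Analyzer analyzer) throws Exception {
            // Accepted: LABEL LIKE "..." (a value without '%' is wrapped as %value%)
            LikePredicate labelLike = new LikePredicate(LikePredicate.Operator.LIKE,
                    new SlotRef(null, "label"), new StringLiteral("doris_test"));
            new CancelLoadStmt(null, labelLike).analyze(analyzer);

            // Rejected: state only supports equality, so STATE LIKE "..." throws
            LikePredicate stateLike = new LikePredicate(LikePredicate.Operator.LIKE,
                    new SlotRef(null, "state"), new StringLiteral("FINISHED"));
            try {
                new CancelLoadStmt(null, stateLike).analyze(analyzer);
            } catch (AnalysisException expected) {
                // "Only label can use like"
            }
        }
    }
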
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java
index ff4ebdf775..651581a3c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.common;
 
+/**
+ * CaseSensibility Enum.
+ **/
 public enum CaseSensibility {
     CLUSTER(true),
     DATABASE(true),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 81e500968e..bec97bc58f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -1728,184 +1728,6 @@ public class Load {
         return false;
     }
 
-    public boolean isLabelExist(String dbName, String labelValue, boolean isAccurateMatch) throws DdlException, AnalysisException {
-        // get load job and check state
-        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
-        readLock();
-        try {
-            Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
-            if (labelToLoadJobs == null) {
-                return false;
-            }
-            List<LoadJob> loadJobs = Lists.newArrayList();
-            if (isAccurateMatch) {
-                if (labelToLoadJobs.containsKey(labelValue)) {
-                    loadJobs.addAll(labelToLoadJobs.get(labelValue));
-                }
-            } else {
-                PatternMatcher matcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
-                for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
-                    if (matcher.match(entry.getKey())) {
-                        loadJobs.addAll(entry.getValue());
-                    }
-                }
-            }
-            if (loadJobs.isEmpty()) {
-                return false;
-            }
-            if (loadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).count() == 0) {
-                return false;
-            }
-            return true;
-        } finally {
-            readUnlock();
-        }
-    }
-
-    public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException {
-        // get params
-        String dbName = stmt.getDbName();
-        String label = stmt.getLabel();
-
-        // get load job and check state
-        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
-        // List of load jobs waiting to be cancelled
-        List<LoadJob> loadJobs = Lists.newArrayList();
-        readLock();
-        try {
-            Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
-            if (labelToLoadJobs == null) {
-                throw new DdlException("Load job does not exist");
-            }
-
-            // get jobs by label
-            List<LoadJob> matchLoadJobs = Lists.newArrayList();
-            if (isAccurateMatch) {
-                if (labelToLoadJobs.containsKey(label)) {
-                    matchLoadJobs.addAll(labelToLoadJobs.get(label));
-                }
-            } else {
-                PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
-                for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
-                    if (matcher.match(entry.getKey())) {
-                        loadJobs.addAll(entry.getValue());
-                    }
-                }
-            }
-
-            if (matchLoadJobs.isEmpty()) {
-                throw new DdlException("Load job does not exist");
-            }
-
-            // check state here
-            List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(job -> {
-                JobState state = job.getState();
-                return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED;
-            }).collect(Collectors.toList());
-            if (uncompletedLoadJob.isEmpty()) {
-                throw new DdlException("There is no uncompleted job which label " +
-                        (isAccurateMatch ? "is " : "like ") + stmt.getLabel());
-            }
-            loadJobs.addAll(uncompletedLoadJob);
-        } finally {
-            readUnlock();
-        }
-
-        // check auth here, cause we need table info
-        Set<String> tableNames = Sets.newHashSet();
-        for (LoadJob loadJob : loadJobs) {
-            tableNames.addAll(loadJob.getTableNames());
-        }
-
-        if (tableNames.isEmpty()) {
-            if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
-                    PrivPredicate.LOAD)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
-            }
-        } else {
-            for (String tblName : tableNames) {
-                if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
-                        PrivPredicate.LOAD)) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
-                            ConnectContext.get().getQualifiedUser(),
-                            ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
-                }
-            }
-        }
-
-        // cancel job
-        for (LoadJob loadJob : loadJobs) {
-            List<String> failedMsg = Lists.newArrayList();
-            boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user cancel", failedMsg);
-            if (!ok) {
-                throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
-                        "label=[" + loadJob.getLabel() + "] failed msg=" +
-                        (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
-            }
-        }
-
-        return true;
-    }
-
-    public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
-        // get params
-        String dbName = stmt.getDbName();
-        String label = stmt.getLabel();
-
-        // get load job and check state
-        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
-        LoadJob job;
-        readLock();
-        try {
-            Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
-            if (labelToLoadJobs == null) {
-                throw new DdlException("Load job does not exist");
-            }
-
-            List<LoadJob> loadJobs = labelToLoadJobs.get(label);
-            if (loadJobs == null) {
-                throw new DdlException("Load job does not exist");
-            }
-            // only the last one should be running
-            job = loadJobs.get(loadJobs.size() - 1);
-            JobState state = job.getState();
-            if (state == JobState.CANCELLED) {
-                throw new DdlException("Load job has been cancelled");
-            } else if (state == JobState.QUORUM_FINISHED || state == JobState.FINISHED) {
-                throw new DdlException("Load job has been finished");
-            }
-        } finally {
-            readUnlock();
-        }
-
-        // check auth here, cause we need table info
-        Set<String> tableNames = job.getTableNames();
-        if (tableNames.isEmpty()) {
-            // forward compatibility
-            if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
-                    PrivPredicate.LOAD)) {
-                ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
-            }
-        } else {
-            for (String tblName : tableNames) {
-                if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
-                        PrivPredicate.LOAD)) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
-                            ConnectContext.get().getQualifiedUser(),
-                            ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
-                }
-            }
-        }
-
-        // cancel job
-        List<String> failedMsg = Lists.newArrayList();
-        if (!cancelLoadJob(job, CancelType.USER_CANCEL, "user cancel", failedMsg)) {
-            throw new DdlException("Cancel load job fail: " + (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
-        }
-
-        return true;
-    }
-
     public boolean cancelLoadJob(LoadJob job, CancelType cancelType, String msg) {
         return cancelLoadJob(job, cancelType, msg, null);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 4ac2a3550f..5d54451ee8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -18,6 +18,7 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.doris.analysis.CancelLoadStmt;
+import org.apache.doris.analysis.CompoundPredicate.Operator;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
@@ -46,10 +47,12 @@ import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -71,11 +74,10 @@ import java.util.stream.Collectors;
 
 /**
  * The broker and mini load jobs(v2) are included in this class.
- *
  * The lock sequence:
  * Database.lock
- *   LoadManager.lock
- *     LoadJob.lock
+ * LoadManager.lock
+ * LoadJob.lock
  */
 public class LoadManager implements Writable {
     private static final Logger LOG = LogManager.getLogger(LoadManager.class);
@@ -92,8 +94,6 @@ public class LoadManager implements Writable {
 
     /**
      * This method will be invoked by the broker load(v2) now.
-     * @param stmt
-     * @throws DdlException
      */
     public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
         Database database = checkDb(stmt.getLabel().getDbName());
@@ -112,8 +112,10 @@ public class LoadManager implements Writable {
                     throw new DdlException("LoadManager only support the broker and spark load.");
                 }
                 if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
-                    throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " unfinished load jobs, "
-                            + "please retry later. You can use `SHOW LOAD` to view submitted jobs");
+                    throw new DdlException(
+                            "There are more than " + Config.desired_max_waiting_jobs
+                                    + " unfinished load jobs, please retry later. "
+                                    + "You can use `SHOW LOAD` to view submitted jobs");
                 }
             }
 
@@ -139,9 +141,6 @@ public class LoadManager implements Writable {
      * This method will be invoked by streaming mini load.
      * It will begin the txn of mini load immediately without any scheduler .
      *
-     * @param request
-     * @return
-     * @throws UserException
      */
     public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws UserException {
         String cluster = SystemInfoService.DEFAULT_CLUSTER;
@@ -155,7 +154,8 @@ public class LoadManager implements Writable {
         try {
             loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
             // call unprotectedExecute before adding load job. so that if job is not started ok, no need to add.
-            // NOTICE(cmy): this order is only for Mini Load, because mini load's unprotectedExecute() only do beginTxn().
+            // NOTICE(cmy): this order is only for Mini Load, because mini load's
+            // unprotectedExecute() only do beginTxn().
             // for other kind of load job, execute the job after adding job.
             // Mini load job must be executed before release write lock.
             // Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun.
@@ -164,7 +164,8 @@ public class LoadManager implements Writable {
             createLoadJob(loadJob);
         } catch (DuplicatedRequestException e) {
             // this is a duplicate request, just return previous txn id
-            LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(), e.getTxnId());
+            LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
+                    e.getTxnId());
             return e.getTxnId();
         } catch (UserException e) {
             if (loadJob != null) {
@@ -190,14 +191,12 @@ public class LoadManager implements Writable {
      * Step1: lock the load manager
      * Step2: check the label in load manager
      * Step3: call the addLoadJob of load class
-     *     Step3.1: lock the load
-     *     Step3.2: check the label in load
-     *     Step3.3: add the loadJob in load rather than load manager
-     *     Step3.4: unlock the load
+     * Step3.1: lock the load
+     * Step3.2: check the label in load
+     * Step3.3: add the loadJob in load rather than load manager
+     * Step3.4: unlock the load
      * Step4: unlock the load manager
-     * @param stmt
-     * @param timestamp
-     * @throws DdlException
+     *
      */
     public void createLoadJobV1FromStmt(LoadStmt stmt, EtlJobType jobType, long timestamp) throws DdlException {
         Database database = checkDb(stmt.getLabel().getDbName());
@@ -215,10 +214,9 @@ public class LoadManager implements Writable {
      * It is used to check the label of v1 and v2 at the same time.
      * Finally, the non-streaming mini load will belongs to load class.
      *
-     * @param request
-     * @return if: mini load is a duplicated load, return false.
-     *         else: return true.
-     * @throws DdlException
+     * @param request request
+     * @return false if the mini load is a duplicated load, true otherwise.
+     * @deprecated mini load is not supported anymore
      */
     @Deprecated
     public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlException {
@@ -236,6 +234,9 @@ public class LoadManager implements Writable {
         }
     }
 
+    /**
+     * Used by MultiLoadMgr.
+     **/
     public void createLoadJobV1FromMultiStart(String fullDbName, String label) throws DdlException {
         Database database = checkDb(fullDbName);
         writeLock();
@@ -250,9 +251,7 @@ public class LoadManager implements Writable {
 
     public void replayCreateLoadJob(LoadJob loadJob) {
         createLoadJob(loadJob);
-        LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
-                .add("msg", "replay create load job")
-                .build());
+        LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg", "replay create load job").build());
     }
 
     // add load job and also add to to callback factory
@@ -262,7 +261,8 @@ public class LoadManager implements Writable {
             return;
         }
         addLoadJob(loadJob);
-        // add callback before txn if load job is uncompleted, because callback will be performed on replay without txn begin
+        // add callback before txn if load job is uncompleted,
+        // because callback will be performed on replay without txn begin
         // register txn state listener
         if (!loadJob.isCompleted()) {
             Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
@@ -282,8 +282,11 @@ public class LoadManager implements Writable {
         labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
     }
 
+    /**
+     * Record finished load job by editLog.
+     **/
     public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType,
-                                      long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
+            long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
 
         // get db id
         Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbName);
@@ -291,7 +294,8 @@ public class LoadManager implements Writable {
         LoadJob loadJob;
         switch (jobType) {
             case INSERT:
-                loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg, trackingUrl);
+                loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg,
+                        trackingUrl);
                 break;
             default:
                 return;
@@ -301,60 +305,77 @@ public class LoadManager implements Writable {
         Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
     }
 
-    public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException {
-        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName());
+    /**
+     * Collect the load jobs that match the cancel statement.
+     **/
+    @VisibleForTesting
+    public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
+            throws AnalysisException {
+        String label = stmt.getLabel();
+        String state = stmt.getState();
+        PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
+        matchLoadJobs.addAll(loadJobs.stream().filter(job -> {
+            if (stmt.getOperator() != null) {
+                // compound
+                boolean labelFilter =
+                        label.contains("%") ? matcher.match(job.getLabel()) : job.getLabel().equalsIgnoreCase(label);
+                boolean stateFilter = job.getState().name().equalsIgnoreCase(state);
+                return Operator.AND.equals(stmt.getOperator()) ? labelFilter && stateFilter :
+                        labelFilter || stateFilter;
+            }
+            if (StringUtils.isNotEmpty(label)) {
+                return label.contains("%") ? matcher.match(job.getLabel()) : job.getLabel().equalsIgnoreCase(label);
+            }
+            if (StringUtils.isNotEmpty(state)) {
+                return job.getState().name().equalsIgnoreCase(state);
+            }
+            return false;
+        }).collect(Collectors.toList()));
+    }
 
+    /**
+     * Cancel load job by stmt.
+     **/
+    public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
+        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName());
         // List of load jobs waiting to be cancelled
-        List<LoadJob> loadJobs = Lists.newArrayList();
+        List<LoadJob> matchLoadJobs = Lists.newArrayList();
         readLock();
         try {
             Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
             if (labelToLoadJobs == null) {
                 throw new DdlException("Load job does not exist");
             }
-
-            // get jobs by label
-            List<LoadJob> matchLoadJobs = Lists.newArrayList();
-            if (isAccurateMatch) {
-                if (labelToLoadJobs.containsKey(stmt.getLabel())) {
-                    matchLoadJobs.addAll(labelToLoadJobs.get(stmt.getLabel()));
-                }
-            } else {
-                PatternMatcher matcher = PatternMatcher.createMysqlPattern(stmt.getLabel(), CaseSensibility.LABEL.getCaseSensibility());
-                for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
-                    if (matcher.match(entry.getKey())) {
-                        matchLoadJobs.addAll(entry.getValue());
-                    }
-                }
-            }
-
+            addNeedCancelLoadJob(stmt,
+                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                    matchLoadJobs);
             if (matchLoadJobs.isEmpty()) {
                 throw new DdlException("Load job does not exist");
             }
-
             // check state here
-            List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(entity -> !entity.isTxnDone())
-                    .collect(Collectors.toList());
+            List<LoadJob> uncompletedLoadJob =
+                    matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList());
             if (uncompletedLoadJob.isEmpty()) {
-                throw new DdlException("There is no uncompleted job which label " +
-                        (isAccurateMatch ? "is " : "like ") + stmt.getLabel());
+                throw new DdlException("There is no uncompleted job");
             }
-
-            loadJobs.addAll(uncompletedLoadJob);
         } finally {
             readUnlock();
         }
-
-        for (LoadJob loadJob : loadJobs) {
+        for (LoadJob loadJob : matchLoadJobs) {
             try {
                 loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
             } catch (DdlException e) {
-                throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
-                        "label=[" + loadJob.getLabel() + "] failed msg=" + e.getMessage());
+                throw new DdlException(
+                        "Cancel load job [" + loadJob.getId() + "] fail, "
+                                + "label=[" + loadJob.getLabel()
+                                + "] failed msg=" + e.getMessage());
             }
         }
     }
 
+    /**
+     * Replay end load job.
+     **/
     public void replayEndLoadJob(LoadJobFinalOperation operation) {
         LoadJob job = idToLoadJob.get(operation.getId());
         if (job == null) {
@@ -367,12 +388,13 @@ public class LoadManager implements Writable {
             return;
         }
         job.unprotectReadEndOperation(operation);
-        LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId())
-                .add("operation", operation)
-                .add("msg", "replay end load job")
-                .build());
+        LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId()).add("operation", operation)
+                .add("msg", "replay end load job").build());
     }
 
+    /**
+     * Replay update load job.
+     **/
     public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) {
         long jobId = info.getJobId();
         LoadJob job = idToLoadJob.get(jobId);
@@ -384,6 +406,9 @@ public class LoadManager implements Writable {
         job.replayUpdateStateInfo(info);
     }
 
+    /**
+     * Get load job num, used by proc.
+     **/
     public int getLoadJobNum(JobState jobState, long dbId) {
         readLock();
         try {
@@ -391,23 +416,31 @@ public class LoadManager implements Writable {
             if (labelToLoadJobs == null) {
                 return 0;
             }
-            List<LoadJob> loadJobList = labelToLoadJobs.values().stream()
-                    .flatMap(entity -> entity.stream()).collect(Collectors.toList());
+            List<LoadJob> loadJobList =
+                    labelToLoadJobs.values().stream().flatMap(entity -> entity.stream()).collect(Collectors.toList());
             return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count();
         } finally {
             readUnlock();
         }
     }
 
+
+    /**
+     * Get load job num, used by metric.
+     **/
     public long getLoadJobNum(JobState jobState, EtlJobType jobType) {
         readLock();
         try {
-            return idToLoadJob.values().stream().filter(j -> j.getState() == jobState && j.getJobType() == jobType).count();
+            return idToLoadJob.values().stream().filter(j -> j.getState() == jobState && j.getJobType() == jobType)
+                    .count();
         } finally {
             readUnlock();
         }
     }
 
+    /**
+     * Remove old load job.
+     **/
     public void removeOldLoadJob() {
         long currentTimeMs = System.currentTimeMillis();
 
@@ -437,7 +470,9 @@ public class LoadManager implements Writable {
         }
     }
 
-    // only for those jobs which have etl state, like SparkLoadJob
+    /**
+     * Only for those jobs which have etl state, like SparkLoadJob.
+     **/
     public void processEtlStateJobs() {
         idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.ETL))
                 .forEach(job -> {
@@ -445,8 +480,8 @@ public class LoadManager implements Writable {
                         ((SparkLoadJob) job).updateEtlStatus();
                     } catch (DataQualityException e) {
                         LOG.info("update load job etl status failed. job id: {}", job.getId(), e);
-                        job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG),
-                                true, true);
+                        job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
+                                DataQualityException.QUALITY_FAIL_MSG), true, true);
                     } catch (UserException e) {
                         LOG.warn("update load job etl status failed. job id: {}", job.getId(), e);
                         job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
@@ -456,7 +491,9 @@ public class LoadManager implements Writable {
                 });
     }
 
-    // only for those jobs which load by PushTask
+    /**
+     * Only for those jobs which load by PushTask.
+     **/
     public void processLoadingStateJobs() {
         idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.LOADING))
                 .forEach(job -> {
@@ -473,16 +510,17 @@ public class LoadManager implements Writable {
 
     /**
      * This method will return the jobs info which can meet the condition of input param.
-     * @param dbId used to filter jobs which belong to this db
-     * @param labelValue used to filter jobs which's label is or like labelValue.
+     *
+     * @param dbId          used to filter jobs which belong to this db
+     * @param labelValue    used to filter jobs whose label is, or is like, labelValue.
      * @param accurateMatch true: filter jobs which's label is labelValue. false: filter jobs which's label like itself.
-     * @param statesValue used to filter jobs which's state within the statesValue set.
+     * @param statesValue   used to filter jobs whose state is within the statesValue set.
      * @return The result is the list of jobInfo.
-     *     JobInfo is a List<Comparable> which includes the comparable object: jobId, label, state etc.
-     *     The result is unordered.
+     *         JobInfo is a list which includes the comparable object: jobId, label, state etc.
+     *         The result is unordered.
      */
-    public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue,
-                                                      boolean accurateMatch, Set<String> statesValue) throws AnalysisException {
+    public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch,
+            Set<String> statesValue) throws AnalysisException {
         LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
         if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
             return loadJobInfos;
@@ -506,8 +544,8 @@ public class LoadManager implements Writable {
             Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
             List<LoadJob> loadJobList = Lists.newArrayList();
             if (Strings.isNullOrEmpty(labelValue)) {
-                loadJobList.addAll(labelToLoadJobs.values()
-                        .stream().flatMap(Collection::stream).collect(Collectors.toList()));
+                loadJobList.addAll(
+                        labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
             } else {
                 // check label value
                 if (accurateMatch) {
@@ -517,7 +555,8 @@ public class LoadManager implements Writable {
                     loadJobList.addAll(labelToLoadJobs.get(labelValue));
                 } else {
                     // non-accurate match
-                    PatternMatcher matcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
+                    PatternMatcher matcher =
+                            PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
                     for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
                         if (matcher.match(entry.getKey())) {
                             loadJobList.addAll(entry.getValue());
@@ -544,6 +583,9 @@ public class LoadManager implements Writable {
         }
     }
 
+    /**
+     * Get load job info.
+     **/
     public void getLoadJobInfo(Load.JobInfo info) throws DdlException {
         String fullDbName = ClusterNamespace.getFullName(info.clusterName, info.dbName);
         info.dbName = fullDbName;
@@ -577,8 +619,8 @@ public class LoadManager implements Writable {
     }
 
     private void submitJobs() {
-        loadJobScheduler.submitJob(idToLoadJob.values().stream().filter(
-                loadJob -> loadJob.state == JobState.PENDING).collect(Collectors.toList()));
+        loadJobScheduler.submitJob(idToLoadJob.values().stream().filter(loadJob -> loadJob.state == JobState.PENDING)
+                .collect(Collectors.toList()));
     }
 
     private void analyzeLoadJobs() {
@@ -594,16 +636,13 @@ public class LoadManager implements Writable {
     }
 
     /**
-     * step1: if label has been used in old load jobs which belong to load class
-     * step2: if label has been used in v2 load jobs
-     *     step2.1: if label has been user in v2 load jobs, the create timestamp will be checked
+     * step1: if label has been used in old load jobs which belong to load class.
+     * step2: if label has been used in v2 load jobs.
+     * step2.1: if label has been used in v2 load jobs, the create timestamp will be checked.
      *
-     * @param dbId
-     * @param label
      * @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job.
      */
-    private void checkLabelUsed(long dbId, String label)
-            throws DdlException {
+    private void checkLabelUsed(long dbId, String label) throws DdlException {
         // if label has been used in old load jobs
         Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label);
         // if label has been used in v2 of load jobs
@@ -637,16 +676,22 @@ public class LoadManager implements Writable {
         lock.writeLock().unlock();
     }
 
+    /**
+     * Init.
+     **/
     public void initJobProgress(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds,
-                                List<Long> relatedBackendIds) {
+            List<Long> relatedBackendIds) {
         LoadJob job = idToLoadJob.get(jobId);
         if (job != null) {
             job.initLoadProgress(loadId, fragmentIds, relatedBackendIds);
         }
     }
 
-    public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId,
-                                  long scannedRows, long scannedBytes, boolean isDone) {
+    /**
+     * Update.
+     **/
+    public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+            long scannedBytes, boolean isDone) {
         LoadJob job = idToLoadJob.get(jobId);
         if (job != null) {
             job.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
@@ -656,7 +701,8 @@ public class LoadManager implements Writable {
     @Override
     public void write(DataOutput out) throws IOException {
         long currentTimeMs = System.currentTimeMillis();
-        List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList());
+        List<LoadJob> loadJobs =
+                idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList());
 
         out.writeInt(loadJobs.size());
         for (LoadJob loadJob : loadJobs) {
@@ -664,6 +710,9 @@ public class LoadManager implements Writable {
         }
     }
 
+    /**
+     * Read from file.
+     **/
     public void readFields(DataInput in) throws IOException {
         long currentTimeMs = System.currentTimeMillis();
         int size = in.readInt();
@@ -683,12 +732,13 @@ public class LoadManager implements Writable {
                 if (loadJob.getState() == JobState.PENDING) {
                     // bad case. When a mini load job is created and then FE restart.
                     // the job will be in PENDING state forever.
-                    // This is a temp solution to remove these jobs. And the mini load job should be deprecated in Doris v1.1
-                    TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr().getTransactionState(
-                            loadJob.getDbId(), loadJob.getTransactionId());
+                    // This is a temp solution to remove these jobs.
+                    // And the mini load job should be deprecated in Doris v1.1
+                    TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr()
+                            .getTransactionState(loadJob.getDbId(), loadJob.getTransactionId());
                     if (state == null) {
-                        LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}",
-                                loadJob.getId(), loadJob.getDbId(), loadJob.getTransactionId());
+                        LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}", loadJob.getId(),
+                                loadJob.getDbId(), loadJob.getTransactionId());
                         continue;
                     }
                 }
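
The filtering itself now lives in the static, @VisibleForTesting addNeedCancelLoadJob above: with a compound predicate it combines the label and state filters with AND or OR, otherwise it applies whichever single filter is present; a label containing '%' is matched as a MySQL pattern, anything else by case-insensitive equality. A hedged sketch of calling it directly, not part of the commit (it mirrors CancelLoadStmtTest below; the InsertLoadJob ids are placeholders):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.doris.analysis.Analyzer;
    import org.apache.doris.analysis.BinaryPredicate;
    import org.apache.doris.analysis.CancelLoadStmt;
    import org.apache.doris.analysis.SlotRef;
    import org.apache.doris.analysis.StringLiteral;
    import org.apache.doris.load.loadv2.InsertLoadJob;
    import org.apache.doris.load.loadv2.LoadJob;
    import org.apache.doris.load.loadv2.LoadManager;

    public class CancelLoadFilterExample {
        static List<LoadJob> matchFinishedJobs(Analyzer analyzer) throws Exception {
            // Candidate jobs; InsertLoadJob(label, txnId, dbId, tableId, createTimestamp, failMsg, trackingUrl)
            List<LoadJob> loadJobs = new ArrayList<>();
            loadJobs.add(new InsertLoadJob("job_a", 1L, 10003L, 10005L, 0, "", ""));
            loadJobs.add(new InsertLoadJob("job_b", 2L, 10003L, 10005L, 0, "", ""));

            // CANCEL LOAD WHERE STATE = "FINISHED"
            BinaryPredicate stateEq = new BinaryPredicate(BinaryPredicate.Operator.EQ,
                    new SlotRef(null, "state"), new StringLiteral("FINISHED"));
            CancelLoadStmt stmt = new CancelLoadStmt(null, stateEq);
            stmt.analyze(analyzer);

            // Collects every candidate job whose state matches the statement
            List<LoadJob> matched = new ArrayList<>();
            LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matched);
            return matched;
        }
    }
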
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 5904848b55..174f505dac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -170,18 +170,7 @@ public class DdlExecutor {
                 catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
             }
         } else if (ddlStmt instanceof CancelLoadStmt) {
-            boolean isAccurateMatch = ((CancelLoadStmt) ddlStmt).isAccurateMatch();
-            boolean isLabelExist = catalog.getLoadInstance().isLabelExist(
-                    ((CancelLoadStmt) ddlStmt).getDbName(),
-                    ((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch);
-            if (isLabelExist) {
-                catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt,
-                        isAccurateMatch);
-            }
-            if (!isLabelExist || isAccurateMatch) {
-                catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt,
-                        isAccurateMatch);
-            }
+            catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
         } else if (ddlStmt instanceof CreateRoutineLoadStmt) {
             catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt);
         } else if (ddlStmt instanceof PauseRoutineLoadStmt) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
index 4ed21d7ae2..3621cbcbad 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
@@ -17,88 +17,123 @@
 
 package org.apache.doris.analysis;
 
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.FakeCatalog;
+import org.apache.doris.analysis.CompoundPredicate.Operator;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.InsertLoadJob;
+import org.apache.doris.load.loadv2.LoadJob;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.utframe.TestWithFeService;
 
-import mockit.Expectations;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-public class CancelLoadStmtTest {
-    private Analyzer analyzer;
-    private Catalog catalog;
+import java.util.ArrayList;
+import java.util.List;
 
-    FakeCatalog fakeCatalog;
+public class CancelLoadStmtTest extends TestWithFeService {
 
-    @Before
-    public void setUp() {
-        fakeCatalog = new FakeCatalog();
+    private Analyzer analyzer;
 
-        catalog = AccessTestUtil.fetchAdminCatalog();
-        FakeCatalog.setCatalog(catalog);
+    @Override
+    protected void runBeforeAll() throws Exception {
+        FeConstants.runningUnitTest = true;
+        createDatabase("testDb");
+        useDatabase("testDb");
+        createTable("create table table1\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+                + "properties(\"replication_num\" = \"1\");");
+        analyzer = new Analyzer(connectContext.getCatalog(), connectContext);
+    }
 
-        analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
-        new Expectations(analyzer) {
-            {
-                analyzer.getDefaultDb();
-                minTimes = 0;
-                result = "testCluster:testDb";
+    @Test
+    public void testNormal() throws UserException {
+        SlotRef labelSlotRef = new SlotRef(null, "label");
+        StringLiteral labelStringLiteral = new StringLiteral("doris_test_label");
 
-                analyzer.getQualifiedUser();
-                minTimes = 0;
-                result = "testCluster:testUser";
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+        StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
 
-                analyzer.getClusterName();
-                minTimes = 0;
-                result = "testCluster";
+        BinaryPredicate labelBinaryPredicate =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral);
+        CancelLoadStmt stmt = new CancelLoadStmt(null, labelBinaryPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label'",
+                stmt.toString());
 
-                analyzer.getCatalog();
-                minTimes = 0;
-                result = catalog;
-            }
-        };
-    }
+        BinaryPredicate stateBinaryPredicate =
+                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
+        stmt = new CancelLoadStmt(null, stateBinaryPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `state` = 'FINISHED'", stmt.toString());
 
-    @Test
-    public void testNormal() throws UserException, AnalysisException {
-        SlotRef slotRef = new SlotRef(null, "label");
-        StringLiteral stringLiteral = new StringLiteral("doris_test_label");
+        LikePredicate labelLikePredicate =
+                new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, labelStringLiteral);
+        stmt = new CancelLoadStmt(null, labelLikePredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'",
+                stmt.toString());
 
-        BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
-        CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate);
+        CompoundPredicate compoundAndPredicate =
+                new CompoundPredicate(Operator.AND, labelBinaryPredicate, stateBinaryPredicate);
+        stmt = new CancelLoadStmt(null, compoundAndPredicate);
         stmt.analyze(analyzer);
-        Assert.assertTrue(stmt.isAccurateMatch());
-        Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` = 'doris_test_label'", stmt.toString());
+        Assertions.assertEquals(
+                "CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'FINISHED'",
+                stmt.toString());
 
-        LikePredicate likePredicate = new LikePredicate(LikePredicate.Operator.LIKE, slotRef, stringLiteral);
-        stmt = new CancelLoadStmt(null, likePredicate);
+        CompoundPredicate compoundOrPredicate =
+                new CompoundPredicate(Operator.OR, labelBinaryPredicate, stateBinaryPredicate);
+        stmt = new CancelLoadStmt(null, compoundOrPredicate);
+        stmt.analyze(analyzer);
+        Assertions.assertEquals(
+                "CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' OR `state` = 'FINISHED'",
+                stmt.toString());
+
+        // test match
+        List<LoadJob> loadJobs = new ArrayList<>();
+        InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, 10003L, 10005L, 0, "", "");
+        loadJobs.add(insertLoadJob1);
+        InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, 10003L, 10005L, 0, "", "");
+        loadJobs.add(insertLoadJob2);
+        InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, 10003L, 10005L, 0, "", "");
+        loadJobs.add(insertLoadJob3);
+        // label
+        stmt = new CancelLoadStmt(null, labelBinaryPredicate);
+        stmt.analyze(analyzer);
+        List<LoadJob> matchLoadJobs = new ArrayList<>();
+        LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
+        Assertions.assertEquals(1, matchLoadJobs.size());
+        // state
+        matchLoadJobs.clear();
+        stmt = new CancelLoadStmt(null, stateBinaryPredicate);
+        stmt.analyze(analyzer);
+        LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
+        Assertions.assertEquals(3, matchLoadJobs.size());
+        // or
+        matchLoadJobs.clear();
+        stmt = new CancelLoadStmt(null, compoundOrPredicate);
         stmt.analyze(analyzer);
-        Assert.assertFalse(stmt.isAccurateMatch());
-        Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` LIKE 'doris_test_label'", stmt.toString());
+        LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
+        Assertions.assertEquals(3, matchLoadJobs.size());
+        // and
+        matchLoadJobs.clear();
+        stmt = new CancelLoadStmt(null, compoundAndPredicate);
+        stmt.analyze(analyzer);
+        LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
+        Assertions.assertEquals(1, matchLoadJobs.size());
     }
 
-    @Test(expected = AnalysisException.class)
-    public void testNoDb() throws UserException, AnalysisException {
-        SlotRef slotRef = new SlotRef(null, "label");
-        StringLiteral stringLiteral = new StringLiteral("doris_test_label");
-        new Expectations(analyzer) {
-            {
-                analyzer.getDefaultDb();
-                minTimes = 0;
-                result = "";
-
-                analyzer.getClusterName();
-                minTimes = 0;
-                result = "testCluster";
-            }
-        };
-
-        BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
-        CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate);
-        stmt.analyze(analyzer);
-        Assert.fail("No exception throws.");
+    @Test
+    public void testError() {
+        SlotRef stateSlotRef = new SlotRef(null, "state");
+        StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
+
+        LikePredicate stateLikePredicate =
+                new LikePredicate(LikePredicate.Operator.LIKE, stateSlotRef, stateStringLiteral);
+        CancelLoadStmt stmt = new CancelLoadStmt(null, stateLikePredicate);
+        ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Only label can use like",
+                () -> stmt.analyze(analyzer));
     }
 }

