You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/19 08:38:03 UTC
[incubator-doris] branch master updated: [Feature] cancel load support state (#9537)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new cbc7b167b1 [Feature] cancel load support state (#9537)
cbc7b167b1 is described below
commit cbc7b167b1712bd10d94b550e86aeaa358663ccc
Author: Stalary <st...@163.com>
AuthorDate: Thu May 19 16:37:56 2022 +0800
[Feature] cancel load support state (#9537)
---
.../org/apache/doris/analysis/CancelLoadStmt.java | 165 +++++++-------
.../org/apache/doris/common/CaseSensibility.java | 3 +
.../src/main/java/org/apache/doris/load/Load.java | 178 ---------------
.../org/apache/doris/load/loadv2/LoadManager.java | 246 +++++++++++++--------
.../main/java/org/apache/doris/qe/DdlExecutor.java | 13 +-
.../apache/doris/analysis/CancelLoadStmtTest.java | 165 ++++++++------
6 files changed, 343 insertions(+), 427 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
index 545ed9d187..7af21948b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
@@ -23,35 +23,105 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import lombok.Getter;
-// CANCEL LOAD statement used to cancel load job.
-//
-// syntax:
-// CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
+import java.util.List;
+
+
+/**
+ * CANCEL LOAD statement used to cancel load job.
+ * syntax:
+ * CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
+ **/
public class CancelLoadStmt extends DdlStmt {
+ private static final List<String> SUPPORT_COLUMNS = Lists.newArrayList("label", "state");
+
+ @Getter
private String dbName;
+
+ @Getter
+ private CompoundPredicate.Operator operator;
+
+ @Getter
private String label;
+ @Getter
+ private String state;
+
private Expr whereClause;
- private boolean isAccurateMatch;
- public String getDbName() {
- return dbName;
+ public CancelLoadStmt(String dbName, Expr whereClause) {
+ this.dbName = dbName;
+ this.whereClause = whereClause;
}
- public String getLabel() {
- return label;
+ private void checkColumn(Expr expr, boolean like) throws AnalysisException {
+ String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
+ if (!SUPPORT_COLUMNS.contains(inputCol)) {
+ throw new AnalysisException("Current not support " + inputCol);
+ }
+ if (!(expr.getChild(1) instanceof StringLiteral)) {
+ throw new AnalysisException("Value must is string");
+ }
+
+ String inputValue = expr.getChild(1).getStringValue();
+ if (Strings.isNullOrEmpty(inputValue)) {
+ throw new AnalysisException("Value can't is null");
+ }
+ if (like && !inputValue.contains("%")) {
+ inputValue = "%" + inputValue + "%";
+ }
+ if (inputCol.equalsIgnoreCase("label")) {
+ label = inputValue;
+ }
+ if (inputCol.equalsIgnoreCase("state")) {
+ if (like) {
+ throw new AnalysisException("Only label can use like");
+ }
+ state = inputValue;
+ }
}
- public CancelLoadStmt(String dbName, Expr whereClause) {
- this.dbName = dbName;
- this.whereClause = whereClause;
- this.isAccurateMatch = false;
+ private void likeCheck(Expr expr) throws AnalysisException {
+ if (expr instanceof LikePredicate) {
+ LikePredicate likePredicate = (LikePredicate) expr;
+ boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp());
+ if (!like) {
+ throw new AnalysisException("Not support REGEXP");
+ }
+ checkColumn(expr, true);
+ }
}
- public boolean isAccurateMatch() {
- return isAccurateMatch;
+ private void binaryCheck(Expr expr) throws AnalysisException {
+ if (expr instanceof BinaryPredicate) {
+ BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
+ if (!Operator.EQ.equals(binaryPredicate.getOp())) {
+ throw new AnalysisException("Only support equal or like");
+ }
+ checkColumn(expr, false);
+ }
+ }
+
+ private void compoundCheck(Expr expr) throws AnalysisException {
+ if (expr == null) {
+ throw new AnalysisException("Where clause can't is null");
+ }
+ if (expr instanceof CompoundPredicate) {
+ // current only support label and state
+ CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+ for (int i = 0; i < 2; i++) {
+ Expr child = compoundPredicate.getChild(i);
+ if (child instanceof CompoundPredicate) {
+ throw new AnalysisException("Current only support label and state");
+ }
+ likeCheck(child);
+ binaryCheck(child);
+ }
+ operator = compoundPredicate.getOp();
+ }
}
@Override
@@ -67,63 +137,10 @@ public class CancelLoadStmt extends DdlStmt {
}
// check auth after we get real load job
-
- // analyze expr if not null
- boolean valid = true;
- do {
- if (whereClause == null) {
- valid = false;
- break;
- }
-
- if (whereClause instanceof BinaryPredicate) {
- BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause;
- isAccurateMatch = true;
- if (binaryPredicate.getOp() != Operator.EQ) {
- valid = false;
- break;
- }
- } else if (whereClause instanceof LikePredicate) {
- LikePredicate likePredicate = (LikePredicate) whereClause;
- if (likePredicate.getOp() != LikePredicate.Operator.LIKE) {
- valid = false;
- break;
- }
- } else {
- valid = false;
- break;
- }
-
- // left child
- if (!(whereClause.getChild(0) instanceof SlotRef)) {
- valid = false;
- break;
- }
- if (!((SlotRef) whereClause.getChild(0)).getColumnName().equalsIgnoreCase("label")) {
- valid = false;
- break;
- }
-
- // right child
- if (!(whereClause.getChild(1) instanceof StringLiteral)) {
- valid = false;
- break;
- }
-
- label = ((StringLiteral) whereClause.getChild(1)).getStringValue();
- if (Strings.isNullOrEmpty(label)) {
- valid = false;
- break;
- }
- if (!isAccurateMatch && !label.contains("%")) {
- label = "%" + label + "%";
- }
- } while (false);
-
- if (!valid) {
- throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"," +
- " or LABEL LIKE \"matcher\"");
- }
+ // analyze expr
+ likeCheck(whereClause);
+ binaryCheck(whereClause);
+ compoundCheck(whereClause);
}
@Override
@@ -131,11 +148,11 @@ public class CancelLoadStmt extends DdlStmt {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("CANCEL LOAD ");
if (!Strings.isNullOrEmpty(dbName)) {
- stringBuilder.append("FROM " + dbName);
+ stringBuilder.append("FROM ").append(dbName);
}
if (whereClause != null) {
- stringBuilder.append(" WHERE " + whereClause.toSql());
+ stringBuilder.append(" WHERE ").append(whereClause.toSql());
}
return stringBuilder.toString();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java
index ff4ebdf775..651581a3c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java
@@ -17,6 +17,9 @@
package org.apache.doris.common;
+/**
+ * CaseSensibility Enum.
+ **/
public enum CaseSensibility {
CLUSTER(true),
DATABASE(true),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 81e500968e..bec97bc58f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -1728,184 +1728,6 @@ public class Load {
return false;
}
- public boolean isLabelExist(String dbName, String labelValue, boolean isAccurateMatch) throws DdlException, AnalysisException {
- // get load job and check state
- Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
- readLock();
- try {
- Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
- if (labelToLoadJobs == null) {
- return false;
- }
- List<LoadJob> loadJobs = Lists.newArrayList();
- if (isAccurateMatch) {
- if (labelToLoadJobs.containsKey(labelValue)) {
- loadJobs.addAll(labelToLoadJobs.get(labelValue));
- }
- } else {
- PatternMatcher matcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
- for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
- if (matcher.match(entry.getKey())) {
- loadJobs.addAll(entry.getValue());
- }
- }
- }
- if (loadJobs.isEmpty()) {
- return false;
- }
- if (loadJobs.stream().filter(entity -> entity.getState() != JobState.CANCELLED).count() == 0) {
- return false;
- }
- return true;
- } finally {
- readUnlock();
- }
- }
-
- public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException {
- // get params
- String dbName = stmt.getDbName();
- String label = stmt.getLabel();
-
- // get load job and check state
- Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
- // List of load jobs waiting to be cancelled
- List<LoadJob> loadJobs = Lists.newArrayList();
- readLock();
- try {
- Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
- if (labelToLoadJobs == null) {
- throw new DdlException("Load job does not exist");
- }
-
- // get jobs by label
- List<LoadJob> matchLoadJobs = Lists.newArrayList();
- if (isAccurateMatch) {
- if (labelToLoadJobs.containsKey(label)) {
- matchLoadJobs.addAll(labelToLoadJobs.get(label));
- }
- } else {
- PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
- for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
- if (matcher.match(entry.getKey())) {
- loadJobs.addAll(entry.getValue());
- }
- }
- }
-
- if (matchLoadJobs.isEmpty()) {
- throw new DdlException("Load job does not exist");
- }
-
- // check state here
- List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(job -> {
- JobState state = job.getState();
- return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED;
- }).collect(Collectors.toList());
- if (uncompletedLoadJob.isEmpty()) {
- throw new DdlException("There is no uncompleted job which label " +
- (isAccurateMatch ? "is " : "like ") + stmt.getLabel());
- }
- loadJobs.addAll(uncompletedLoadJob);
- } finally {
- readUnlock();
- }
-
- // check auth here, cause we need table info
- Set<String> tableNames = Sets.newHashSet();
- for (LoadJob loadJob : loadJobs) {
- tableNames.addAll(loadJob.getTableNames());
- }
-
- if (tableNames.isEmpty()) {
- if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
- PrivPredicate.LOAD)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
- }
- } else {
- for (String tblName : tableNames) {
- if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
- PrivPredicate.LOAD)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
- ConnectContext.get().getQualifiedUser(),
- ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
- }
- }
- }
-
- // cancel job
- for (LoadJob loadJob : loadJobs) {
- List<String> failedMsg = Lists.newArrayList();
- boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user cancel", failedMsg);
- if (!ok) {
- throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
- "label=[" + loadJob.getLabel() + "] failed msg=" +
- (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
- }
- }
-
- return true;
- }
-
- public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
- // get params
- String dbName = stmt.getDbName();
- String label = stmt.getLabel();
-
- // get load job and check state
- Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
- LoadJob job;
- readLock();
- try {
- Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
- if (labelToLoadJobs == null) {
- throw new DdlException("Load job does not exist");
- }
-
- List<LoadJob> loadJobs = labelToLoadJobs.get(label);
- if (loadJobs == null) {
- throw new DdlException("Load job does not exist");
- }
- // only the last one should be running
- job = loadJobs.get(loadJobs.size() - 1);
- JobState state = job.getState();
- if (state == JobState.CANCELLED) {
- throw new DdlException("Load job has been cancelled");
- } else if (state == JobState.QUORUM_FINISHED || state == JobState.FINISHED) {
- throw new DdlException("Load job has been finished");
- }
- } finally {
- readUnlock();
- }
-
- // check auth here, cause we need table info
- Set<String> tableNames = job.getTableNames();
- if (tableNames.isEmpty()) {
- // forward compatibility
- if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
- PrivPredicate.LOAD)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
- }
- } else {
- for (String tblName : tableNames) {
- if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
- PrivPredicate.LOAD)) {
- ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
- ConnectContext.get().getQualifiedUser(),
- ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
- }
- }
- }
-
- // cancel job
- List<String> failedMsg = Lists.newArrayList();
- if (!cancelLoadJob(job, CancelType.USER_CANCEL, "user cancel", failedMsg)) {
- throw new DdlException("Cancel load job fail: " + (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
- }
-
- return true;
- }
-
public boolean cancelLoadJob(LoadJob job, CancelType cancelType, String msg) {
return cancelLoadJob(job, cancelType, msg, null);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 4ac2a3550f..5d54451ee8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -18,6 +18,7 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.CancelLoadStmt;
+import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
@@ -46,10 +47,12 @@ import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -71,11 +74,10 @@ import java.util.stream.Collectors;
/**
* The broker and mini load jobs(v2) are included in this class.
- *
* The lock sequence:
* Database.lock
- * LoadManager.lock
- * LoadJob.lock
+ * LoadManager.lock
+ * LoadJob.lock
*/
public class LoadManager implements Writable {
private static final Logger LOG = LogManager.getLogger(LoadManager.class);
@@ -92,8 +94,6 @@ public class LoadManager implements Writable {
/**
* This method will be invoked by the broker load(v2) now.
- * @param stmt
- * @throws DdlException
*/
public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
Database database = checkDb(stmt.getLabel().getDbName());
@@ -112,8 +112,10 @@ public class LoadManager implements Writable {
throw new DdlException("LoadManager only support the broker and spark load.");
}
if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
- throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " unfinished load jobs, "
- + "please retry later. You can use `SHOW LOAD` to view submitted jobs");
+ throw new DdlException(
+ "There are more than " + Config.desired_max_waiting_jobs
+ + " unfinished load jobs, please retry later. "
+ + "You can use `SHOW LOAD` to view submitted jobs");
}
}
@@ -139,9 +141,6 @@ public class LoadManager implements Writable {
* This method will be invoked by streaming mini load.
* It will begin the txn of mini load immediately without any scheduler .
*
- * @param request
- * @return
- * @throws UserException
*/
public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws UserException {
String cluster = SystemInfoService.DEFAULT_CLUSTER;
@@ -155,7 +154,8 @@ public class LoadManager implements Writable {
try {
loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
// call unprotectedExecute before adding load job. so that if job is not started ok, no need to add.
- // NOTICE(cmy): this order is only for Mini Load, because mini load's unprotectedExecute() only do beginTxn().
+ // NOTICE(cmy): this order is only for Mini Load, because mini load's
+ // unprotectedExecute() only do beginTxn().
// for other kind of load job, execute the job after adding job.
// Mini load job must be executed before release write lock.
// Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun.
@@ -164,7 +164,8 @@ public class LoadManager implements Writable {
createLoadJob(loadJob);
} catch (DuplicatedRequestException e) {
// this is a duplicate request, just return previous txn id
- LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(), e.getTxnId());
+ LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
+ e.getTxnId());
return e.getTxnId();
} catch (UserException e) {
if (loadJob != null) {
@@ -190,14 +191,12 @@ public class LoadManager implements Writable {
* Step1: lock the load manager
* Step2: check the label in load manager
* Step3: call the addLoadJob of load class
- * Step3.1: lock the load
- * Step3.2: check the label in load
- * Step3.3: add the loadJob in load rather than load manager
- * Step3.4: unlock the load
+ * Step3.1: lock the load
+ * Step3.2: check the label in load
+ * Step3.3: add the loadJob in load rather than load manager
+ * Step3.4: unlock the load
* Step4: unlock the load manager
- * @param stmt
- * @param timestamp
- * @throws DdlException
+ *
*/
public void createLoadJobV1FromStmt(LoadStmt stmt, EtlJobType jobType, long timestamp) throws DdlException {
Database database = checkDb(stmt.getLabel().getDbName());
@@ -215,10 +214,9 @@ public class LoadManager implements Writable {
* It is used to check the label of v1 and v2 at the same time.
* Finally, the non-streaming mini load will belongs to load class.
*
- * @param request
- * @return if: mini load is a duplicated load, return false.
- * else: return true.
- * @throws DdlException
+ * @param request request
+ * @return if: mini load is a duplicated load, return false. else: return true.
+ * @deprecated not support mini load
*/
@Deprecated
public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlException {
@@ -236,6 +234,9 @@ public class LoadManager implements Writable {
}
}
+ /**
+ * MultiLoadMgr use.
+ **/
public void createLoadJobV1FromMultiStart(String fullDbName, String label) throws DdlException {
Database database = checkDb(fullDbName);
writeLock();
@@ -250,9 +251,7 @@ public class LoadManager implements Writable {
public void replayCreateLoadJob(LoadJob loadJob) {
createLoadJob(loadJob);
- LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
- .add("msg", "replay create load job")
- .build());
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg", "replay create load job").build());
}
// add load job and also add to to callback factory
@@ -262,7 +261,8 @@ public class LoadManager implements Writable {
return;
}
addLoadJob(loadJob);
- // add callback before txn if load job is uncompleted, because callback will be performed on replay without txn begin
+ // add callback before txn if load job is uncompleted,
+ // because callback will be performed on replay without txn begin
// register txn state listener
if (!loadJob.isCompleted()) {
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
@@ -282,8 +282,11 @@ public class LoadManager implements Writable {
labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
}
+ /**
+ * Record finished load job by editLog.
+ **/
public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType,
- long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
+ long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
// get db id
Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbName);
@@ -291,7 +294,8 @@ public class LoadManager implements Writable {
LoadJob loadJob;
switch (jobType) {
case INSERT:
- loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg, trackingUrl);
+ loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg,
+ trackingUrl);
break;
default:
return;
@@ -301,60 +305,77 @@ public class LoadManager implements Writable {
Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
}
- public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException {
- Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName());
+ /**
+ * Match need cancel loadJob by stmt.
+ **/
+ @VisibleForTesting
+ public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
+ throws AnalysisException {
+ String label = stmt.getLabel();
+ String state = stmt.getState();
+ PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
+ matchLoadJobs.addAll(loadJobs.stream().filter(job -> {
+ if (stmt.getOperator() != null) {
+ // compound
+ boolean labelFilter =
+ label.contains("%") ? matcher.match(job.getLabel()) : job.getLabel().equalsIgnoreCase(label);
+ boolean stateFilter = job.getState().name().equalsIgnoreCase(state);
+ return Operator.AND.equals(stmt.getOperator()) ? labelFilter && stateFilter :
+ labelFilter || stateFilter;
+ }
+ if (StringUtils.isNotEmpty(label)) {
+ return label.contains("%") ? matcher.match(job.getLabel()) : job.getLabel().equalsIgnoreCase(label);
+ }
+ if (StringUtils.isNotEmpty(state)) {
+ return job.getState().name().equalsIgnoreCase(state);
+ }
+ return false;
+ }).collect(Collectors.toList()));
+ }
+ /**
+ * Cancel load job by stmt.
+ **/
+ public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
+ Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName());
// List of load jobs waiting to be cancelled
- List<LoadJob> loadJobs = Lists.newArrayList();
+ List<LoadJob> matchLoadJobs = Lists.newArrayList();
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}
-
- // get jobs by label
- List<LoadJob> matchLoadJobs = Lists.newArrayList();
- if (isAccurateMatch) {
- if (labelToLoadJobs.containsKey(stmt.getLabel())) {
- matchLoadJobs.addAll(labelToLoadJobs.get(stmt.getLabel()));
- }
- } else {
- PatternMatcher matcher = PatternMatcher.createMysqlPattern(stmt.getLabel(), CaseSensibility.LABEL.getCaseSensibility());
- for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
- if (matcher.match(entry.getKey())) {
- matchLoadJobs.addAll(entry.getValue());
- }
- }
- }
-
+ addNeedCancelLoadJob(stmt,
+ labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+ matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
throw new DdlException("Load job does not exist");
}
-
// check state here
- List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(entity -> !entity.isTxnDone())
- .collect(Collectors.toList());
+ List<LoadJob> uncompletedLoadJob =
+ matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList());
if (uncompletedLoadJob.isEmpty()) {
- throw new DdlException("There is no uncompleted job which label " +
- (isAccurateMatch ? "is " : "like ") + stmt.getLabel());
+ throw new DdlException("There is no uncompleted job");
}
-
- loadJobs.addAll(uncompletedLoadJob);
} finally {
readUnlock();
}
-
- for (LoadJob loadJob : loadJobs) {
+ for (LoadJob loadJob : matchLoadJobs) {
try {
loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
} catch (DdlException e) {
- throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
- "label=[" + loadJob.getLabel() + "] failed msg=" + e.getMessage());
+ throw new DdlException(
+ "Cancel load job [" + loadJob.getId() + "] fail, " + "label=[" + loadJob.getLabel()
+ +
+ "] failed msg=" + e.getMessage());
}
}
}
+ /**
+ * Replay end load job.
+ **/
public void replayEndLoadJob(LoadJobFinalOperation operation) {
LoadJob job = idToLoadJob.get(operation.getId());
if (job == null) {
@@ -367,12 +388,13 @@ public class LoadManager implements Writable {
return;
}
job.unprotectReadEndOperation(operation);
- LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId())
- .add("operation", operation)
- .add("msg", "replay end load job")
- .build());
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId()).add("operation", operation)
+ .add("msg", "replay end load job").build());
}
+ /**
+ * Replay update load job.
+ **/
public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) {
long jobId = info.getJobId();
LoadJob job = idToLoadJob.get(jobId);
@@ -384,6 +406,9 @@ public class LoadManager implements Writable {
job.replayUpdateStateInfo(info);
}
+ /**
+ * Get load job num, used by proc.
+ **/
public int getLoadJobNum(JobState jobState, long dbId) {
readLock();
try {
@@ -391,23 +416,31 @@ public class LoadManager implements Writable {
if (labelToLoadJobs == null) {
return 0;
}
- List<LoadJob> loadJobList = labelToLoadJobs.values().stream()
- .flatMap(entity -> entity.stream()).collect(Collectors.toList());
+ List<LoadJob> loadJobList =
+ labelToLoadJobs.values().stream().flatMap(entity -> entity.stream()).collect(Collectors.toList());
return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count();
} finally {
readUnlock();
}
}
+
+ /**
+ * Get load job num, used by metric.
+ **/
public long getLoadJobNum(JobState jobState, EtlJobType jobType) {
readLock();
try {
- return idToLoadJob.values().stream().filter(j -> j.getState() == jobState && j.getJobType() == jobType).count();
+ return idToLoadJob.values().stream().filter(j -> j.getState() == jobState && j.getJobType() == jobType)
+ .count();
} finally {
readUnlock();
}
}
+ /**
+ * Remove old load job.
+ **/
public void removeOldLoadJob() {
long currentTimeMs = System.currentTimeMillis();
@@ -437,7 +470,9 @@ public class LoadManager implements Writable {
}
}
- // only for those jobs which have etl state, like SparkLoadJob
+ /**
+ * Only for those jobs which have etl state, like SparkLoadJob.
+ **/
public void processEtlStateJobs() {
idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.ETL))
.forEach(job -> {
@@ -445,8 +480,8 @@ public class LoadManager implements Writable {
((SparkLoadJob) job).updateEtlStatus();
} catch (DataQualityException e) {
LOG.info("update load job etl status failed. job id: {}", job.getId(), e);
- job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG),
- true, true);
+ job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
+ DataQualityException.QUALITY_FAIL_MSG), true, true);
} catch (UserException e) {
LOG.warn("update load job etl status failed. job id: {}", job.getId(), e);
job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
@@ -456,7 +491,9 @@ public class LoadManager implements Writable {
});
}
- // only for those jobs which load by PushTask
+ /**
+ * Only for those jobs which load by PushTask.
+ **/
public void processLoadingStateJobs() {
idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.LOADING))
.forEach(job -> {
@@ -473,16 +510,17 @@ public class LoadManager implements Writable {
/**
* This method will return the jobs info which can meet the condition of input param.
- * @param dbId used to filter jobs which belong to this db
- * @param labelValue used to filter jobs which's label is or like labelValue.
+ *
+ * @param dbId used to filter jobs which belong to this db
+ * @param labelValue used to filter jobs which's label is or like labelValue.
* @param accurateMatch true: filter jobs which's label is labelValue. false: filter jobs which's label like itself.
- * @param statesValue used to filter jobs which's state within the statesValue set.
+ * @param statesValue used to filter jobs which's state within the statesValue set.
* @return The result is the list of jobInfo.
- * JobInfo is a List<Comparable> which includes the comparable object: jobId, label, state etc.
- * The result is unordered.
+ * JobInfo is a list which includes the comparable object: jobId, label, state etc.
+ * The result is unordered.
*/
- public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue,
- boolean accurateMatch, Set<String> statesValue) throws AnalysisException {
+ public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch,
+ Set<String> statesValue) throws AnalysisException {
LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
return loadJobInfos;
@@ -506,8 +544,8 @@ public class LoadManager implements Writable {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
List<LoadJob> loadJobList = Lists.newArrayList();
if (Strings.isNullOrEmpty(labelValue)) {
- loadJobList.addAll(labelToLoadJobs.values()
- .stream().flatMap(Collection::stream).collect(Collectors.toList()));
+ loadJobList.addAll(
+ labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
} else {
// check label value
if (accurateMatch) {
@@ -517,7 +555,8 @@ public class LoadManager implements Writable {
loadJobList.addAll(labelToLoadJobs.get(labelValue));
} else {
// non-accurate match
- PatternMatcher matcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
+ PatternMatcher matcher =
+ PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (matcher.match(entry.getKey())) {
loadJobList.addAll(entry.getValue());
@@ -544,6 +583,9 @@ public class LoadManager implements Writable {
}
}
+ /**
+ * Get load job info.
+ **/
public void getLoadJobInfo(Load.JobInfo info) throws DdlException {
String fullDbName = ClusterNamespace.getFullName(info.clusterName, info.dbName);
info.dbName = fullDbName;
@@ -577,8 +619,8 @@ public class LoadManager implements Writable {
}
private void submitJobs() {
- loadJobScheduler.submitJob(idToLoadJob.values().stream().filter(
- loadJob -> loadJob.state == JobState.PENDING).collect(Collectors.toList()));
+ loadJobScheduler.submitJob(idToLoadJob.values().stream().filter(loadJob -> loadJob.state == JobState.PENDING)
+ .collect(Collectors.toList()));
}
private void analyzeLoadJobs() {
@@ -594,16 +636,13 @@ public class LoadManager implements Writable {
}
/**
- * step1: if label has been used in old load jobs which belong to load class
- * step2: if label has been used in v2 load jobs
- * step2.1: if label has been user in v2 load jobs, the create timestamp will be checked
+ * step1: if label has been used in old load jobs which belong to load class.
+ * step2: if label has been used in v2 load jobs.
+ * step2.1: if label has been user in v2 load jobs, the create timestamp will be checked.
*
- * @param dbId
- * @param label
* @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job.
*/
- private void checkLabelUsed(long dbId, String label)
- throws DdlException {
+ private void checkLabelUsed(long dbId, String label) throws DdlException {
// if label has been used in old load jobs
Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label);
// if label has been used in v2 of load jobs
@@ -637,16 +676,22 @@ public class LoadManager implements Writable {
lock.writeLock().unlock();
}
+ /**
+ * Init.
+ **/
public void initJobProgress(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds,
- List<Long> relatedBackendIds) {
+ List<Long> relatedBackendIds) {
LoadJob job = idToLoadJob.get(jobId);
if (job != null) {
job.initLoadProgress(loadId, fragmentIds, relatedBackendIds);
}
}
- public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId,
- long scannedRows, long scannedBytes, boolean isDone) {
+ /**
+ * Update.
+ **/
+ public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+ long scannedBytes, boolean isDone) {
LoadJob job = idToLoadJob.get(jobId);
if (job != null) {
job.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
@@ -656,7 +701,8 @@ public class LoadManager implements Writable {
@Override
public void write(DataOutput out) throws IOException {
long currentTimeMs = System.currentTimeMillis();
- List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList());
+ List<LoadJob> loadJobs =
+ idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList());
out.writeInt(loadJobs.size());
for (LoadJob loadJob : loadJobs) {
@@ -664,6 +710,9 @@ public class LoadManager implements Writable {
}
}
+ /**
+ * Read from file.
+ **/
public void readFields(DataInput in) throws IOException {
long currentTimeMs = System.currentTimeMillis();
int size = in.readInt();
@@ -683,12 +732,13 @@ public class LoadManager implements Writable {
if (loadJob.getState() == JobState.PENDING) {
// bad case. When a mini load job is created and then FE restart.
// the job will be in PENDING state forever.
- // This is a temp solution to remove these jobs. And the mini load job should be deprecated in Doris v1.1
- TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr().getTransactionState(
- loadJob.getDbId(), loadJob.getTransactionId());
+ // This is a temp solution to remove these jobs.
+ // And the mini load job should be deprecated in Doris v1.1
+ TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr()
+ .getTransactionState(loadJob.getDbId(), loadJob.getTransactionId());
if (state == null) {
- LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}",
- loadJob.getId(), loadJob.getDbId(), loadJob.getTransactionId());
+ LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}", loadJob.getId(),
+ loadJob.getDbId(), loadJob.getTransactionId());
continue;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 5904848b55..174f505dac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -170,18 +170,7 @@ public class DdlExecutor {
catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
}
} else if (ddlStmt instanceof CancelLoadStmt) {
- boolean isAccurateMatch = ((CancelLoadStmt) ddlStmt).isAccurateMatch();
- boolean isLabelExist = catalog.getLoadInstance().isLabelExist(
- ((CancelLoadStmt) ddlStmt).getDbName(),
- ((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch);
- if (isLabelExist) {
- catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt,
- isAccurateMatch);
- }
- if (!isLabelExist || isAccurateMatch) {
- catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt,
- isAccurateMatch);
- }
+ catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
} else if (ddlStmt instanceof CreateRoutineLoadStmt) {
catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof PauseRoutineLoadStmt) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
index 4ed21d7ae2..3621cbcbad 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
@@ -17,88 +17,123 @@
package org.apache.doris.analysis;
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.FakeCatalog;
+import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.InsertLoadJob;
+import org.apache.doris.load.loadv2.LoadJob;
+import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.utframe.TestWithFeService;
-import mockit.Expectations;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-public class CancelLoadStmtTest {
- private Analyzer analyzer;
- private Catalog catalog;
+import java.util.ArrayList;
+import java.util.List;
- FakeCatalog fakeCatalog;
+public class CancelLoadStmtTest extends TestWithFeService {
- @Before
- public void setUp() {
- fakeCatalog = new FakeCatalog();
+ private Analyzer analyzer;
- catalog = AccessTestUtil.fetchAdminCatalog();
- FakeCatalog.setCatalog(catalog);
+ @Override
+ protected void runBeforeAll() throws Exception {
+ FeConstants.runningUnitTest = true;
+ createDatabase("testDb");
+ useDatabase("testDb");
+ createTable("create table table1\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ + "properties(\"replication_num\" = \"1\");");
+ analyzer = new Analyzer(connectContext.getCatalog(), connectContext);
+ }
- analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
- new Expectations(analyzer) {
- {
- analyzer.getDefaultDb();
- minTimes = 0;
- result = "testCluster:testDb";
+ @Test
+ public void testNormal() throws UserException {
+ SlotRef labelSlotRef = new SlotRef(null, "label");
+ StringLiteral labelStringLiteral = new StringLiteral("doris_test_label");
- analyzer.getQualifiedUser();
- minTimes = 0;
- result = "testCluster:testUser";
+ SlotRef stateSlotRef = new SlotRef(null, "state");
+ StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
- analyzer.getClusterName();
- minTimes = 0;
- result = "testCluster";
+ BinaryPredicate labelBinaryPredicate =
+ new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral);
+ CancelLoadStmt stmt = new CancelLoadStmt(null, labelBinaryPredicate);
+ stmt.analyze(analyzer);
+ Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label'",
+ stmt.toString());
- analyzer.getCatalog();
- minTimes = 0;
- result = catalog;
- }
- };
- }
+ BinaryPredicate stateBinaryPredicate =
+ new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
+ stmt = new CancelLoadStmt(null, stateBinaryPredicate);
+ stmt.analyze(analyzer);
+ Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `state` = 'FINISHED'", stmt.toString());
- @Test
- public void testNormal() throws UserException, AnalysisException {
- SlotRef slotRef = new SlotRef(null, "label");
- StringLiteral stringLiteral = new StringLiteral("doris_test_label");
+ LikePredicate labelLikePredicate =
+ new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, labelStringLiteral);
+ stmt = new CancelLoadStmt(null, labelLikePredicate);
+ stmt.analyze(analyzer);
+ Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'",
+ stmt.toString());
- BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
- CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate);
+ CompoundPredicate compoundAndPredicate =
+ new CompoundPredicate(Operator.AND, labelBinaryPredicate, stateBinaryPredicate);
+ stmt = new CancelLoadStmt(null, compoundAndPredicate);
stmt.analyze(analyzer);
- Assert.assertTrue(stmt.isAccurateMatch());
- Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` = 'doris_test_label'", stmt.toString());
+ Assertions.assertEquals(
+ "CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'FINISHED'",
+ stmt.toString());
- LikePredicate likePredicate = new LikePredicate(LikePredicate.Operator.LIKE, slotRef, stringLiteral);
- stmt = new CancelLoadStmt(null, likePredicate);
+ CompoundPredicate compoundOrPredicate =
+ new CompoundPredicate(Operator.OR, labelBinaryPredicate, stateBinaryPredicate);
+ stmt = new CancelLoadStmt(null, compoundOrPredicate);
+ stmt.analyze(analyzer);
+ Assertions.assertEquals(
+ "CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' OR `state` = 'FINISHED'",
+ stmt.toString());
+
+ // test match
+ List<LoadJob> loadJobs = new ArrayList<>();
+ InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, 10003L, 10005L, 0, "", "");
+ loadJobs.add(insertLoadJob1);
+ InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, 10003L, 10005L, 0, "", "");
+ loadJobs.add(insertLoadJob2);
+ InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, 10003L, 10005L, 0, "", "");
+ loadJobs.add(insertLoadJob3);
+ // label
+ stmt = new CancelLoadStmt(null, labelBinaryPredicate);
+ stmt.analyze(analyzer);
+ List<LoadJob> matchLoadJobs = new ArrayList<>();
+ LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
+ Assertions.assertEquals(1, matchLoadJobs.size());
+ // state
+ matchLoadJobs.clear();
+ stmt = new CancelLoadStmt(null, stateBinaryPredicate);
+ stmt.analyze(analyzer);
+ LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
+ Assertions.assertEquals(3, matchLoadJobs.size());
+ // or
+ matchLoadJobs.clear();
+ stmt = new CancelLoadStmt(null, compoundOrPredicate);
stmt.analyze(analyzer);
- Assert.assertFalse(stmt.isAccurateMatch());
- Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` LIKE 'doris_test_label'", stmt.toString());
+ LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
+ Assertions.assertEquals(3, matchLoadJobs.size());
+ // and
+ matchLoadJobs.clear();
+ stmt = new CancelLoadStmt(null, compoundAndPredicate);
+ stmt.analyze(analyzer);
+ LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
+ Assertions.assertEquals(1, matchLoadJobs.size());
}
- @Test(expected = AnalysisException.class)
- public void testNoDb() throws UserException, AnalysisException {
- SlotRef slotRef = new SlotRef(null, "label");
- StringLiteral stringLiteral = new StringLiteral("doris_test_label");
- new Expectations(analyzer) {
- {
- analyzer.getDefaultDb();
- minTimes = 0;
- result = "";
-
- analyzer.getClusterName();
- minTimes = 0;
- result = "testCluster";
- }
- };
-
- BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
- CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate);
- stmt.analyze(analyzer);
- Assert.fail("No exception throws.");
+ @Test
+ public void testError() {
+ SlotRef stateSlotRef = new SlotRef(null, "state");
+ StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
+
+ LikePredicate stateLikePredicate =
+ new LikePredicate(LikePredicate.Operator.LIKE, stateSlotRef, stateStringLiteral);
+ CancelLoadStmt stmt = new CancelLoadStmt(null, stateLikePredicate);
+ ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Only label can use like",
+ () -> stmt.analyze(analyzer));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org