You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by en...@apache.org on 2023/06/20 02:37:52 UTC
[doris] branch master updated: [enhancement](nereids) Remove useless config option #20905
This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 87258a13c4 [enhancement](nereids) Remove useless config option #20905
87258a13c4 is described below
commit 87258a13c4e134cf5eb5b3ad4c28a2281fb869f8
Author: AKIRA <33...@users.noreply.github.com>
AuthorDate: Tue Jun 20 11:37:46 2023 +0900
[enhancement](nereids) Remove useless config option #20905
1. Remove useless config option
2. Fix timeout cancellation: before this PR, an OlapAnalysisTask would continue running even after it had already timed out.
---
.../src/main/java/org/apache/doris/common/Config.java | 6 ------
.../org/apache/doris/statistics/AnalysisManager.java | 4 ++--
.../apache/doris/statistics/AnalysisTaskExecutor.java | 7 ++-----
.../apache/doris/statistics/AnalysisTaskWrapper.java | 1 +
.../org/apache/doris/statistics/BaseAnalysisTask.java | 9 +--------
.../org/apache/doris/statistics/OlapAnalysisTask.java | 19 +++++++++++++++----
.../doris/statistics/AnalysisTaskExecutorTest.java | 9 ---------
7 files changed, 21 insertions(+), 34 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index f44a9833d5..c01cf90d1b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1586,12 +1586,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int be_exec_version = max_be_exec_version;
- @ConfField(mutable = false)
- public static int statistic_job_scheduler_execution_interval_ms = 1000;
-
- @ConfField(mutable = false)
- public static int statistic_task_scheduler_execution_interval_ms = 1000;
-
/*
* mtmv is still under dev, remove this config when it is graduate.
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index b5b182dd62..99c4922850 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -711,7 +711,7 @@ public class AnalysisManager extends Daemon implements Writable {
checkPriv(anyTask);
logKilled(analysisJobInfoMap.get(anyTask.getJobId()));
for (BaseAnalysisTask taskInfo : analysisTaskMap.values()) {
- taskInfo.markAsKilled();
+ taskInfo.cancel();
logKilled(taskInfo.info);
}
}
@@ -780,7 +780,7 @@ public class AnalysisManager extends Daemon implements Writable {
public void cancel() {
cancelled = true;
- tasks.forEach(BaseAnalysisTask::markAsKilled);
+ tasks.forEach(BaseAnalysisTask::cancel);
}
public void execute() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 3f22d1ccf3..00e2b9fc76 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
-import org.apache.doris.statistics.util.BlockingCounter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,9 +45,6 @@ public class AnalysisTaskExecutor extends Thread {
private final AnalysisTaskScheduler taskScheduler;
- private final BlockingCounter blockingCounter =
- new BlockingCounter(Config.statistics_simultaneously_running_task_num);
-
private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
@@ -75,7 +71,8 @@ public class AnalysisTaskExecutor extends Thread {
try {
AnalysisTaskWrapper taskWrapper = taskQueue.take();
try {
- long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes);
+ long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes)
+ - (System.currentTimeMillis() - taskWrapper.getStartTime());
taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
taskWrapper.cancel(e.getMessage());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
index b2615e5d05..864c9100d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
@@ -83,6 +83,7 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
} catch (Exception e) {
LOG.warn(String.format("Cancel job failed job info : %s", msg));
}
+ // Interrupting the thread while it is writing metadata would cause the FE to crash.
return super.cancel(false);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 1fc97c10dc..a1a52c0a11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -163,12 +163,10 @@ public abstract class BaseAnalysisTask {
public abstract void execute() throws Exception;
public void cancel() {
+ killed = true;
if (stmtExecutor != null) {
stmtExecutor.cancel();
}
- if (killed) {
- return;
- }
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.FAILED,
String.format("Job has been cancelled: %s", info.toString()), -1);
@@ -202,9 +200,4 @@ public abstract class BaseAnalysisTask {
return String.format("TABLESAMPLE(%d ROWS)", info.sampleRows);
}
}
-
- public void markAsKilled() {
- this.killed = true;
- cancel();
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index 4391d87f52..d33248e873 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -20,8 +20,10 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.annotations.VisibleForTesting;
@@ -108,10 +110,19 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
@VisibleForTesting
public void execSQL(String sql) throws Exception {
- QueryState queryState = StatisticsUtil.execUpdate(sql);
- if (queryState.getStateType().equals(MysqlStateType.ERR)) {
- throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s",
- info.catalogName, info.dbName, info.colName, queryState.getErrorMessage()));
+ if (killed) {
+ return;
+ }
+ try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+ r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+ stmtExecutor = new StmtExecutor(r.connectContext, sql);
+ r.connectContext.setExecutor(stmtExecutor);
+ stmtExecutor.execute();
+ QueryState queryState = r.connectContext.getState();
+ if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+ throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s",
+ info.catalogName, info.dbName, info.colName, queryState.getErrorMessage()));
+ }
}
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index 522a28d9c2..264015dd9e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -24,7 +24,6 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
-import org.apache.doris.statistics.util.BlockingCounter;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.utframe.TestWithFeService;
@@ -33,7 +32,6 @@ import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@@ -86,13 +84,6 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
b.put(analysisTaskWrapper);
analysisTaskExecutor.start();
- BlockingCounter counter = Deencapsulation.getField(analysisTaskExecutor, "blockingCounter");
- int sleepTime = 500;
- while (counter.getVal() != 0 && sleepTime > 0) {
- sleepTime -= 100;
- Thread.sleep(100);
- }
- Assertions.assertEquals(0, counter.getVal());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org