You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/20 23:58:40 UTC
[doris] branch master updated: [fix](statistics) Fix statistics related threads continuously spawn as doing checkpoint #16088
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2daa5f3fef [fix](statistics) Fix statistics related threads continuously spawn as doing checkpoint #16088
2daa5f3fef is described below
commit 2daa5f3fef24d8324e723577de5e111e162ec094
Author: AKIRA <33...@users.noreply.github.com>
AuthorDate: Sat Jan 21 07:58:33 2023 +0800
[fix](statistics) Fix statistics related threads continuously spawn as doing checkpoint #16088
---
.../main/java/org/apache/doris/common/Config.java | 2 +-
.../src/main/java/org/apache/doris/catalog/Env.java | 5 +++--
.../doris/statistics/AnalysisTaskExecutor.java | 20 ++++++++++----------
.../doris/statistics/AnalysisTaskExecutorTest.java | 2 +-
4 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d9472377a3..706e3543f6 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1850,7 +1850,7 @@ public class Config extends ConfigBase {
* Used to determined how many statistics collection SQL could run simultaneously.
*/
@ConfField
- public static int statistics_simultaneously_running_job_num = 10;
+ public static int statistics_simultaneously_running_task_num = 10;
/**
* Internal table replica num, once set, user should promise the avaible BE is greater than this value,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index e3ac2ff6f2..0a238e9dbd 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -259,7 +259,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -639,7 +638,9 @@ public class Env {
this.mtmvJobManager = new MTMVJobManager();
this.extMetaCacheMgr = new ExternalMetaCacheMgr();
this.fqdnManager = new FQDNManager(systemInfo);
- this.analysisManager = new AnalysisManager();
+ if (!isCheckpointCatalog) {
+ this.analysisManager = new AnalysisManager();
+ }
}
public static void destroyCheckpoint() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 783c73b6de..919185d287 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -38,8 +38,8 @@ public class AnalysisTaskExecutor extends Thread {
private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class);
private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool(
- Config.statistics_simultaneously_running_job_num,
- Config.statistics_simultaneously_running_job_num, 0,
+ Config.statistics_simultaneously_running_task_num,
+ Config.statistics_simultaneously_running_task_num, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(),
new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
"Analysis Job Executor", true);
@@ -47,9 +47,9 @@ public class AnalysisTaskExecutor extends Thread {
private final AnalysisTaskScheduler taskScheduler;
private final BlockingCounter blockingCounter =
- new BlockingCounter(Config.statistics_simultaneously_running_job_num);
+ new BlockingCounter(Config.statistics_simultaneously_running_task_num);
- private final BlockingQueue<AnalysisTaskWrapper> jobQueue =
+ private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
@@ -60,11 +60,11 @@ public class AnalysisTaskExecutor extends Thread {
@Override
public void run() {
fetchAndExecute();
- cancelExpiredJob();
+ cancelExpiredTask();
}
- private void cancelExpiredJob() {
- String name = "Expired Analysis Job Killer";
+ private void cancelExpiredTask() {
+ String name = "Expired Analysis Task Killer";
Thread t = new Thread(this::doCancelExpiredJob, name);
t.setDaemon(true);
t.start();
@@ -73,7 +73,7 @@ public class AnalysisTaskExecutor extends Thread {
private void doCancelExpiredJob() {
for (;;) {
try {
- AnalysisTaskWrapper taskWrapper = jobQueue.take();
+ AnalysisTaskWrapper taskWrapper = taskQueue.take();
try {
long timeout = StatisticConstants.STATISTICS_TASKS_TIMEOUT_IN_MS;
taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
@@ -95,7 +95,7 @@ public class AnalysisTaskExecutor extends Thread {
LOG.warn(throwable);
}
}
- }, "Analysis Job Submitter");
+ }, "Analysis Task Submitter");
t.setDaemon(true);
t.start();
}
@@ -119,6 +119,6 @@ public class AnalysisTaskExecutor extends Thread {
}
public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
- jobQueue.put(wrapper);
+ taskQueue.put(wrapper);
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index ba75d26cec..8dfcff429b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -74,7 +74,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
};
AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
- BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "jobQueue");
+ BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "taskQueue");
AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
b.put(analysisTaskWrapper);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org