You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/20 23:58:40 UTC

[doris] branch master updated: [fix](statistics) Fix statistics related threads continuously spawn as doing checkpoint #16088

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2daa5f3fef [fix](statistics) Fix statistics related threads continuously spawn as doing checkpoint #16088
2daa5f3fef is described below

commit 2daa5f3fef24d8324e723577de5e111e162ec094
Author: AKIRA <33...@users.noreply.github.com>
AuthorDate: Sat Jan 21 07:58:33 2023 +0800

    [fix](statistics) Fix statistics related threads continuously spawn as doing checkpoint #16088
---
 .../main/java/org/apache/doris/common/Config.java    |  2 +-
 .../src/main/java/org/apache/doris/catalog/Env.java  |  5 +++--
 .../doris/statistics/AnalysisTaskExecutor.java       | 20 ++++++++++----------
 .../doris/statistics/AnalysisTaskExecutorTest.java   |  2 +-
 4 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d9472377a3..706e3543f6 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1850,7 +1850,7 @@ public class Config extends ConfigBase {
     * Used to determine how many statistics collection SQL could run simultaneously.
      */
     @ConfField
-    public static int statistics_simultaneously_running_job_num = 10;
+    public static int statistics_simultaneously_running_task_num = 10;
 
     /**
      * Internal table replica num, once set, user should promise the available BE is greater than this value,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index e3ac2ff6f2..0a238e9dbd 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -259,7 +259,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -639,7 +638,9 @@ public class Env {
         this.mtmvJobManager = new MTMVJobManager();
         this.extMetaCacheMgr = new ExternalMetaCacheMgr();
         this.fqdnManager = new FQDNManager(systemInfo);
-        this.analysisManager = new AnalysisManager();
+        if (!isCheckpointCatalog) {
+            this.analysisManager = new AnalysisManager();
+        }
     }
 
     public static void destroyCheckpoint() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 783c73b6de..919185d287 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -38,8 +38,8 @@ public class AnalysisTaskExecutor extends Thread {
     private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class);
 
     private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool(
-            Config.statistics_simultaneously_running_job_num,
-            Config.statistics_simultaneously_running_job_num, 0,
+            Config.statistics_simultaneously_running_task_num,
+            Config.statistics_simultaneously_running_task_num, 0,
             TimeUnit.DAYS, new LinkedBlockingQueue<>(),
             new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
             "Analysis Job Executor", true);
@@ -47,9 +47,9 @@ public class AnalysisTaskExecutor extends Thread {
     private final AnalysisTaskScheduler taskScheduler;
 
     private final BlockingCounter blockingCounter =
-            new BlockingCounter(Config.statistics_simultaneously_running_job_num);
+            new BlockingCounter(Config.statistics_simultaneously_running_task_num);
 
-    private final BlockingQueue<AnalysisTaskWrapper> jobQueue =
+    private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
             new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
                     Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
 
@@ -60,11 +60,11 @@ public class AnalysisTaskExecutor extends Thread {
     @Override
     public void run() {
         fetchAndExecute();
-        cancelExpiredJob();
+        cancelExpiredTask();
     }
 
-    private void cancelExpiredJob() {
-        String name = "Expired Analysis Job Killer";
+    private void cancelExpiredTask() {
+        String name = "Expired Analysis Task Killer";
         Thread t = new Thread(this::doCancelExpiredJob, name);
         t.setDaemon(true);
         t.start();
@@ -73,7 +73,7 @@ public class AnalysisTaskExecutor extends Thread {
     private void doCancelExpiredJob() {
         for (;;) {
             try {
-                AnalysisTaskWrapper taskWrapper = jobQueue.take();
+                AnalysisTaskWrapper taskWrapper = taskQueue.take();
                 try {
                     long timeout = StatisticConstants.STATISTICS_TASKS_TIMEOUT_IN_MS;
                     taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
@@ -95,7 +95,7 @@ public class AnalysisTaskExecutor extends Thread {
                     LOG.warn(throwable);
                 }
             }
-        }, "Analysis Job Submitter");
+        }, "Analysis Task Submitter");
         t.setDaemon(true);
         t.start();
     }
@@ -119,6 +119,6 @@ public class AnalysisTaskExecutor extends Thread {
     }
 
     public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
-        jobQueue.put(wrapper);
+        taskQueue.put(wrapper);
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index ba75d26cec..8dfcff429b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -74,7 +74,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
         };
 
         AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
-        BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "jobQueue");
+        BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "taskQueue");
         AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
         Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
         b.put(analysisTaskWrapper);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org