You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/11/21 16:23:06 UTC

(doris) branch branch-2.0 updated: [fix](stats) Fix creating too many tasks on new env (#27362)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ffd7521080a [fix](stats) Fix creating too many tasks on new env (#27362)
ffd7521080a is described below

commit ffd7521080a9d8f368c898a3488e968644ce1c5e
Author: AKIRA <33...@users.noreply.github.com>
AuthorDate: Wed Nov 22 01:22:57 2023 +0900

    [fix](stats) Fix creating too many tasks on new env (#27362)
---
 .../org/apache/doris/statistics/AnalysisJob.java   | 33 +++++++++++-----------
 .../apache/doris/statistics/AnalysisManager.java   |  4 +++
 .../doris/statistics/StatisticConstants.java       |  2 ++
 .../doris/statistics/StatisticsCollector.java      |  9 ++++--
 .../apache/doris/statistics/AnalysisJobTest.java   |  8 +++---
 5 files changed, 33 insertions(+), 23 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
index e079e6b2ade..5b1ca430409 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -45,10 +46,6 @@ public class AnalysisJob {
 
     protected List<ColStatsData> buf;
 
-    protected int totalTaskCount;
-
-    protected int queryFinishedTaskCount;
-
     protected StmtExecutor stmtExecutor;
 
     protected boolean killed;
@@ -63,10 +60,9 @@ public class AnalysisJob {
         for (BaseAnalysisTask task : queryingTask) {
             task.job = this;
         }
-        this.queryingTask = new HashSet<>(queryingTask);
-        this.queryFinished = new HashSet<>();
+        this.queryingTask = Collections.synchronizedSet(new HashSet<>(queryingTask));
+        this.queryFinished = Collections.synchronizedSet(new HashSet<>());
         this.buf = new ArrayList<>();
-        totalTaskCount = queryingTask.size();
         start = System.currentTimeMillis();
         this.jobInfo = jobInfo;
         this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
@@ -86,12 +82,14 @@ public class AnalysisJob {
     }
 
     protected void markOneTaskDone() {
-        queryFinishedTaskCount += 1;
-        if (queryFinishedTaskCount == totalTaskCount) {
-            writeBuf();
-            updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
-                    + (System.currentTimeMillis() - start) / 1000);
-            deregisterJob();
+        if (queryingTask.isEmpty()) {
+            try {
+                writeBuf();
+                updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+                        + (System.currentTimeMillis() - start) / 1000);
+            } finally {
+                deregisterJob();
+            }
         } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
             writeBuf();
         }
@@ -175,9 +173,12 @@ public class AnalysisJob {
     }
 
     public void taskFailed(BaseAnalysisTask task, String reason) {
-        updateTaskState(AnalysisState.FAILED, reason);
-        cancel();
-        deregisterJob();
+        try {
+            updateTaskState(AnalysisState.FAILED, reason);
+            cancel();
+        } finally {
+            deregisterJob();
+        }
     }
 
     public void cancel() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index e5d997d3425..ff0ae2acd96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -1072,4 +1072,8 @@ public class AnalysisManager implements Writable {
     public void removeJob(long id) {
         idToAnalysisJob.remove(id);
     }
+
+    public boolean hasUnFinished() {
+        return !analysisJobIdToTaskMap.isEmpty();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index e6f71cd5911..ee07d52d3b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -94,6 +94,8 @@ public class StatisticConstants {
 
     public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
 
+    public static final int SUBMIT_JOB_LIMIT = 5;
+
     static {
         SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
                 + ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index 569965ff1eb..f71d589d4ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -35,6 +35,7 @@ public abstract class StatisticsCollector extends MasterDaemon {
 
     protected final AnalysisTaskExecutor analysisTaskExecutor;
 
+    protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
 
     public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) {
         super(name, intervalMs);
@@ -53,8 +54,8 @@ public abstract class StatisticsCollector extends MasterDaemon {
         if (Env.isCheckpointThread()) {
             return;
         }
-
-        if (!analysisTaskExecutor.idle()) {
+        submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
+        if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
             LOG.info("Analyze tasks those submitted in last time is not finished, skip");
             return;
         }
@@ -71,7 +72,9 @@ public abstract class StatisticsCollector extends MasterDaemon {
             // No statistics need to be collected or updated
             return;
         }
-
+        if (submittedThisRound-- < 0) {
+            return;
+        }
         Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
         AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
         analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index d4dedd17123..bca05d8299c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -44,7 +44,9 @@ public class AnalysisJobTest {
     }
 
     @Test
-    public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+    public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo,
+            @Mocked OlapAnalysisTask olapAnalysisTask,
+            @Mocked OlapAnalysisTask olapAnalysisTask2) {
         AtomicInteger writeBufInvokeTimes = new AtomicInteger();
         new MockUp<AnalysisJob>() {
             @Mock
@@ -63,9 +65,9 @@ public class AnalysisJobTest {
         AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
         job.queryingTask = new HashSet<>();
         job.queryingTask.add(olapAnalysisTask);
+        job.queryingTask.add(olapAnalysisTask2);
         job.queryFinished = new HashSet<>();
         job.buf = new ArrayList<>();
-        job.totalTaskCount = 20;
 
         // not all task finished nor cached limit exceed, shouldn't  write
         job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
@@ -97,7 +99,6 @@ public class AnalysisJobTest {
         job.queryingTask.add(olapAnalysisTask);
         job.queryFinished = new HashSet<>();
         job.buf = new ArrayList<>();
-        job.totalTaskCount = 1;
 
         job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
         // all task finished, should write and deregister this job
@@ -132,7 +133,6 @@ public class AnalysisJobTest {
         for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) {
             job.buf.add(colStatsData);
         }
-        job.totalTaskCount = 100;
 
         job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
         // cache limit exceed, should write them


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org