You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/11/21 16:23:06 UTC
(doris) branch branch-2.0 updated: [fix](stats) Fix creating too many tasks on new env (#27362)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new ffd7521080a [fix](stats) Fix creating too many tasks on new env (#27362)
ffd7521080a is described below
commit ffd7521080a9d8f368c898a3488e968644ce1c5e
Author: AKIRA <33...@users.noreply.github.com>
AuthorDate: Wed Nov 22 01:22:57 2023 +0900
[fix](stats) Fix creating too many tasks on new env (#27362)
---
.../org/apache/doris/statistics/AnalysisJob.java | 33 +++++++++++-----------
.../apache/doris/statistics/AnalysisManager.java | 4 +++
.../doris/statistics/StatisticConstants.java | 2 ++
.../doris/statistics/StatisticsCollector.java | 9 ++++--
.../apache/doris/statistics/AnalysisJobTest.java | 8 +++---
5 files changed, 33 insertions(+), 23 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
index e079e6b2ade..5b1ca430409 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -45,10 +46,6 @@ public class AnalysisJob {
protected List<ColStatsData> buf;
- protected int totalTaskCount;
-
- protected int queryFinishedTaskCount;
-
protected StmtExecutor stmtExecutor;
protected boolean killed;
@@ -63,10 +60,9 @@ public class AnalysisJob {
for (BaseAnalysisTask task : queryingTask) {
task.job = this;
}
- this.queryingTask = new HashSet<>(queryingTask);
- this.queryFinished = new HashSet<>();
+ this.queryingTask = Collections.synchronizedSet(new HashSet<>(queryingTask));
+ this.queryFinished = Collections.synchronizedSet(new HashSet<>());
this.buf = new ArrayList<>();
- totalTaskCount = queryingTask.size();
start = System.currentTimeMillis();
this.jobInfo = jobInfo;
this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
@@ -86,12 +82,14 @@ public class AnalysisJob {
}
protected void markOneTaskDone() {
- queryFinishedTaskCount += 1;
- if (queryFinishedTaskCount == totalTaskCount) {
- writeBuf();
- updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
- + (System.currentTimeMillis() - start) / 1000);
- deregisterJob();
+ if (queryingTask.isEmpty()) {
+ try {
+ writeBuf();
+ updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+ + (System.currentTimeMillis() - start) / 1000);
+ } finally {
+ deregisterJob();
+ }
} else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
writeBuf();
}
@@ -175,9 +173,12 @@ public class AnalysisJob {
}
public void taskFailed(BaseAnalysisTask task, String reason) {
- updateTaskState(AnalysisState.FAILED, reason);
- cancel();
- deregisterJob();
+ try {
+ updateTaskState(AnalysisState.FAILED, reason);
+ cancel();
+ } finally {
+ deregisterJob();
+ }
}
public void cancel() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index e5d997d3425..ff0ae2acd96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -1072,4 +1072,8 @@ public class AnalysisManager implements Writable {
public void removeJob(long id) {
idToAnalysisJob.remove(id);
}
+
+ public boolean hasUnFinished() {
+ return !analysisJobIdToTaskMap.isEmpty();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index e6f71cd5911..ee07d52d3b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -94,6 +94,8 @@ public class StatisticConstants {
public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
+ public static final int SUBMIT_JOB_LIMIT = 5;
+
static {
SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
+ ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index 569965ff1eb..f71d589d4ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -35,6 +35,7 @@ public abstract class StatisticsCollector extends MasterDaemon {
protected final AnalysisTaskExecutor analysisTaskExecutor;
+ protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) {
super(name, intervalMs);
@@ -53,8 +54,8 @@ public abstract class StatisticsCollector extends MasterDaemon {
if (Env.isCheckpointThread()) {
return;
}
-
- if (!analysisTaskExecutor.idle()) {
+ submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
+ if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
LOG.info("Analyze tasks those submitted in last time is not finished, skip");
return;
}
@@ -71,7 +72,9 @@ public abstract class StatisticsCollector extends MasterDaemon {
// No statistics need to be collected or updated
return;
}
-
+ if (submittedThisRound-- < 0) {
+ return;
+ }
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index d4dedd17123..bca05d8299c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -44,7 +44,9 @@ public class AnalysisJobTest {
}
@Test
- public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+ public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo,
+ @Mocked OlapAnalysisTask olapAnalysisTask,
+ @Mocked OlapAnalysisTask olapAnalysisTask2) {
AtomicInteger writeBufInvokeTimes = new AtomicInteger();
new MockUp<AnalysisJob>() {
@Mock
@@ -63,9 +65,9 @@ public class AnalysisJobTest {
AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
job.queryingTask = new HashSet<>();
job.queryingTask.add(olapAnalysisTask);
+ job.queryingTask.add(olapAnalysisTask2);
job.queryFinished = new HashSet<>();
job.buf = new ArrayList<>();
- job.totalTaskCount = 20;
// not all task finished nor cached limit exceed, shouldn't write
job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
@@ -97,7 +99,6 @@ public class AnalysisJobTest {
job.queryingTask.add(olapAnalysisTask);
job.queryFinished = new HashSet<>();
job.buf = new ArrayList<>();
- job.totalTaskCount = 1;
job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
// all task finished, should write and deregister this job
@@ -132,7 +133,6 @@ public class AnalysisJobTest {
for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) {
job.buf.add(colStatsData);
}
- job.totalTaskCount = 100;
job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
// cache limit exceed, should write them
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org