You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/09 07:43:24 UTC
[doris] branch master updated: [fix](stats) set analysis job status to finished when be crashed by mistake (#20485)
This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b6386889d5 [fix](stats) set analysis job status to finished when be crashed by mistake (#20485)
b6386889d5 is described below
commit b6386889d5a7356664e249684948aef9bc337cd6
Author: AKIRA <33...@users.noreply.github.com>
AuthorDate: Fri Jun 9 16:43:11 2023 +0900
[fix](stats) set analysis job status to finished when be crashed by mistake (#20485)
If BE crashed the error would be logged, and the analysis task would be mark as finished, which is incorrect.
In this PR, update analysis task according to the query state
---
.../doris/catalog/InternalSchemaInitializer.java | 2 +-
.../main/java/org/apache/doris/qe/StmtExecutor.java | 10 ++++------
.../org/apache/doris/statistics/AnalysisManager.java | 16 +++++++++-------
.../apache/doris/statistics/AnalysisTaskExecutor.java | 2 +-
.../apache/doris/statistics/AnalysisTaskWrapper.java | 4 ++--
.../org/apache/doris/statistics/ColumnStatistic.java | 3 ++-
.../org/apache/doris/statistics/OlapAnalysisTask.java | 12 ++++++------
.../doris/statistics/StatisticsAutoAnalyzer.java | 1 -
.../apache/doris/statistics/util/StatisticsUtil.java | 4 +++-
.../org/apache/doris/statistics/AnalysisJobTest.java | 9 +++++++++
.../doris/statistics/AnalysisTaskExecutorTest.java | 18 ++++++++++++++++++
11 files changed, 55 insertions(+), 26 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index c5a3197dee..04bbbe263a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -147,7 +147,7 @@ public class InternalSchemaInitializer extends Thread {
columnDefs.add(partId);
columnDefs.add(new ColumnDef("count", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("ndv", TypeDef.create(PrimitiveType.BIGINT)));
- columnDefs.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT)));
+ columnDefs.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT), true));
columnDefs.add(new ColumnDef("min", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
columnDefs.add(new ColumnDef("max", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
columnDefs.add(new ColumnDef("data_size_in_bytes", TypeDef.create(PrimitiveType.BIGINT)));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 021a179c46..9b0b8708aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2421,8 +2421,7 @@ public class StmtExecutor {
analyze(context.getSessionVariable().toThrift());
}
} catch (Exception e) {
- LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e);
- return resultRows;
+ throw new RuntimeException("Failed to execute internal SQL", e);
}
planner.getFragments();
RowBatch batch;
@@ -2432,7 +2431,7 @@ public class StmtExecutor {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
} catch (UserException e) {
- LOG.warn(e.getMessage(), e);
+ throw new RuntimeException("Failed to execute internal SQL", e);
}
Span queryScheduleSpan = context.getTracer()
@@ -2441,7 +2440,7 @@ public class StmtExecutor {
coord.exec();
} catch (Exception e) {
queryScheduleSpan.recordException(e);
- LOG.warn("Unexpected exception when SQL running", e);
+ throw new RuntimeException("Failed to execute internal SQL", e);
} finally {
queryScheduleSpan.end();
}
@@ -2457,9 +2456,8 @@ public class StmtExecutor {
}
}
} catch (Exception e) {
- LOG.warn("Unexpected exception when SQL running", e);
fetchResultSpan.recordException(e);
- return resultRows;
+ throw new RuntimeException("Failed to execute internal SQL", e);
} finally {
fetchResultSpan.end();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 8dd81208bb..789478d501 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -242,10 +242,10 @@ public class AnalysisManager extends Daemon implements Writable {
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
createTaskForMVIdx(jobInfo, analysisTaskInfos, false);
-
- persistAnalysisJob(jobInfo);
- analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
-
+ if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
+ persistAnalysisJob(jobInfo);
+ analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
+ }
try {
updateTableStats(jobInfo);
} catch (Throwable e) {
@@ -510,7 +510,9 @@ public class AnalysisManager extends Daemon implements Writable {
continue;
}
try {
- logCreateAnalysisTask(analysisInfo);
+ if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
+ logCreateAnalysisTask(analysisInfo);
+ }
} catch (Exception e) {
throw new DdlException("Failed to create analysis task", e);
}
@@ -518,13 +520,13 @@ public class AnalysisManager extends Daemon implements Writable {
}
private void logCreateAnalysisTask(AnalysisInfo analysisInfo) {
- Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(analysisInfo);
analysisTaskInfoMap.put(analysisInfo.taskId, analysisInfo);
+ Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(analysisInfo);
}
private void logCreateAnalysisJob(AnalysisInfo analysisJob) {
- Env.getCurrentEnv().getEditLog().logCreateAnalysisJob(analysisJob);
analysisJobInfoMap.put(analysisJob.jobId, analysisJob);
+ Env.getCurrentEnv().getEditLog().logCreateAnalysisJob(analysisJob);
}
private void createTaskForExternalTable(AnalysisInfo jobInfo,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 301d46644d..3f22d1ccf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -78,7 +78,7 @@ public class AnalysisTaskExecutor extends Thread {
long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes);
taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
- taskWrapper.cancel();
+ taskWrapper.cancel(e.getMessage());
}
} catch (Throwable throwable) {
LOG.warn(throwable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
index 4590e138f6..b2615e5d05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
@@ -76,12 +76,12 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
}
}
- public boolean cancel() {
+ public boolean cancel(String msg) {
try {
LOG.warn("{} cancelled, cost time:{}", task.toString(), System.currentTimeMillis() - startTime);
task.cancel();
} catch (Exception e) {
- LOG.warn(String.format("Cancel job failed job info : %s", task.toString()));
+ LOG.warn(String.format("Cancel job failed job info : %s", msg));
}
return super.cancel(false);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
index 9485ecf662..5735c01430 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
@@ -134,7 +134,8 @@ public class ColumnStatistic {
ndv = count;
}
columnStatisticBuilder.setNdv(ndv);
- columnStatisticBuilder.setNumNulls(Double.parseDouble(resultRow.getColumnValue("null_count")));
+ String nullCount = resultRow.getColumnValue("null_count");
+ columnStatisticBuilder.setNumNulls(nullCount == null ? 0 : Double.parseDouble(nullCount));
columnStatisticBuilder.setDataSize(Double
.parseDouble(resultRow.getColumnValue("data_size_in_bytes")));
columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index a65553a838..4391d87f52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -20,8 +20,8 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.annotations.VisibleForTesting;
@@ -108,10 +108,10 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
@VisibleForTesting
public void execSQL(String sql) throws Exception {
- try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
- r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
- this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
- this.stmtExecutor.execute();
+ QueryState queryState = StatisticsUtil.execUpdate(sql);
+ if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+ throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s",
+ info.catalogName, info.dbName, info.colName, queryState.getErrorMessage()));
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
index 8128d3a7d3..5120bd23ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
@@ -39,7 +39,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-
public class StatisticsAutoAnalyzer extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(StatisticsAutoAnalyzer.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 8f77df2748..2ee244fbe6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -47,6 +47,7 @@ import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo;
@@ -105,12 +106,13 @@ public class StatisticsUtil {
}
}
- public static void execUpdate(String sql) throws Exception {
+ public static QueryState execUpdate(String sql) throws Exception {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
+ return r.connectContext.getState();
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index e8984cd209..86c3d63463 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.utframe.TestWithFeService;
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Set;
public class AnalysisJobTest extends TestWithFeService {
@@ -80,6 +82,13 @@ public class AnalysisJobTest extends TestWithFeService {
}
};
+ new MockUp<StmtExecutor>() {
+ @Mock
+ public List<ResultRow> executeInternalQuery() {
+ return Collections.emptyList();
+ }
+ };
+
new MockUp<ConnectContext>() {
@Mock
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index c2b355067d..522a28d9c2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -19,11 +19,13 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.InternalSchemaInitializer;
import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.BlockingCounter;
+import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Maps;
@@ -36,6 +38,7 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -94,6 +97,21 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
@Test
public void testTaskExecution() throws Exception {
+
+ new MockUp<StmtExecutor>() {
+ @Mock
+ public List<ResultRow> executeInternalQuery() {
+ return Collections.emptyList();
+ }
+ };
+
+ new MockUp<OlapAnalysisTask>() {
+ @Mock
+ public void execSQL(String sql) throws Exception {
+
+ }
+ };
+
AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
colToPartitions.put("col1", Collections.singleton("t1"));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org