You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/09 07:43:24 UTC

[doris] branch master updated: [fix](stats) set analysis job status to finished when be crashed by mistake (#20485)

This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b6386889d5 [fix](stats) set analysis job status to finished when be crashed by mistake (#20485)
b6386889d5 is described below

commit b6386889d5a7356664e249684948aef9bc337cd6
Author: AKIRA <33...@users.noreply.github.com>
AuthorDate: Fri Jun 9 16:43:11 2023 +0900

    [fix](stats) set analysis job status to finished when be crashed by mistake (#20485)
    
    If BE crashed the error would be logged, and the analysis task would be mark as finished, which is incorrect.
    In this PR, update analysis task according to the query state
---
 .../doris/catalog/InternalSchemaInitializer.java       |  2 +-
 .../main/java/org/apache/doris/qe/StmtExecutor.java    | 10 ++++------
 .../org/apache/doris/statistics/AnalysisManager.java   | 16 +++++++++-------
 .../apache/doris/statistics/AnalysisTaskExecutor.java  |  2 +-
 .../apache/doris/statistics/AnalysisTaskWrapper.java   |  4 ++--
 .../org/apache/doris/statistics/ColumnStatistic.java   |  3 ++-
 .../org/apache/doris/statistics/OlapAnalysisTask.java  | 12 ++++++------
 .../doris/statistics/StatisticsAutoAnalyzer.java       |  1 -
 .../apache/doris/statistics/util/StatisticsUtil.java   |  4 +++-
 .../org/apache/doris/statistics/AnalysisJobTest.java   |  9 +++++++++
 .../doris/statistics/AnalysisTaskExecutorTest.java     | 18 ++++++++++++++++++
 11 files changed, 55 insertions(+), 26 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index c5a3197dee..04bbbe263a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -147,7 +147,7 @@ public class InternalSchemaInitializer extends Thread {
         columnDefs.add(partId);
         columnDefs.add(new ColumnDef("count", TypeDef.create(PrimitiveType.BIGINT)));
         columnDefs.add(new ColumnDef("ndv", TypeDef.create(PrimitiveType.BIGINT)));
-        columnDefs.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT)));
+        columnDefs.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT), true));
         columnDefs.add(new ColumnDef("min", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
         columnDefs.add(new ColumnDef("max", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true));
         columnDefs.add(new ColumnDef("data_size_in_bytes", TypeDef.create(PrimitiveType.BIGINT)));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 021a179c46..9b0b8708aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2421,8 +2421,7 @@ public class StmtExecutor {
                     analyze(context.getSessionVariable().toThrift());
                 }
             } catch (Exception e) {
-                LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e);
-                return resultRows;
+                throw new RuntimeException("Failed to execute internal SQL", e);
             }
             planner.getFragments();
             RowBatch batch;
@@ -2432,7 +2431,7 @@ public class StmtExecutor {
                 QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                         new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
             } catch (UserException e) {
-                LOG.warn(e.getMessage(), e);
+                throw new RuntimeException("Failed to execute internal SQL", e);
             }
 
             Span queryScheduleSpan = context.getTracer()
@@ -2441,7 +2440,7 @@ public class StmtExecutor {
                 coord.exec();
             } catch (Exception e) {
                 queryScheduleSpan.recordException(e);
-                LOG.warn("Unexpected exception when SQL running", e);
+                throw new RuntimeException("Failed to execute internal SQL", e);
             } finally {
                 queryScheduleSpan.end();
             }
@@ -2457,9 +2456,8 @@ public class StmtExecutor {
                     }
                 }
             } catch (Exception e) {
-                LOG.warn("Unexpected exception when SQL running", e);
                 fetchResultSpan.recordException(e);
-                return resultRows;
+                throw new RuntimeException("Failed to execute internal SQL", e);
             } finally {
                 fetchResultSpan.end();
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 8dd81208bb..789478d501 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -242,10 +242,10 @@ public class AnalysisManager extends Daemon implements Writable {
         Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
         createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
         createTaskForMVIdx(jobInfo, analysisTaskInfos, false);
-
-        persistAnalysisJob(jobInfo);
-        analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
-
+        if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
+            persistAnalysisJob(jobInfo);
+            analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
+        }
         try {
             updateTableStats(jobInfo);
         } catch (Throwable e) {
@@ -510,7 +510,9 @@ public class AnalysisManager extends Daemon implements Writable {
                 continue;
             }
             try {
-                logCreateAnalysisTask(analysisInfo);
+                if (!jobInfo.jobType.equals(JobType.SYSTEM)) {
+                    logCreateAnalysisTask(analysisInfo);
+                }
             } catch (Exception e) {
                 throw new DdlException("Failed to create analysis task", e);
             }
@@ -518,13 +520,13 @@ public class AnalysisManager extends Daemon implements Writable {
     }
 
     private void logCreateAnalysisTask(AnalysisInfo analysisInfo) {
-        Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(analysisInfo);
         analysisTaskInfoMap.put(analysisInfo.taskId, analysisInfo);
+        Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(analysisInfo);
     }
 
     private void logCreateAnalysisJob(AnalysisInfo analysisJob) {
-        Env.getCurrentEnv().getEditLog().logCreateAnalysisJob(analysisJob);
         analysisJobInfoMap.put(analysisJob.jobId, analysisJob);
+        Env.getCurrentEnv().getEditLog().logCreateAnalysisJob(analysisJob);
     }
 
     private void createTaskForExternalTable(AnalysisInfo jobInfo,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 301d46644d..3f22d1ccf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -78,7 +78,7 @@ public class AnalysisTaskExecutor extends Thread {
                     long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes);
                     taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
                 } catch (Exception e) {
-                    taskWrapper.cancel();
+                    taskWrapper.cancel(e.getMessage());
                 }
             } catch (Throwable throwable) {
                 LOG.warn(throwable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
index 4590e138f6..b2615e5d05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java
@@ -76,12 +76,12 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
         }
     }
 
-    public boolean cancel() {
+    public boolean cancel(String msg) {
         try {
             LOG.warn("{} cancelled, cost time:{}", task.toString(), System.currentTimeMillis() - startTime);
             task.cancel();
         } catch (Exception e) {
-            LOG.warn(String.format("Cancel job failed job info : %s", task.toString()));
+            LOG.warn(String.format("Cancel job failed job info : %s", msg));
         }
         return super.cancel(false);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
index 9485ecf662..5735c01430 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
@@ -134,7 +134,8 @@ public class ColumnStatistic {
                 ndv = count;
             }
             columnStatisticBuilder.setNdv(ndv);
-            columnStatisticBuilder.setNumNulls(Double.parseDouble(resultRow.getColumnValue("null_count")));
+            String nullCount = resultRow.getColumnValue("null_count");
+            columnStatisticBuilder.setNumNulls(nullCount == null ? 0 : Double.parseDouble(nullCount));
             columnStatisticBuilder.setDataSize(Double
                     .parseDouble(resultRow.getColumnValue("data_size_in_bytes")));
             columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
index a65553a838..4391d87f52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java
@@ -20,8 +20,8 @@ package org.apache.doris.statistics;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -108,10 +108,10 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
 
     @VisibleForTesting
     public void execSQL(String sql) throws Exception {
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            this.stmtExecutor.execute();
+        QueryState queryState = StatisticsUtil.execUpdate(sql);
+        if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+            throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s",
+                    info.catalogName, info.dbName, info.colName, queryState.getErrorMessage()));
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
index 8128d3a7d3..5120bd23ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java
@@ -39,7 +39,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-
 public class StatisticsAutoAnalyzer extends MasterDaemon {
 
     private static final Logger LOG = LogManager.getLogger(StatisticsAutoAnalyzer.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 8f77df2748..2ee244fbe6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -47,6 +47,7 @@ import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
 import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
 import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo;
@@ -105,12 +106,13 @@ public class StatisticsUtil {
         }
     }
 
-    public static void execUpdate(String sql) throws Exception {
+    public static QueryState execUpdate(String sql) throws Exception {
         try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
             r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
             StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
             r.connectContext.setExecutor(stmtExecutor);
             stmtExecutor.execute();
+            return r.connectContext.getState();
         }
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index e8984cd209..86c3d63463 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
+import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
 import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.utframe.TestWithFeService;
 
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Set;
 
 public class AnalysisJobTest extends TestWithFeService {
@@ -80,6 +82,13 @@ public class AnalysisJobTest extends TestWithFeService {
             }
         };
 
+        new MockUp<StmtExecutor>() {
+            @Mock
+            public List<ResultRow> executeInternalQuery() {
+                return Collections.emptyList();
+            }
+        };
+
         new MockUp<ConnectContext>() {
 
             @Mock
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
index c2b355067d..522a28d9c2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java
@@ -19,11 +19,13 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.InternalSchemaInitializer;
 import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.util.BlockingCounter;
+import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
 import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.collect.Maps;
@@ -36,6 +38,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 
@@ -94,6 +97,21 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
 
     @Test
     public void testTaskExecution() throws Exception {
+
+        new MockUp<StmtExecutor>() {
+            @Mock
+            public List<ResultRow> executeInternalQuery() {
+                return Collections.emptyList();
+            }
+        };
+
+        new MockUp<OlapAnalysisTask>() {
+            @Mock
+            public void execSQL(String sql) throws Exception {
+
+            }
+        };
+
         AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
         HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
         colToPartitions.put("col1", Collections.singleton("t1"));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org