You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/01 03:34:13 UTC

[incubator-doris] branch master updated: [feature-wip](statistics) step3: schedule the statistics tasks and update relevant info (#8860)

This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bd5d4f163 [feature-wip](statistics) step3: schedule the statistics tasks and update relevant info (#8860)
4bd5d4f163 is described below

commit 4bd5d4f1631db8e11ff4ead44206aa2e790b4482
Author: ElvinWei <zh...@outlook.com>
AuthorDate: Sun May 1 11:34:08 2022 +0800

    [feature-wip](statistics) step3: schedule the statistics tasks and update relevant info (#8860)
    
    This pull request implements part of the statistics feature (https://github.com/apache/incubator-doris/issues/6370). It does not affect any existing code, and users are not yet able to create statistics jobs.
    
    After the statistics statement has been received and the collection job has been divided into tasks, this step implements the scheduling of statistics tasks and the updating of job information. It mainly includes the following:
    - Create a thread pool to schedule a certain number of tasks; the degree of concurrency depends on the configuration `cbo_concurrency_statistics_task_num`.
    - After a task is completed, update the information of the statistics job.
---
 .../doris/analysis/AlterColumnStatsStmt.java       |  18 ++-
 .../apache/doris/analysis/AlterTableStatsStmt.java |  18 ++-
 .../apache/doris/analysis/ShowColumnStatsStmt.java |  12 +-
 .../apache/doris/analysis/ShowTableStatsStmt.java  |   4 +-
 .../org/apache/doris/statistics/ColumnStats.java   |  53 +++++---
 .../org/apache/doris/statistics/Statistics.java    |   8 +-
 .../apache/doris/statistics/StatisticsManager.java |  54 +++++++-
 .../apache/doris/statistics/StatisticsTask.java    |  99 ++++++--------
 .../doris/statistics/StatisticsTaskResult.java     |  18 ++-
 .../doris/statistics/StatisticsTaskScheduler.java  | 147 ++++++++++++++++++---
 .../org/apache/doris/statistics/StatsType.java     |  34 +++--
 .../org/apache/doris/statistics/TableStats.java    |  20 +--
 12 files changed, 332 insertions(+), 153 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
index 7787b3d876..17d631a1a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
@@ -25,15 +25,17 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.ColumnStats;
+import org.apache.doris.statistics.StatsType;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 
 import java.util.Map;
 import java.util.Optional;
 
 public class AlterColumnStatsStmt extends DdlStmt {
 
-    private static final ImmutableSet<String> CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
+    private static final ImmutableSet<StatsType> CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder<StatsType>()
             .add(ColumnStats.NDV)
             .add(ColumnStats.AVG_SIZE)
             .add(ColumnStats.MAX_SIZE)
@@ -45,6 +47,7 @@ public class AlterColumnStatsStmt extends DdlStmt {
     private TableName tableName;
     private String columnName;
     private Map<String, String> properties;
+    public final Map<StatsType, String> statsTypeToValue = Maps.newHashMap();
 
     public AlterColumnStatsStmt(TableName tableName, String columnName, Map<String, String> properties) {
         this.tableName = tableName;
@@ -58,8 +61,8 @@ public class AlterColumnStatsStmt extends DdlStmt {
         // check table name
         tableName.analyze(analyzer);
         // check properties
-        Optional<String> optional = properties.keySet().stream().filter(
-                entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity.toLowerCase())).findFirst();
+        Optional<StatsType> optional = properties.keySet().stream().map(StatsType::fromString)
+            .filter(statsType -> !CONFIGURABLE_PROPERTIES_SET.contains(statsType)).findFirst();
         if (optional.isPresent()) {
             throw new AnalysisException(optional.get() + " is invalid statistic");
         }
@@ -71,6 +74,11 @@ public class AlterColumnStatsStmt extends DdlStmt {
                     ConnectContext.get().getRemoteIP(),
                     tableName.getDb() + ": " + tableName.getTbl());
         }
+        // get statsTypeToValue
+        properties.forEach((key, value) -> {
+            StatsType statsType = StatsType.fromString(key);
+            statsTypeToValue.put(statsType, value);
+        });
     }
 
     public TableName getTableName() {
@@ -81,7 +89,7 @@ public class AlterColumnStatsStmt extends DdlStmt {
         return columnName;
     }
 
-    public Map<String, String> getProperties() {
-        return properties;
+    public Map<StatsType, String> getStatsTypeToValue() {
+        return statsTypeToValue;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java
index 97512a4160..6d8ea6027e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java
@@ -24,22 +24,25 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.StatsType;
 import org.apache.doris.statistics.TableStats;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 
 import java.util.Map;
 import java.util.Optional;
 
 public class AlterTableStatsStmt extends DdlStmt {
 
-    private static final ImmutableSet<String> CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
+    private static final ImmutableSet<StatsType> CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder<StatsType>()
             .add(TableStats.DATA_SIZE)
             .add(TableStats.ROW_COUNT)
             .build();
 
     private TableName tableName;
     private Map<String, String> properties;
+    public final Map<StatsType, String> statsTypeToValue = Maps.newHashMap();
 
     public AlterTableStatsStmt(TableName tableName, Map<String, String> properties) {
         this.tableName = tableName;
@@ -52,8 +55,8 @@ public class AlterTableStatsStmt extends DdlStmt {
         // check table name
         tableName.analyze(analyzer);
         // check properties
-        Optional<String> optional = properties.keySet().stream().filter(
-                entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity.toLowerCase())).findFirst();
+        Optional<StatsType> optional = properties.keySet().stream().map(StatsType::fromString)
+            .filter(statsType -> !CONFIGURABLE_PROPERTIES_SET.contains(statsType)).findFirst();
         if (optional.isPresent()) {
             throw new AnalysisException(optional.get() + " is invalid statistic");
         }
@@ -65,13 +68,18 @@ public class AlterTableStatsStmt extends DdlStmt {
                     ConnectContext.get().getRemoteIP(),
                     tableName.getDb() + ": " + tableName.getTbl());
         }
+        // get statsTypeToValue
+        properties.forEach((key, value) -> {
+            StatsType statsType = StatsType.fromString(key);
+            statsTypeToValue.put(statsType, value);
+        });
     }
 
     public TableName getTableName() {
         return tableName;
     }
 
-    public Map<String, String> getProperties() {
-        return properties;
+    public Map<StatsType, String> getStatsTypeToValue() {
+        return statsTypeToValue;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
index af41121301..784e719188 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
@@ -31,12 +31,12 @@ public class ShowColumnStatsStmt extends ShowStmt {
     private static final ImmutableList<String> TITLE_NAMES =
             new ImmutableList.Builder<String>()
                     .add("column_name")
-                    .add(ColumnStats.NDV)
-                    .add(ColumnStats.AVG_SIZE)
-                    .add(ColumnStats.MAX_SIZE)
-                    .add(ColumnStats.NUM_NULLS)
-                    .add(ColumnStats.MIN_VALUE)
-                    .add(ColumnStats.MAX_VALUE)
+                    .add(ColumnStats.NDV.getValue())
+                    .add(ColumnStats.AVG_SIZE.getValue())
+                    .add(ColumnStats.MAX_SIZE.getValue())
+                    .add(ColumnStats.NUM_NULLS.getValue())
+                    .add(ColumnStats.MIN_VALUE.getValue())
+                    .add(ColumnStats.MAX_VALUE.getValue())
                     .build();
 
     private TableName tableName;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
index a35eb1f0c1..90541cb7dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
@@ -35,8 +35,8 @@ public class ShowTableStatsStmt extends ShowStmt {
     private static final ImmutableList<String> TITLE_NAMES =
             new ImmutableList.Builder<String>()
                     .add("table_name")
-                    .add(TableStats.ROW_COUNT)
-                    .add(TableStats.DATA_SIZE)
+                    .add(TableStats.ROW_COUNT.getValue())
+                    .add(TableStats.DATA_SIZE.getValue())
                     .build();
 
     private TableName tableName;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java
index 5982982f21..df3b17899b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java
@@ -32,12 +32,12 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.Util;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
 import java.util.function.Predicate;
 
-import com.clearspring.analytics.util.Lists;
 
 /**
  * There are the statistics of column.
@@ -57,12 +57,12 @@ import com.clearspring.analytics.util.Lists;
  */
 public class ColumnStats {
 
-    public static final String NDV = "ndv";
-    public static final String AVG_SIZE = "avg_size";
-    public static final String MAX_SIZE = "max_size";
-    public static final String NUM_NULLS = "num_nulls";
-    public static final String MIN_VALUE = "min_value";
-    public static final String MAX_VALUE = "max_value";
+    public static final StatsType NDV = StatsType.NDV;
+    public static final StatsType AVG_SIZE = StatsType.AVG_SIZE;
+    public static final StatsType MAX_SIZE = StatsType.MAX_SIZE;
+    public static final StatsType NUM_NULLS = StatsType.NUM_NULLS;
+    public static final StatsType MIN_VALUE = StatsType.MIN_VALUE;
+    public static final StatsType MAX_VALUE = StatsType.MAX_VALUE;
 
     private static final Predicate<Long> DESIRED_NDV_PRED = (v) -> v >= -1L;
     private static final Predicate<Float> DESIRED_AVG_SIZE_PRED = (v) -> (v == -1) || (v >= 0);
@@ -76,25 +76,34 @@ public class ColumnStats {
     private LiteralExpr minValue;
     private LiteralExpr maxValue;
 
-    public void updateStats(Type columnType, Map<String, String> statsNameToValue) throws AnalysisException {
-        for (Map.Entry<String, String> entry : statsNameToValue.entrySet()) {
-            String statsName = entry.getKey();
-            if (statsName.equalsIgnoreCase(NDV)) {
-                ndv = Util.getLongPropertyOrDefault(entry.getValue(), ndv,
+    public void updateStats(Type columnType, Map<StatsType, String> statsNameToValue) throws AnalysisException {
+        for (Map.Entry<StatsType, String> entry : statsNameToValue.entrySet()) {
+            StatsType statsType = entry.getKey();
+            switch (statsType) {
+                case NDV:
+                    ndv = Util.getLongPropertyOrDefault(entry.getValue(), ndv,
                         DESIRED_NDV_PRED, NDV + " should >= -1");
-            } else if (statsName.equalsIgnoreCase(AVG_SIZE)) {
-                avgSize = Util.getFloatPropertyOrDefault(entry.getValue(), avgSize,
+                    break;
+                case AVG_SIZE:
+                    avgSize = Util.getFloatPropertyOrDefault(entry.getValue(), avgSize,
                         DESIRED_AVG_SIZE_PRED, AVG_SIZE + " should (>=0) or (=-1)");
-            } else if (statsName.equalsIgnoreCase(MAX_SIZE)) {
-                maxSize = Util.getLongPropertyOrDefault(entry.getValue(), maxSize,
+                    break;
+                case MAX_SIZE:
+                    maxSize = Util.getLongPropertyOrDefault(entry.getValue(), maxSize,
                         DESIRED_MAX_SIZE_PRED, MAX_SIZE + " should >=-1");
-            } else if (statsName.equalsIgnoreCase(NUM_NULLS)) {
-                numNulls = Util.getLongPropertyOrDefault(entry.getValue(), numNulls,
+                    break;
+                case NUM_NULLS:
+                    numNulls = Util.getLongPropertyOrDefault(entry.getValue(), numNulls,
                         DESIRED_NUM_NULLS_PRED, NUM_NULLS + " should >=-1");
-            } else if (statsName.equalsIgnoreCase(MIN_VALUE)) {
-                minValue = validateColumnValue(columnType, entry.getValue());
-            } else if (statsName.equalsIgnoreCase(MAX_VALUE)) {
-                maxValue = validateColumnValue(columnType, entry.getValue());
+                    break;
+                case MIN_VALUE:
+                    minValue = validateColumnValue(columnType, entry.getValue());
+                    break;
+                case MAX_VALUE:
+                    maxValue = validateColumnValue(columnType, entry.getValue());
+                    break;
+                default:
+                    throw new AnalysisException("Unknown stats type: " + statsType);
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
index 58003de90c..77aed0c5f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
@@ -38,25 +38,25 @@ public class Statistics {
 
     private Map<Long, TableStats> idToTableStats = Maps.newConcurrentMap();
 
-    public void updateTableStats(long tableId, Map<String, String> statsNameToValue)
+    public void updateTableStats(long tableId, Map<StatsType, String> statsTypeToValue)
             throws AnalysisException {
         TableStats tableStats = idToTableStats.get(tableId);
         if (tableStats == null) {
             tableStats = new TableStats();
             idToTableStats.put(tableId, tableStats);
         }
-        tableStats.updateTableStats(statsNameToValue);
+        tableStats.updateTableStats(statsTypeToValue);
     }
 
     public void updateColumnStats(long tableId, String columnName, Type columnType,
-                                  Map<String, String> statsNameToValue)
+                                  Map<StatsType, String> statsTypeToValue)
             throws AnalysisException {
         TableStats tableStats = idToTableStats.get(tableId);
         if (tableStats == null) {
             tableStats = new TableStats();
             idToTableStats.put(tableId, tableStats);
         }
-        tableStats.updateColumnStats(columnName, columnType, statsNameToValue);
+        tableStats.updateColumnStats(columnName, columnType, statsTypeToValue);
     }
 
     public TableStats getTableStats(long tableId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java
index b1c88e36a5..0fea564740 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java
@@ -24,18 +24,25 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.Map;
 
-import com.clearspring.analytics.util.Lists;
-
 public class StatisticsManager {
+    private final static Logger LOG = LogManager.getLogger(StatisticsManager.class);
+
     private Statistics statistics;
 
     public StatisticsManager() {
@@ -45,7 +52,7 @@ public class StatisticsManager {
     public void alterTableStatistics(AlterTableStatsStmt stmt)
             throws AnalysisException {
         Table table = validateTableName(stmt.getTableName());
-        statistics.updateTableStats(table.getId(), stmt.getProperties());
+        statistics.updateTableStats(table.getId(), stmt.getStatsTypeToValue());
     }
 
     public void alterColumnStatistics(AlterColumnStatsStmt stmt) throws AnalysisException {
@@ -56,7 +63,7 @@ public class StatisticsManager {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName());
         }
         // match type and column value
-        statistics.updateColumnStats(table.getId(), columnName, column.getType(), stmt.getProperties());
+        statistics.updateColumnStats(table.getId(), columnName, column.getType(), stmt.getStatsTypeToValue());
     }
 
     public List<List<String>> showTableStatsList(String dbName, String tableName)
@@ -128,13 +135,48 @@ public class StatisticsManager {
         return row;
     }
 
+    public void alterTableStatistics(StatisticsTaskResult taskResult) throws AnalysisException {
+        StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc();
+        validateTableAndColumn(categoryDesc);
+        long tblId = categoryDesc.getTableId();
+        Map<StatsType, String> statsTypeToValue = taskResult.getStatsTypeToValue();
+        statistics.updateTableStats(tblId, statsTypeToValue);
+    }
+
+    public void alterColumnStatistics(StatisticsTaskResult taskResult) throws AnalysisException {
+        StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc();
+        validateTableAndColumn(categoryDesc);
+        long dbId = categoryDesc.getDbId();
+        long tblId = categoryDesc.getTableId();
+        Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId);
+        Table table = db.getTableOrAnalysisException(tblId);
+        String columnName = categoryDesc.getColumnName();
+        Type columnType = table.getColumn(columnName).getType();
+        Map<StatsType, String> statsTypeToValue = taskResult.getStatsTypeToValue();
+        statistics.updateColumnStats(tblId, columnName, columnType, statsTypeToValue);
+    }
+
     private Table validateTableName(TableName dbTableName) throws AnalysisException {
         String dbName = dbTableName.getDb();
         String tableName = dbTableName.getTbl();
 
         Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbName);
-        Table table = db.getTableOrAnalysisException(tableName);
-        return table;
+        return db.getTableOrAnalysisException(tableName);
+    }
+
+    private void validateTableAndColumn(StatsCategoryDesc categoryDesc) throws AnalysisException {
+        long dbId = categoryDesc.getDbId();
+        long tblId = categoryDesc.getTableId();
+        String columnName = categoryDesc.getColumnName();
+
+        Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId);
+        Table table = db.getTableOrAnalysisException(tblId);
+        if (!Strings.isNullOrEmpty(columnName)) {
+            Column column = table.getColumn(columnName);
+            if (column == null) {
+                throw new AnalysisException("Column " + columnName + " does not exist in table " + table.getName());
+            }
+        }
     }
 
     public Statistics getStatistics() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
index fa236584c3..76af28c48f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
@@ -25,7 +25,6 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * The StatisticsTask belongs to one StatisticsJob.
@@ -48,8 +47,6 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
         FAILED
     }
 
-    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-
     protected long id = Catalog.getCurrentCatalog().getNextId();
     protected long jobId;
     protected StatsGranularityDesc granularityDesc;
@@ -71,24 +68,8 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
         this.statsTypeList = statsTypeList;
     }
 
-    public void readLock() {
-        lock.readLock().lock();
-    }
-
-    public void readUnlock() {
-        lock.readLock().unlock();
-    }
-
-    protected void writeLock() {
-        lock.writeLock().lock();
-    }
-
-    protected void writeUnlock() {
-        lock.writeLock().unlock();
-    }
-
     public long getId() {
-        return this.id;
+        return id;
     }
 
     public void setId(long id) {
@@ -96,35 +77,35 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
     }
 
     public long getJobId() {
-        return this.jobId;
+        return jobId;
     }
 
     public StatsGranularityDesc getGranularityDesc() {
-        return this.granularityDesc;
+        return granularityDesc;
     }
 
     public StatsCategoryDesc getCategoryDesc() {
-        return this.categoryDesc;
+        return categoryDesc;
     }
 
     public List<StatsType> getStatsTypeList() {
-        return this.statsTypeList;
+        return statsTypeList;
     }
 
     public TaskState getTaskState() {
-        return this.taskState;
+        return taskState;
     }
 
     public long getCreateTime() {
-        return this.createTime;
+        return createTime;
     }
 
     public long getStartTime() {
-        return this.startTime;
+        return startTime;
     }
 
     public long getFinishTime() {
-        return this.finishTime;
+        return finishTime;
     }
 
     /**
@@ -139,44 +120,38 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
     // please retain job lock firstly
     public void updateTaskState(TaskState newState) throws DdlException {
         LOG.info("To change statistics task(id={}) state from {} to {}", id, taskState, newState);
-        try {
-            // PENDING -> RUNNING/FAILED
-            if (taskState == TaskState.PENDING) {
-                if (newState == TaskState.RUNNING) {
-                    taskState = newState;
-                    // task start running, set start time
+        String errorMsg = "Invalid statistics task state transition from ";
+
+        // PENDING -> RUNNING/FAILED
+        if (taskState == TaskState.PENDING) {
+            switch (newState) {
+                case RUNNING:
                     startTime = System.currentTimeMillis();
-                    LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
-                } else if (newState == TaskState.FAILED) {
-                    taskState = newState;
-                    LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
-                } else {
-                    LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
-                    throw new DdlException("Invalid task state transition from PENDING to " + newState);
-                }
-                return;
+                    break;
+                case FAILED:
+                    finishTime = System.currentTimeMillis();
+                    break;
+                default:
+                    throw new DdlException(errorMsg + taskState + " to " + newState);
             }
-
-            // RUNNING -> FINISHED/FAILED
-            if (taskState == TaskState.RUNNING) {
-                if (newState == TaskState.FINISHED) {
-                    // set finish time
+        }
+        // RUNNING -> FINISHED/FAILED
+        else if (taskState == TaskState.RUNNING) {
+            switch (newState) {
+                case FINISHED:
+                case FAILED:
                     finishTime = System.currentTimeMillis();
-                    taskState = newState;
-                    LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
-                } else if (newState == TaskState.FAILED) {
-                    taskState = newState;
-                    LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
-                } else {
-                    LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
-                    throw new DdlException("Invalid task state transition from RUNNING to " + newState);
-                }
+                    break;
+                default:
+                    throw new DdlException(errorMsg + taskState + " to " + newState);
             }
-
-            LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
-            throw new DdlException("Invalid task state transition from " + taskState + " to " + newState);
-        } finally {
-            LOG.info("Statistics task(id={}) current state is {}", id, taskState);
         }
+        // unsupported state transition
+        else {
+            throw new DdlException(errorMsg + taskState + " to " + newState);
+        }
+
+        LOG.info("Statistics job(id={}) state changed from {} to {}", id, taskState, newState);
+        taskState = newState;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java
index 700e2fa93a..94f4c50934 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java
@@ -20,9 +20,9 @@ package org.apache.doris.statistics;
 import java.util.Map;
 
 public class StatisticsTaskResult {
-    private StatsGranularityDesc granularityDesc;
-    private StatsCategoryDesc categoryDesc;
-    private Map<StatsType, String> statsTypeToValue;
+    private final StatsGranularityDesc granularityDesc;
+    private final StatsCategoryDesc categoryDesc;
+    private final Map<StatsType, String> statsTypeToValue;
 
     public StatisticsTaskResult(StatsGranularityDesc granularityDesc, StatsCategoryDesc categoryDesc,
                                 Map<StatsType, String> statsTypeToValue) {
@@ -30,4 +30,16 @@ public class StatisticsTaskResult {
         this.categoryDesc = categoryDesc;
         this.statsTypeToValue = statsTypeToValue;
     }
+
+    public StatsGranularityDesc getGranularityDesc() {
+        return granularityDesc;
+    }
+
+    public StatsCategoryDesc getCategoryDesc() {
+        return categoryDesc;
+    }
+
+    public Map<StatsType, String> getStatsTypeToValue() {
+        return statsTypeToValue;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
index 46450417b6..0edfa82047 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
@@ -17,20 +17,33 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.statistics.StatisticsJob.JobState;
+import org.apache.doris.statistics.StatisticsTask.TaskState;
+import org.apache.doris.statistics.StatsCategoryDesc.StatsCategory;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /*
 Schedule statistics task
@@ -46,25 +59,41 @@ public class StatisticsTaskScheduler extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
         // step1: task n concurrent tasks from the queue
         List<StatisticsTask> tasks = peek();
-        // step2: execute tasks
-        ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
-        List<Future<StatisticsTaskResult>> taskResultList = null;
-        try {
-            taskResultList = executor.invokeAll(tasks);
-        } catch (InterruptedException e) {
-            LOG.warn("Failed to execute this turn of statistics tasks", e);
-        }
-        // step3: update job and statistics
-        handleTaskResult(taskResultList);
-        // step4: remove task from queue
-        remove(tasks.size());
 
+        if (!tasks.isEmpty()) {
+            ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(tasks.size(),
+                "statistic-pool", false);
+            StatisticsJobManager jobManager = Catalog.getCurrentCatalog().getStatisticsJobManager();
+            Map<Long, StatisticsJob> statisticsJobs = jobManager.getIdToStatisticsJob();
+            Map<Long, List<Map<Long, Future<StatisticsTaskResult>>>> resultMap = Maps.newLinkedHashMap();
+
+            for (StatisticsTask task : tasks) {
+                queue.remove();
+                long jobId = task.getJobId();
+                StatisticsJob statisticsJob = statisticsJobs.get(jobId);
+
+                if (checkJobIsValid(jobId)) {
+                    // step2: execute task and save task result
+                    Future<StatisticsTaskResult> future = executor.submit(task);
+                    if (updateTaskAndJobState(task, statisticsJob)) {
+                        Map<Long, Future<StatisticsTaskResult>> taskInfo = Maps.newHashMap();
+                        taskInfo.put(task.getId(), future);
+                        List<Map<Long, Future<StatisticsTaskResult>>> jobInfo = resultMap
+                                .getOrDefault(jobId, Lists.newArrayList());
+                        jobInfo.add(taskInfo);
+                        resultMap.put(jobId, jobInfo);
+                    }
+                }
+            }
+
+            // step3: handle task results
+            handleTaskResult(resultMap);
+        }
     }
 
-    public void addTasks(List<StatisticsTask> statisticsTaskList) {
+    /**
+     * Appends the given statistics tasks to the scheduler queue.
+     *
+     * @param statisticsTaskList tasks to enqueue
+     * @throws IllegalStateException declared by the signature; presumably raised by the queue when it
+     *                               cannot accept more tasks (queue declaration not visible here, confirm)
+     */
+    public void addTasks(List<StatisticsTask> statisticsTaskList) throws IllegalStateException {
         queue.addAll(statisticsTaskList);
     }
 
@@ -82,11 +111,89 @@ public class StatisticsTaskScheduler extends MasterDaemon {
         return tasks;
     }
 
-    private void remove(int size) {
-        // TODO
+    /**
+     * Moves a task that is about to be executed into {@link TaskState#RUNNING} and brings the
+     * state of its job in line with the outcome.
+     *
+     * <p>If the task could not be switched to RUNNING, the job is marked {@link JobState#FAILED}.
+     * Otherwise a job that is still {@link JobState#SCHEDULING} is switched to
+     * {@link JobState#RUNNING}.
+     *
+     * @param task statistics task
+     * @param job  statistics job the task belongs to
+     * @return true only if the task is RUNNING and the job state was updated successfully
+     */
+    private boolean updateTaskAndJobState(StatisticsTask task, StatisticsJob job) {
+        try {
+            // update task state
+            task.updateTaskState(TaskState.RUNNING);
+        } catch (DdlException e) {
+            LOG.warn("Update statistics task state failed, taskId: " + task.getId(), e);
+        }
+
+        try {
+            // update job state
+            if (task.getTaskState() != TaskState.RUNNING) {
+                // the task never started: fail the job and report the failure to the caller
+                // instead of claiming success
+                job.updateJobState(JobState.FAILED);
+                return false;
+            }
+            if (job.getJobState() == JobState.SCHEDULING) {
+                job.updateJobState(JobState.RUNNING);
+            }
+        } catch (DdlException e) {
+            LOG.warn("Update statistics job state failed, jobId: " + job.getId(), e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Waits for the submitted tasks job by job, stores the statistics they collected and
+     * records the outcome of every task on its job.
+     *
+     * <p>Jobs that are no longer valid (see {@link #checkJobIsValid(Long)}) are skipped. Each task
+     * is awaited for at most the number of seconds given by the job property
+     * {@code AnalyzeStmt.CBO_STATISTICS_TASK_TIMEOUT_SEC}.
+     *
+     * @param resultMap job id mapped to the (task id, future) pairs submitted for that job
+     */
+    private void handleTaskResult(Map<Long, List<Map<Long, Future<StatisticsTaskResult>>>> resultMap) {
+        StatisticsManager statsManager = Catalog.getCurrentCatalog().getStatisticsManager();
+        StatisticsJobManager jobManager = Catalog.getCurrentCatalog().getStatisticsJobManager();
+        // remembers an interruption so the flag can be restored after all tasks were handled
+        boolean interrupted = false;
+
+        for (Map.Entry<Long, List<Map<Long, Future<StatisticsTaskResult>>>> jobEntry : resultMap.entrySet()) {
+            Long jobId = jobEntry.getKey();
+            if (!checkJobIsValid(jobId)) {
+                continue;
+            }
+            StatisticsJob statisticsJob = jobManager.getIdToStatisticsJob().get(jobId);
+            Map<String, String> properties = statisticsJob.getProperties();
+            long timeout = Long.parseLong(properties.get(AnalyzeStmt.CBO_STATISTICS_TASK_TIMEOUT_SEC));
+
+            for (Map<Long, Future<StatisticsTaskResult>> taskInfos : jobEntry.getValue()) {
+                for (Map.Entry<Long, Future<StatisticsTaskResult>> taskInfo : taskInfos.entrySet()) {
+                    Long taskId = taskInfo.getKey();
+                    Future<StatisticsTaskResult> future = taskInfo.getValue();
+                    // starts empty for every task: the failure of an earlier task of the same job
+                    // must not be reported for a later task that succeeded
+                    String errorMsg = "";
+
+                    try {
+                        StatisticsTaskResult taskResult = future.get(timeout, TimeUnit.SECONDS);
+                        StatsCategory category = taskResult.getCategoryDesc().getCategory();
+                        if (category == StatsCategory.TABLE) {
+                            // update table statistics
+                            statsManager.alterTableStatistics(taskResult);
+                        } else if (category == StatsCategory.COLUMN) {
+                            // update column statistics
+                            statsManager.alterColumnStatistics(taskResult);
+                        }
+                    } catch (AnalysisException | TimeoutException | ExecutionException
+                            | InterruptedException | CancellationException e) {
+                        if (e instanceof InterruptedException) {
+                            interrupted = true;
+                        }
+                        // getMessage() may be null or empty (e.g. for a timeout); fall back to the
+                        // exception text so that a failed task is never recorded without an error
+                        String message = e.getMessage();
+                        errorMsg = (message == null || message.isEmpty()) ? e.toString() : message;
+                        LOG.warn("Failed to update statistics. jobId: {}, taskId: {}, e: {}", jobId, taskId, e);
+                    }
+
+                    try {
+                        // update the task and job info
+                        statisticsJob.updateJobInfoByTaskId(taskId, errorMsg);
+                    } catch (DdlException e) {
+                        LOG.warn("Failed to update statistics job info. jobId: {}, taskId: {}, e: {}",
+                                jobId, taskId, e);
+                    }
+                }
+            }
+        }
+
+        if (interrupted) {
+            // restore the interrupt status that Future.get() cleared when it threw
+            Thread.currentThread().interrupt();
+        }
     }
 
-    private void handleTaskResult(List<Future<StatisticsTaskResult>> taskResultLists) {
-        // TODO
+    /**
+     * Tells whether tasks of the given job may still be processed.
+     *
+     * @param jobId id of the statistics job
+     * @return false if the job manager has no job with this id, or the job is CANCELLED or FAILED;
+     *         true otherwise
+     */
+    public boolean checkJobIsValid(Long jobId) {
+        StatisticsJob job = Catalog.getCurrentCatalog().getStatisticsJobManager()
+                .getIdToStatisticsJob().get(jobId);
+        if (job != null) {
+            JobState state = job.getJobState();
+            return !(state == JobState.CANCELLED || state == JobState.FAILED);
+        }
+        return false;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java
index 50f916dadf..e805bff1a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java
@@ -18,12 +18,30 @@
 package org.apache.doris.statistics;
 
 public enum StatsType {
-    ROW_COUNT,
-    DATA_SIZE,
-    NDV,
-    AVG_SIZE,
-    MAX_SIZE,
-    NUM_NULLS,
-    MIN_VALUE,
-    MAX_VALUE
+    ROW_COUNT("row_count"),
+    DATA_SIZE("data_size"),
+    NDV("ndv"),
+    AVG_SIZE("avg_size"),
+    MAX_SIZE("max_size"),
+    NUM_NULLS("num_nulls"),
+    MIN_VALUE("min_value"),
+    MAX_VALUE("max_value"),
+    MAX_COL_LENS("max_col_lens"),
+    AVG_COL_LENS("avg_col_lens");
+    private final String value;
+    StatsType(String value) {
+        this.value = value;
+    }
+    public String getValue() {
+        return value;
+    }
+
+    public static StatsType fromString(String value) {
+        for (StatsType type : StatsType.values()) {
+            if (type.value.equalsIgnoreCase(value)) {
+                return type;
+            }
+        }
+        throw new IllegalArgumentException("Invalid StatsType: " + value);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
index ef494bd9f6..08ef2cc6be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
@@ -21,13 +21,13 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.Util;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.util.List;
 import java.util.Map;
 import java.util.function.Predicate;
 
-import com.clearspring.analytics.util.Lists;
 
 /**
  * There are the statistics of table.
@@ -51,8 +51,8 @@ import com.clearspring.analytics.util.Lists;
  */
 public class TableStats {
 
-    public static final String ROW_COUNT = "row_count";
-    public static final String DATA_SIZE = "data_size";
+    public static final StatsType DATA_SIZE = StatsType.DATA_SIZE;
+    public static final StatsType ROW_COUNT = StatsType.ROW_COUNT;
 
     private static final Predicate<Long> DESIRED_ROW_COUNT_PRED = (v) -> v >= -1L;
     private static final Predicate<Long> DESIRED_DATA_SIZE_PRED = (v) -> v >= -1L;
@@ -61,27 +61,27 @@ public class TableStats {
     private long dataSize = -1;
     private Map<String, ColumnStats> nameToColumnStats = Maps.newConcurrentMap();
 
-    public void updateTableStats(Map<String, String> statsNameToValue) throws AnalysisException {
-        for (Map.Entry<String, String> entry : statsNameToValue.entrySet()) {
-            String statsName = entry.getKey();
-            if (statsName.equalsIgnoreCase(ROW_COUNT)) {
+    /**
+     * Applies the given values to the table-level statistics.
+     *
+     * <p>Only {@code ROW_COUNT} and {@code DATA_SIZE} entries are used; entries of any other type
+     * are ignored.
+     *
+     * @param statsTypeToValue statistics type mapped to its textual value
+     * @throws AnalysisException presumably raised by {@code Util.getLongPropertyOrDefault} when a
+     *                           value is rejected (helper not visible here, confirm)
+     */
+    public void updateTableStats(Map<StatsType, String> statsTypeToValue) throws AnalysisException {
+        for (Map.Entry<StatsType, String> entry : statsTypeToValue.entrySet()) {
+            StatsType statsType = entry.getKey();
+            if (statsType == ROW_COUNT) {
                 rowCount = Util.getLongPropertyOrDefault(entry.getValue(), rowCount,
-                        DESIRED_ROW_COUNT_PRED, ROW_COUNT + " should >= -1");
-            } else if (statsName.equalsIgnoreCase(DATA_SIZE)) {
+                        // getValue() keeps the property name ("row_count") in the message; concatenating
+                        // the enum itself would print the constant name instead
+                        DESIRED_ROW_COUNT_PRED, ROW_COUNT.getValue() + " should >= -1");
+            } else if (statsType == DATA_SIZE) {
                 dataSize = Util.getLongPropertyOrDefault(entry.getValue(), dataSize,
-                        DESIRED_DATA_SIZE_PRED, DATA_SIZE + " should >= -1");
+                        DESIRED_DATA_SIZE_PRED, DATA_SIZE.getValue() + " should >= -1");
             }
         }
     }
 
-    public void updateColumnStats(String columnName, Type columnType, Map<String, String> statsNameToValue)
+    /**
+     * Applies the given values to the statistics of one column, creating the column's statistics
+     * entry on first use.
+     *
+     * @param columnName       name of the column, used as the key of the per-column statistics
+     * @param columnType       type of the column, passed on to {@code ColumnStats.updateStats}
+     * @param statsTypeToValue statistics type mapped to its textual value
+     * @throws AnalysisException propagated from {@code ColumnStats.updateStats}
+     */
+    public void updateColumnStats(String columnName, Type columnType, Map<StatsType, String> statsTypeToValue)
             throws AnalysisException {
-        ColumnStats columnStats = nameToColumnStats.get(columnName);
-        if (columnStats == null) {
-            columnStats = new ColumnStats();
-            nameToColumnStats.put(columnName, columnStats);
-        }
-        columnStats.updateStats(columnType, statsNameToValue);
+        // nameToColumnStats is a concurrent map: create the entry atomically so that two concurrent
+        // callers cannot each install their own ColumnStats and lose one of the updates
+        ColumnStats columnStats = nameToColumnStats.computeIfAbsent(columnName, name -> new ColumnStats());
+        columnStats.updateStats(columnType, statsTypeToValue);
     }
 
     public List<String> getShowInfo() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org