You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/07/19 08:22:31 UTC

[doris] branch master updated: [feature-wip](statistics) step4: collect statistics by implementing statistics tasks (#8861)

This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d90f4b87c [feature-wip](statistics) step4: collect statistics by implementing statistics tasks (#8861)
2d90f4b87c is described below

commit 2d90f4b87cea046aa6b42f4b2afb259f96bbafd0
Author: ElvinWei <zh...@outlook.com>
AuthorDate: Tue Jul 19 16:22:25 2022 +0800

    [feature-wip](statistics) step4: collect statistics by implementing statistics tasks (#8861)
    
    This pull request includes some implementations of the statistics(https://github.com/apache/incubator-doris/issues/6370), it will not affect any existing code and users will not be able to create statistics job.
    
    Now only MetaStatisticsTask that directly collects statistics by reading FE meta is implemented. SQLStatisticsTask is still being implemented, it needs to query BE through FE.
    
    The following is the function implemented by this pr:
    1. Support statistics collection for partitioned and non-partitioned tables. For partitioned tables, the collection of statistics for the specified partition is implemented.
    2. When the task is divided, it is divided according to the partition table and the non-partition table. The most fine-grained is to the tablet level. A matetask collects as many statistics as possible.
    3. Add partition statistics (Table -> Partition -> Column). For example, the size of the table, the number of rows, the size of the partition, the number of rows, the maximum and minimum values of the columns, etc.
    4. Display and modify partition-level statistics.
     …
---
 .../doris/analysis/AlterColumnStatsStmt.java       |   7 +
 .../apache/doris/analysis/AlterTableStatsStmt.java |   7 +
 .../org/apache/doris/analysis/AnalyzeStmt.java     | 201 +++++++--
 .../apache/doris/analysis/ShowColumnStatsStmt.java |   8 +
 .../apache/doris/analysis/ShowTableStatsStmt.java  |   8 +
 .../java/org/apache/doris/qe/ShowExecutor.java     |   4 +-
 .../org/apache/doris/statistics/ColumnStats.java   |  52 ++-
 .../doris/statistics/MetaStatisticsTask.java       | 126 +++++-
 .../doris/statistics/OlapScanStatsDerive.java      |   3 +-
 .../{TableStats.java => PartitionStats.java}       | 106 +++--
 .../apache/doris/statistics/SQLStatisticsTask.java |  24 +-
 .../doris/statistics/SampleSQLStatisticsTask.java  |   5 +-
 .../org/apache/doris/statistics/Statistics.java    | 121 +++++-
 .../apache/doris/statistics/StatisticsDesc.java    |  61 +++
 .../org/apache/doris/statistics/StatisticsJob.java |  24 +-
 .../doris/statistics/StatisticsJobScheduler.java   | 471 +++++++++++++++-----
 .../apache/doris/statistics/StatisticsManager.java | 484 ++++++++++++++++++---
 .../apache/doris/statistics/StatisticsTask.java    |  59 ++-
 .../doris/statistics/StatisticsTaskResult.java     | 117 ++++-
 .../doris/statistics/StatisticsTaskScheduler.java  |  49 +--
 .../{StatsCategoryDesc.java => StatsCategory.java} |  35 +-
 ...sGranularityDesc.java => StatsGranularity.java} |  21 +-
 .../org/apache/doris/statistics/StatsType.java     |   4 +-
 .../org/apache/doris/statistics/TableStats.java    | 203 +++++++--
 24 files changed, 1771 insertions(+), 429 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
index c90d816efb..a10f7738b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java
@@ -29,8 +29,10 @@ import org.apache.doris.statistics.ColumnStats;
 import org.apache.doris.statistics.StatsType;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -95,4 +97,9 @@ public class AlterColumnStatsStmt extends DdlStmt {
     public Map<StatsType, String> getStatsTypeToValue() {
         return statsTypeToValue;
     }
+
+    public List<String> getPartitionNames() {
+        // TODO(WZT): partition statistics
+        return Lists.newArrayList();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java
index be27a9b1e4..a2e5826776 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java
@@ -29,8 +29,10 @@ import org.apache.doris.statistics.StatsType;
 import org.apache.doris.statistics.TableStats;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -85,4 +87,9 @@ public class AlterTableStatsStmt extends DdlStmt {
     public Map<StatsType, String> getStatsTypeToValue() {
         return statsTypeToValue;
     }
+
+    public List<String> getPartitionNames() {
+        // TODO(WZT): partition statistics
+        return Lists.newArrayList();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
index 6e7f56ccf3..52e2085b6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
@@ -20,12 +20,15 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.mysql.privilege.PaloAuth;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -37,8 +40,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
 
 import java.util.List;
 import java.util.Map;
@@ -60,8 +62,6 @@ import java.util.stream.Collectors;
  *      properties: properties of statistics jobs
  */
 public class AnalyzeStmt extends DdlStmt {
-    private static final Logger LOG = LogManager.getLogger(AnalyzeStmt.class);
-
     // time to wait for collect  statistics
     public static final String CBO_STATISTICS_TASK_TIMEOUT_SEC = "cbo_statistics_task_timeout_sec";
 
@@ -69,38 +69,45 @@ public class AnalyzeStmt extends DdlStmt {
             .add(CBO_STATISTICS_TASK_TIMEOUT_SEC)
             .build();
 
-    public static final Predicate<Long> DESIRED_TASK_TIMEOUT_SEC = (v) -> v > 0L;
+    private static final Predicate<Long> DESIRED_TASK_TIMEOUT_SEC = (v) -> v > 0L;
 
-    private final TableName dbTableName;
-    private final List<String> columnNames;
-    private final Map<String, String> properties;
+    private final TableName optTableName;
+    private final PartitionNames optPartitionNames;
+    private final List<String> optColumnNames;
+    private Map<String, String> optProperties;
 
     // after analyzed
     private long dbId;
     private final Set<Long> tblIds = Sets.newHashSet();
-
-    public AnalyzeStmt(TableName dbTableName, List<String> columns, Map<String, String> properties) {
-        this.dbTableName = dbTableName;
-        this.columnNames = columns;
-        this.properties = properties == null ? Maps.newHashMap() : properties;
+    private final List<String> partitionNames = Lists.newArrayList();
+
+    // TODO(wzt): support multiple tables
+    public AnalyzeStmt(TableName optTableName,
+            List<String> optColumnNames,
+            PartitionNames optPartitionNames,
+            Map<String, String> optProperties) {
+        this.optTableName = optTableName;
+        this.optColumnNames = optColumnNames;
+        this.optPartitionNames = optPartitionNames;
+        this.optProperties = optProperties;
     }
 
     public long getDbId() {
         Preconditions.checkArgument(isAnalyzed(),
                 "The dbId must be obtained after the parsing is complete");
-        return this.dbId;
+        return dbId;
     }
 
     public Set<Long> getTblIds() {
         Preconditions.checkArgument(isAnalyzed(),
                 "The tblIds must be obtained after the parsing is complete");
-        return this.tblIds;
+        return tblIds;
     }
 
     public Database getDb() throws AnalysisException {
         Preconditions.checkArgument(isAnalyzed(),
                 "The db must be obtained after the parsing is complete");
-        return this.analyzer.getCatalog().getInternalDataSource().getDbOrAnalysisException(this.dbId);
+        return analyzer.getCatalog().getInternalDataSource().getDbOrAnalysisException(dbId);
     }
 
     public List<Table> getTables() throws AnalysisException {
@@ -111,7 +118,7 @@ public class AnalyzeStmt extends DdlStmt {
 
         db.readLock();
         try {
-            for (Long tblId : this.tblIds) {
+            for (Long tblId : tblIds) {
                 Table table = db.getTableOrAnalysisException(tblId);
                 tables.add(table);
             }
@@ -122,12 +129,39 @@ public class AnalyzeStmt extends DdlStmt {
         return tables;
     }
 
+    public List<String> getPartitionNames() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The partitionNames must be obtained after the parsing is complete");
+        return partitionNames;
+    }
+
+    public Map<Long, List<String>> getTableIdToPartitionName() throws AnalysisException {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The partitionIds must be obtained after the parsing is complete");
+        Map<Long, List<String>> tableIdToPartitionName = Maps.newHashMap();
+
+        for (Table table : getTables()) {
+            table.readLock();
+            try {
+                OlapTable olapTable = (OlapTable) table;
+                List<String> partitionNames = getPartitionNames();
+                if (partitionNames.isEmpty() && olapTable.isPartitioned()) {
+                    partitionNames.addAll(olapTable.getPartitionNames());
+                }
+                tableIdToPartitionName.put(table.getId(), partitionNames);
+            } finally {
+                table.readUnlock();
+            }
+        }
+        return tableIdToPartitionName;
+    }
+
     public Map<Long, List<String>> getTableIdToColumnName() throws AnalysisException {
         Preconditions.checkArgument(isAnalyzed(),
                 "The db name must be obtained after the parsing is complete");
         Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
         List<Table> tables = getTables();
-        if (this.columnNames == null || this.columnNames.isEmpty()) {
+        if (optColumnNames == null || optColumnNames.isEmpty()) {
             for (Table table : tables) {
                 table.readLock();
                 try {
@@ -141,8 +175,8 @@ public class AnalyzeStmt extends DdlStmt {
                 }
             }
         } else {
-            for (Long tblId : this.tblIds) {
-                tableIdToColumnName.put(tblId, this.columnNames);
+            for (Long tblId : tblIds) {
+                tableIdToColumnName.put(tblId, optColumnNames);
             }
         }
 
@@ -150,7 +184,7 @@ public class AnalyzeStmt extends DdlStmt {
     }
 
     public Map<String, String> getProperties() {
-        return this.properties;
+        return optProperties;
     }
 
     @Override
@@ -158,23 +192,29 @@ public class AnalyzeStmt extends DdlStmt {
         super.analyze(analyzer);
 
         // step1: analyze db, table and column
-        if (this.dbTableName != null) {
-            this.dbTableName.analyze(analyzer);
+        if (optTableName != null) {
+            optTableName.analyze(analyzer);
+
             // disallow external catalog
-            Util.prohibitExternalCatalog(dbTableName.getCtl(), this.getClass().getSimpleName());
-            String dbName = this.dbTableName.getDb();
-            String tblName = this.dbTableName.getTbl();
-            checkAnalyzePriv(dbName, tblName);
+            Util.prohibitExternalCatalog(optTableName.getCtl(),
+                    this.getClass().getSimpleName());
 
-            Database db = analyzer.getCatalog().getInternalDataSource().getDbOrAnalysisException(dbName);
+            String dbName = optTableName.getDb();
+            String tblName = optTableName.getTbl();
+            Database db = analyzer.getCatalog().getInternalDataSource()
+                    .getDbOrAnalysisException(dbName);
             Table table = db.getTableOrAnalysisException(tblName);
 
-            if (this.columnNames != null && !this.columnNames.isEmpty()) {
+            // external table is not supported
+            checkAnalyzeType(table);
+            checkAnalyzePriv(dbName, tblName);
+
+            if (optColumnNames != null && !optColumnNames.isEmpty()) {
                 table.readLock();
                 try {
                     List<String> baseSchema = table.getBaseSchema(false)
                             .stream().map(Column::getName).collect(Collectors.toList());
-                    Optional<String> optional = this.columnNames.stream()
+                    Optional<String> optional = optColumnNames.stream()
                             .filter(entity -> !baseSchema.contains(entity)).findFirst();
                     if (optional.isPresent()) {
                         String columnName = optional.get();
@@ -185,34 +225,39 @@ public class AnalyzeStmt extends DdlStmt {
                 }
             }
 
-            this.dbId = db.getId();
-            this.tblIds.add(table.getId());
+            dbId = db.getId();
+            tblIds.add(table.getId());
         } else {
             // analyze the current default db
             String dbName = analyzer.getDefaultDb();
             if (Strings.isNullOrEmpty(dbName)) {
                 ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
             }
-            Database db = analyzer.getCatalog().getInternalDataSource().getDbOrAnalysisException(dbName);
+            Database db = analyzer.getCatalog().getInternalDataSource()
+                    .getDbOrAnalysisException(dbName);
 
             db.readLock();
             try {
                 List<Table> tables = db.getTables();
                 for (Table table : tables) {
+                    checkAnalyzeType(table);
                     checkAnalyzePriv(dbName, table.getName());
                 }
 
-                this.dbId = db.getId();
+                dbId = db.getId();
                 for (Table table : tables) {
                     long tblId = table.getId();
-                    this.tblIds.add(tblId);
+                    tblIds.add(tblId);
                 }
             } finally {
                 db.readUnlock();
             }
         }
 
-        // step2: analyze properties
+        // step2: analyze partition
+        checkPartitionNames();
+
+        // step3: analyze properties
         checkProperties();
     }
 
@@ -233,16 +278,86 @@ public class AnalyzeStmt extends DdlStmt {
         }
     }
 
-    private void checkProperties() throws UserException {
-        Optional<String> optional = this.properties.keySet().stream().filter(
-                entity -> !PROPERTIES_SET.contains(entity)).findFirst();
-        if (optional.isPresent()) {
-            throw new AnalysisException(optional.get() + " is invalid property");
+    private void checkAnalyzeType(Table table) throws AnalysisException {
+        if (table.getType() != Table.TableType.OLAP) {
+            throw new AnalysisException("Only OLAP table statistics are supported");
         }
+    }
 
-        long taskTimeout = ((Long) Util.getLongPropertyOrDefault(this.properties.get(CBO_STATISTICS_TASK_TIMEOUT_SEC),
+    private void checkPartitionNames() throws AnalysisException {
+        if (optPartitionNames != null) {
+            optPartitionNames.analyze(analyzer);
+            if (optTableName != null) {
+                Database db = analyzer.getCatalog().getInternalDataSource()
+                        .getDbOrAnalysisException(optTableName.getDb());
+                OlapTable olapTable = (OlapTable) db.getTableOrAnalysisException(optTableName.getTbl());
+                if (!olapTable.isPartitioned()) {
+                    throw new AnalysisException("Not a partitioned table: " + olapTable.getName());
+                }
+                List<String> names = optPartitionNames.getPartitionNames();
+                Set<String> olapPartitionNames = olapTable.getPartitionNames();
+                List<String> tempPartitionNames = olapTable.getTempPartitions().stream()
+                        .map(Partition::getName).collect(Collectors.toList());
+                Optional<String> optional = names.stream()
+                        .filter(name -> (tempPartitionNames.contains(name)
+                                || !olapPartitionNames.contains(name)))
+                        .findFirst();
+                if (optional.isPresent()) {
+                    throw new AnalysisException("Temporary partition or partition does not exist");
+                }
+            } else {
+                throw new AnalysisException("Specify partition should specify table name as well");
+            }
+            partitionNames.addAll(optPartitionNames.getPartitionNames());
+        }
+    }
+
+    private void checkProperties() throws UserException {
+        if (optProperties == null) {
+            optProperties = Maps.newHashMap();
+        } else {
+            Optional<String> optional = optProperties.keySet().stream().filter(
+                    entity -> !PROPERTIES_SET.contains(entity)).findFirst();
+            if (optional.isPresent()) {
+                throw new AnalysisException(optional.get() + " is invalid property");
+            }
+        }
+        long taskTimeout = ((Long) Util.getLongPropertyOrDefault(optProperties.get(CBO_STATISTICS_TASK_TIMEOUT_SEC),
                 Config.max_cbo_statistics_task_timeout_sec, DESIRED_TASK_TIMEOUT_SEC,
                 CBO_STATISTICS_TASK_TIMEOUT_SEC + " should > 0")).intValue();
-        this.properties.put(CBO_STATISTICS_TASK_TIMEOUT_SEC, String.valueOf(taskTimeout));
+        optProperties.put(CBO_STATISTICS_TASK_TIMEOUT_SEC, String.valueOf(taskTimeout));
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ANALYZE");
+
+        if (optTableName != null) {
+            sb.append(" ");
+            sb.append(optTableName.toSql());
+        }
+
+        if  (optColumnNames != null) {
+            sb.append("(");
+            sb.append(StringUtils.join(optColumnNames, ","));
+            sb.append(")");
+        }
+
+        if (optPartitionNames != null) {
+            sb.append(" ");
+            sb.append(optPartitionNames.toSql());
+        }
+
+        if (optProperties != null) {
+            sb.append(" ");
+            sb.append("PROPERTIES(");
+            sb.append(new PrintableMap<>(optProperties, " = ",
+                    true,
+                    false));
+            sb.append(")");
+        }
+
+        return sb.toString();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
index 49661f0030..4a86379ef3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java
@@ -26,6 +26,9 @@ import org.apache.doris.qe.ShowResultSetMetaData;
 import org.apache.doris.statistics.ColumnStats;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.List;
 
 public class ShowColumnStatsStmt extends ShowStmt {
 
@@ -67,4 +70,9 @@ public class ShowColumnStatsStmt extends ShowStmt {
         }
         return builder.build();
     }
+
+    public List<String> getPartitionNames() {
+        // TODO(WZT): partition statistics
+        return Lists.newArrayList();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
index ccc9547208..ad8e7835c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java
@@ -27,9 +27,12 @@ import org.apache.doris.qe.ShowResultSetMetaData;
 import org.apache.doris.statistics.TableStats;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.Strings;
 
+import java.util.List;
+
 public class ShowTableStatsStmt extends ShowStmt {
 
     private static final ImmutableList<String> TITLE_NAMES =
@@ -89,4 +92,9 @@ public class ShowTableStatsStmt extends ShowStmt {
         }
         return builder.build();
     }
+
+    public List<String> getPartitionNames() {
+        // TODO(WZT): partition statistics
+        return Lists.newArrayList();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index fe1e50a2fa..58a7d7a257 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -2118,14 +2118,14 @@ public class ShowExecutor {
     private void handleShowTableStats() throws AnalysisException {
         ShowTableStatsStmt showTableStatsStmt = (ShowTableStatsStmt) stmt;
         List<List<String>> results = Catalog.getCurrentCatalog().getStatisticsManager()
-                .showTableStatsList(showTableStatsStmt.getDbName(), showTableStatsStmt.getTableName());
+                .showTableStatsList(showTableStatsStmt);
         resultSet = new ShowResultSet(showTableStatsStmt.getMetaData(), results);
     }
 
     private void handleShowColumnStats() throws AnalysisException {
         ShowColumnStatsStmt showColumnStatsStmt = (ShowColumnStatsStmt) stmt;
         List<List<String>> results = Catalog.getCurrentCatalog().getStatisticsManager()
-                .showColumnStatsList(showColumnStatsStmt.getTableName());
+                .showColumnStatsList(showColumnStatsStmt);
         resultSet = new ShowResultSet(showColumnStatsStmt.getMetaData(), results);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java
index eab8bed080..e684156279 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java
@@ -76,8 +76,56 @@ public class ColumnStats {
     private LiteralExpr minValue;
     private LiteralExpr maxValue;
 
-    public void updateStats(Type columnType, Map<StatsType, String> statsNameToValue) throws AnalysisException {
-        for (Map.Entry<StatsType, String> entry : statsNameToValue.entrySet()) {
+    public long getNdv() {
+        return ndv;
+    }
+
+    public float getAvgSize() {
+        return avgSize;
+    }
+
+    public long getMaxSize() {
+        return maxSize;
+    }
+
+    public long getNumNulls() {
+        return numNulls;
+    }
+
+    public LiteralExpr getMinValue() {
+        return minValue;
+    }
+
+    public LiteralExpr getMaxValue() {
+        return maxValue;
+    }
+
+    public void setNdv(long ndv) {
+        this.ndv = ndv;
+    }
+
+    public void setAvgSize(float avgSize) {
+        this.avgSize = avgSize;
+    }
+
+    public void setMaxSize(long maxSize) {
+        this.maxSize = maxSize;
+    }
+
+    public void setNumNulls(long numNulls) {
+        this.numNulls = numNulls;
+    }
+
+    public void setMinValue(LiteralExpr minValue) {
+        this.minValue = minValue;
+    }
+
+    public void setMaxValue(LiteralExpr maxValue) {
+        this.maxValue = maxValue;
+    }
+
+    public void updateStats(Type columnType, Map<StatsType, String> statsTypeToValue) throws AnalysisException {
+        for (Map.Entry<StatsType, String> entry : statsTypeToValue.entrySet()) {
             StatsType statsType = entry.getKey();
             switch (statsType) {
                 case NDV:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/MetaStatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/MetaStatisticsTask.java
index 295429bc7e..06880de2ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/MetaStatisticsTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/MetaStatisticsTask.java
@@ -17,21 +17,131 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.statistics.StatisticsTaskResult.TaskResult;
+
+import com.google.common.collect.Lists;
+
 import java.util.List;
 
-/*
-A statistics task that directly collects statistics by reading FE meta.
+/**
+ * A statistics task that directly collects statistics by reading FE meta.
+ * e.g. for fixed-length types such as Int type and Long type we get their size from metadata.
+ * 1.The granularity of row count can be table or partition, and the type should be table or partition
+ * 2.The granularity of data size can be table or partition, and the type should be table or partition
+ * 3.The granularity of max and min size can be table or partition, and the type should be column
  */
 public class MetaStatisticsTask extends StatisticsTask {
-
-    public MetaStatisticsTask(long jobId, StatsGranularityDesc granularityDesc,
-                              StatsCategoryDesc categoryDesc, List<StatsType> statsTypeList) {
-        super(jobId, granularityDesc, categoryDesc, statsTypeList);
+    public MetaStatisticsTask(long jobId, List<StatisticsDesc> statsDescs) {
+        super(jobId, statsDescs);
     }
 
     @Override
     public StatisticsTaskResult call() throws Exception {
-        // TODO
-        return null;
+        checkStatisticsDesc();
+        List<TaskResult> taskResults = Lists.newArrayList();
+
+        for (StatisticsDesc statsDesc : statsDescs) {
+            StatsCategory category = statsDesc.getStatsCategory();
+            StatsGranularity granularity = statsDesc.getStatsGranularity();
+            TaskResult result = createNewTaskResult(category, granularity);
+            List<StatsType> statsTypes = statsDesc.getStatsTypes();
+
+            for (StatsType statsType : statsTypes) {
+                switch (statsType) {
+                    case MAX_SIZE:
+                    case AVG_SIZE:
+                        getColSize(category, statsType, result);
+                        break;
+                    case ROW_COUNT:
+                        getRowCount(category.getDbId(), category.getTableId(), granularity, result);
+                        break;
+                    case DATA_SIZE:
+                        getDataSize(category.getDbId(), category.getTableId(), granularity, result);
+                        break;
+                    default:
+                        throw new DdlException("Unsupported statistics type(" + statsType + ").");
+                }
+            }
+
+            taskResults.add(result);
+        }
+
+        return new StatisticsTaskResult(taskResults);
+    }
+
+    private void getColSize(StatsCategory category, StatsType statsType,
+                            TaskResult result) throws DdlException {
+        OlapTable table = getNotNullOlapTable(category.getDbId(), category.getTableId());
+        Column column = getNotNullColumn(table, category.getColumnName());
+        int colSize = column.getDataType().getSlotSize();
+        result.getStatsTypeToValue().put(statsType, String.valueOf(colSize));
+    }
+
+    private void getRowCount(long dbId, long tableId, StatsGranularity granularity,
+                             TaskResult result) throws DdlException {
+        OlapTable table = getNotNullOlapTable(dbId, tableId);
+
+        switch (granularity.getGranularity()) {
+            case TABLE:
+                long tblRowCount = table.getRowCount();
+                result.getStatsTypeToValue().put(StatsType.ROW_COUNT, String.valueOf(tblRowCount));
+                break;
+            case PARTITION:
+                Partition partition = getNotNullPartition(granularity, table);
+                long ptRowCount = partition.getBaseIndex().getRowCount();
+                result.getStatsTypeToValue().put(StatsType.ROW_COUNT, String.valueOf(ptRowCount));
+                break;
+            case TABLET:
+            default:
+                throw new DdlException("Unsupported granularity(" + granularity + ").");
+        }
+    }
+
+    private void getDataSize(long dbId, long tableId, StatsGranularity granularity,
+                             TaskResult result) throws DdlException {
+        OlapTable table = getNotNullOlapTable(dbId, tableId);
+
+        switch (granularity.getGranularity()) {
+            case TABLE:
+                long tblDataSize = table.getDataSize();
+                result.getStatsTypeToValue().put(StatsType.DATA_SIZE, String.valueOf(tblDataSize));
+                break;
+            case PARTITION:
+                Partition partition = getNotNullPartition(granularity, table);
+                long partitionSize = partition.getBaseIndex().getDataSize();
+                result.getStatsTypeToValue().put(StatsType.DATA_SIZE, String.valueOf(partitionSize));
+                break;
+            case TABLET:
+            default:
+                throw new DdlException("Unsupported granularity(" + granularity + ").");
+        }
+    }
+
+    private OlapTable getNotNullOlapTable(long dbId, long tableId) throws DdlException {
+        Database db = Catalog.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+        return (OlapTable) db.getTableOrDdlException(tableId);
+    }
+
+    private Partition getNotNullPartition(StatsGranularity granularity, OlapTable olapTable) throws DdlException {
+        Partition partition = olapTable.getPartition(granularity.getPartitionId());
+        if (partition == null) {
+            throw new DdlException("Partition(" + granularity.getPartitionId() + ") not found.");
+        }
+        return partition;
+    }
+
+    private Column getNotNullColumn(Table table, String colName) throws DdlException {
+        Column column = table.getColumn(colName);
+        if (column == null) {
+            throw new DdlException("Column(" + colName + ") not found.");
+        }
+        return column;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapScanStatsDerive.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapScanStatsDerive.java
index 3be8fffcba..584f354ddf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapScanStatsDerive.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapScanStatsDerive.java
@@ -19,6 +19,7 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Id;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
@@ -76,7 +77,7 @@ public class OlapScanStatsDerive extends BaseStatsDerive {
      * @param: node
      * @return: void
      */
-    public void buildStructure(OlapScanNode node) {
+    public void buildStructure(OlapScanNode node) throws AnalysisException {
         slotIdToDataSize = new HashMap<>();
         slotIdToNdv = new HashMap<>();
         slotIdToTableIdAndColumnName = new HashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStats.java
similarity index 56%
copy from fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
copy to fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStats.java
index 08ef2cc6be..4720525e1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/PartitionStats.java
@@ -30,26 +30,24 @@ import java.util.function.Predicate;
 
 
 /**
- * There are the statistics of table.
- * The table stats are mainly used to provide input for the Optimizer's cost model.
- * <p>
- * The description of table stats are following:
- * 1. @rowCount: The row count of table.
- * 2. @dataSize: The data size of table.
- * 3. @nameToColumnStats: <@String columnName, @ColumnStats columnStats>
- *      Each column in the Table will have corresponding @ColumnStats.
- *      Those @ColumnStats are recorded in @nameToColumnStats form of MAP.
- *      This facilitates the optimizer to quickly find the corresponding
- *      @ColumnStats based on the column name.
+ * There are the statistics of partition.
+ * The partition stats are mainly used to provide input for the Optimizer's cost model.
+ * The description of partition stats are following:
+ *     - @rowCount: The row count of partition.
+ *     - @dataSize: The data size of partition.
+ *     - @nameToColumnStats: <@String columnName, @ColumnStats columnStats>
  *
- * @rowCount: The row count of table.
- * @dataSize: The data size of table.
- * <p>
- * The granularity of the statistics is whole table.
- * For example:
- * "@rowCount = 1000" means that the row count is 1000 in the whole table.
+ * <p>Each column in the Table will have corresponding @ColumnStats.
+ * Those @ColumnStats are recorded in @nameToColumnStats form of MAP.
+ * This facilitates the optimizer to quickly find the corresponding:
+ *     - @ColumnStats: based on the column name.
+ *     - @rowCount: The row count of partition.
+ *     - @dataSize: The data size of partition.
+ *
+ * <p>The granularity of the statistics is whole partition.
+ * For example: "@rowCount = 1000" means that the row count is 1000 in the whole partition.
  */
-public class TableStats {
+public class PartitionStats {
 
     public static final StatsType DATA_SIZE = StatsType.DATA_SIZE;
     public static final StatsType ROW_COUNT = StatsType.ROW_COUNT;
@@ -59,51 +57,77 @@ public class TableStats {
 
     private long rowCount = -1;
     private long dataSize = -1;
-    private Map<String, ColumnStats> nameToColumnStats = Maps.newConcurrentMap();
+    private final Map<String, ColumnStats> nameToColumnStats = Maps.newConcurrentMap();
+
+    public Map<String, ColumnStats> getNameToColumnStats() {
+        return nameToColumnStats;
+    }
+
+    public long getRowCount() {
+        return rowCount;
+    }
+
+    public void setRowCount(long rowCount) {
+        this.rowCount = rowCount;
+    }
+
+    public long getDataSize() {
+        return dataSize;
+    }
 
-    public void updateTableStats(Map<StatsType, String> statsTypeToValue) throws AnalysisException {
+    public void setDataSize(long dataSize) {
+        this.dataSize = dataSize;
+    }
+
+    /**
+     * Update the partition stats.
+     *
+     * @param statsTypeToValue the map of stats type to value
+     * @throws AnalysisException if the stats value is not valid
+     */
+    public void updatePartitionStats(Map<StatsType, String> statsTypeToValue) throws AnalysisException {
         for (Map.Entry<StatsType, String> entry : statsTypeToValue.entrySet()) {
             StatsType statsType = entry.getKey();
+            String value = entry.getValue();
             if (statsType == ROW_COUNT) {
-                rowCount = Util.getLongPropertyOrDefault(entry.getValue(), rowCount,
+                rowCount = Util.getLongPropertyOrDefault(value, rowCount,
                         DESIRED_ROW_COUNT_PRED, ROW_COUNT + " should >= -1");
             } else if (statsType == DATA_SIZE) {
-                dataSize = Util.getLongPropertyOrDefault(entry.getValue(), dataSize,
+                dataSize = Util.getLongPropertyOrDefault(value, dataSize,
                         DESIRED_DATA_SIZE_PRED, DATA_SIZE + " should >= -1");
             }
         }
     }
 
-    public void updateColumnStats(String columnName, Type columnType, Map<StatsType, String> statsTypeToValue)
-            throws AnalysisException {
+    public void updateColumnStats(String columnName,
+                                  Type columnType,
+                                  Map<StatsType, String> statsTypeToValue) throws AnalysisException {
+        ColumnStats columnStats = getNotNullColumnStats(columnName);
+        columnStats.updateStats(columnType, statsTypeToValue);
+    }
+
+    /**
+     * If column stats is not exist, create a new one.
+     *
+     * @param columnName column name
+     * @return @ColumnStats
+     */
+    public ColumnStats getNotNullColumnStats(String columnName) {
         ColumnStats columnStats = nameToColumnStats.get(columnName);
         if (columnStats == null) {
             columnStats = new ColumnStats();
             nameToColumnStats.put(columnName, columnStats);
         }
-        columnStats.updateStats(columnType, statsTypeToValue);
+        return columnStats;
     }
 
+    /**
+     * show the partition row count and data size.
+     */
     public List<String> getShowInfo() {
         List<String> result = Lists.newArrayList();
         result.add(Long.toString(rowCount));
         result.add(Long.toString(dataSize));
         return result;
     }
-
-    public Map<String, ColumnStats> getNameToColumnStats() {
-        return nameToColumnStats;
-    }
-
-    public long getRowCount() {
-        return rowCount;
-    }
-
-    public long getDataSize() {
-        return dataSize;
-    }
-
-    public void setRowCount(long rowCount) {
-        this.rowCount = rowCount;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/SQLStatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/SQLStatisticsTask.java
index ceb72c1cff..ef25c89262 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/SQLStatisticsTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/SQLStatisticsTask.java
@@ -19,22 +19,17 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.SelectStmt;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
 import java.util.List;
-import java.util.Map;
 
-/*
-A statistics task that collects statistics by executing query.
-The results of the query will be returned as @StatisticsTaskResult.
+/**
+ * A statistics task that collects statistics by executing query.
+ * The results of the query will be returned as @StatisticsTaskResult.
  */
 public class SQLStatisticsTask extends StatisticsTask {
     private SelectStmt query;
 
-    public SQLStatisticsTask(long jobId, StatsGranularityDesc granularityDesc,
-                             StatsCategoryDesc categoryDesc, List<StatsType> statsTypeList) {
-        super(jobId, granularityDesc, categoryDesc, statsTypeList);
+    public SQLStatisticsTask(long jobId, List<StatisticsDesc> statsDescs) {
+        super(jobId, statsDescs);
     }
 
     @Override
@@ -62,12 +57,7 @@ public class SQLStatisticsTask extends StatisticsTask {
     }
 
     protected StatisticsTaskResult constructTaskResult(List<String> queryResultList) {
-        Preconditions.checkState(statsTypeList.size() == queryResultList.size());
-        Map<StatsType, String> statsTypeToValue = Maps.newHashMap();
-        for (int i = 0; i < statsTypeList.size(); i++) {
-            statsTypeToValue.put(statsTypeList.get(i), queryResultList.get(i));
-        }
-        StatisticsTaskResult result = new StatisticsTaskResult(granularityDesc, categoryDesc, statsTypeToValue);
-        return result;
+        // TODO
+        return null;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/SampleSQLStatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/SampleSQLStatisticsTask.java
index f6dadeb6ed..f9930fc8cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/SampleSQLStatisticsTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/SampleSQLStatisticsTask.java
@@ -29,9 +29,8 @@ The only difference from the SQLStatisticsTask is that the query is a sampling t
 public class SampleSQLStatisticsTask extends SQLStatisticsTask {
     private float samplePercentage = Config.cbo_default_sample_percentage;
 
-    public SampleSQLStatisticsTask(long jobId, StatsGranularityDesc granularityDesc,
-                                   StatsCategoryDesc categoryDesc, List<StatsType> statsTypeList) {
-        super(jobId, granularityDesc, categoryDesc, statsTypeList);
+    public SampleSQLStatisticsTask(long jobId, List<StatisticsDesc> statsDescs) {
+        super(jobId, statsDescs);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
index 77aed0c5f6..eb02e6e036 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java
@@ -36,39 +36,130 @@ import java.util.Map;
  */
 public class Statistics {
 
-    private Map<Long, TableStats> idToTableStats = Maps.newConcurrentMap();
+    private final Map<Long, TableStats> idToTableStats = Maps.newConcurrentMap();
 
-    public void updateTableStats(long tableId, Map<StatsType, String> statsTypeToValue)
+    public void updateTableStats(long tableId, Map<StatsType, String> statsTypeToValue) throws AnalysisException {
+        TableStats tableStats = getNotNullTableStats(tableId);
+        tableStats.updateTableStats(statsTypeToValue);
+    }
+
+    public void updatePartitionStats(long tableId, String partitionName, Map<StatsType, String> statsTypeToValue)
             throws AnalysisException {
+        TableStats tableStats = getNotNullTableStats(tableId);
+        tableStats.updatePartitionStats(partitionName, statsTypeToValue);
+    }
+
+    public void updateColumnStats(long tableId, String columnName, Type columnType,
+                                  Map<StatsType, String> statsTypeToValue) throws AnalysisException {
+        TableStats tableStats = getNotNullTableStats(tableId);
+        tableStats.updateColumnStats(columnName, columnType, statsTypeToValue);
+    }
+
+    public void updateColumnStats(long tableId, String partitionName, String columnName, Type columnType,
+                                  Map<StatsType, String> statsTypeToValue) throws AnalysisException {
+        TableStats tableStats = getNotNullTableStats(tableId);
+        Map<String, PartitionStats> nameToPartitionStats = tableStats.getNameToPartitionStats();
+        PartitionStats partitionStats = nameToPartitionStats.get(partitionName);
+        partitionStats.updateColumnStats(columnName, columnType, statsTypeToValue);
+    }
+
+    /**
+     * if the table stats is not exist, create a new one.
+     *
+     * @param tableId table id
+     * @return @TableStats
+     */
+    public TableStats getNotNullTableStats(long tableId) {
         TableStats tableStats = idToTableStats.get(tableId);
         if (tableStats == null) {
             tableStats = new TableStats();
             idToTableStats.put(tableId, tableStats);
         }
-        tableStats.updateTableStats(statsTypeToValue);
+        return tableStats;
     }
 
-    public void updateColumnStats(long tableId, String columnName, Type columnType,
-                                  Map<StatsType, String> statsTypeToValue)
-            throws AnalysisException {
+    /**
+     * Get the table stats for the given table id.
+     *
+     * @param tableId table id
+     * @return @TableStats
+     * @throws AnalysisException if table stats not exists
+     */
+    public TableStats getTableStats(long tableId) throws AnalysisException {
         TableStats tableStats = idToTableStats.get(tableId);
         if (tableStats == null) {
-            tableStats = new TableStats();
-            idToTableStats.put(tableId, tableStats);
+            throw new AnalysisException("Table " + tableId + " has no statistics");
         }
-        tableStats.updateColumnStats(columnName, columnType, statsTypeToValue);
+        return tableStats;
     }
 
-    public TableStats getTableStats(long tableId) {
-        return idToTableStats.get(tableId);
+    /**
+     * Get the partitions stats for the given table id.
+     *
+     * @param tableId table id
+     * @return partition name and @PartitionStats
+     * @throws AnalysisException if partitions stats not exists
+     */
+    public Map<String, PartitionStats> getPartitionStats(long tableId) throws AnalysisException {
+        TableStats tableStats = getTableStats(tableId);
+        Map<String, PartitionStats> nameToPartitionStats = tableStats.getNameToPartitionStats();
+        if (nameToPartitionStats == null) {
+            throw new AnalysisException("Table " + tableId + " has no partition statistics");
+        }
+        return nameToPartitionStats;
     }
 
-    public Map<String, ColumnStats> getColumnStats(long tableId) {
+    /**
+     * Get the partition stats for the given table id and partition name.
+     *
+     * @param tableId table id
+     * @param partitionName partition name
+     * @return partition name and @PartitionStats
+     * @throws AnalysisException if partition stats not exists
+     */
+    public Map<String, PartitionStats> getPartitionStats(long tableId, String partitionName)
+            throws AnalysisException {
+        Map<String, PartitionStats> partitionStats = getPartitionStats(tableId);
+        PartitionStats partitionStat = partitionStats.get(partitionName);
+        if (partitionStat == null) {
+            throw new AnalysisException("Partition " + partitionName + " of table " + tableId + " has no statistics");
+        }
+        Map<String, PartitionStats> statsMap = Maps.newHashMap();
+        statsMap.put(partitionName, partitionStat);
+        return statsMap;
+    }
+
+    /**
+     * Get the columns stats for the given table id.
+     *
+     * @param tableId table id
+     * @return column name and @ColumnStats
+     * @throws AnalysisException if columns stats not exists
+     */
+    public Map<String, ColumnStats> getColumnStats(long tableId) throws AnalysisException {
         TableStats tableStats = getTableStats(tableId);
-        if (tableStats == null) {
-            return null;
+        Map<String, ColumnStats> nameToColumnStats = tableStats.getNameToColumnStats();
+        if (nameToColumnStats == null) {
+            throw new AnalysisException("Table " + tableId + " has no column statistics");
+        }
+        return nameToColumnStats;
+    }
+
+    /**
+     * Get the columns stats for the given table id and partition name.
+     *
+     * @param tableId table id
+     * @param partitionName partition name
+     * @return column name and @ColumnStats
+     * @throws AnalysisException if column stats not exists
+     */
+    public Map<String, ColumnStats> getColumnStats(long tableId, String partitionName) throws AnalysisException {
+        Map<String, PartitionStats> partitionStats = getPartitionStats(tableId, partitionName);
+        PartitionStats partitionStat = partitionStats.get(partitionName);
+        if (partitionStat == null) {
+            throw new AnalysisException("Partition " + partitionName + " of table " + tableId + " has no statistics");
         }
-        return tableStats.getNameToColumnStats();
+        return partitionStat.getNameToColumnStats();
     }
 
     // TODO: mock statistics need to be removed in the future
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsDesc.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsDesc.java
new file mode 100644
index 0000000000..a327fed395
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsDesc.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import java.util.List;
+
+public class StatisticsDesc {
+    private StatsCategory statsCategory;
+
+    private StatsGranularity statsGranularity;
+
+    private List<StatsType> statsTypes;
+
+    public StatisticsDesc(StatsCategory statsCategory,
+                          StatsGranularity statsGranularity,
+                          List<StatsType> statsTypes) {
+        this.statsCategory = statsCategory;
+        this.statsGranularity = statsGranularity;
+        this.statsTypes = statsTypes;
+    }
+
+    public StatsCategory getStatsCategory() {
+        return statsCategory;
+    }
+
+    public void setStatsCategory(StatsCategory statsCategory) {
+        this.statsCategory = statsCategory;
+    }
+
+    public StatsGranularity getStatsGranularity() {
+        return statsGranularity;
+    }
+
+    public void setStatsGranularity(StatsGranularity statsGranularity) {
+        this.statsGranularity = statsGranularity;
+    }
+
+    public List<StatsType> getStatsTypes() {
+        return statsTypes;
+    }
+
+    public void setStatsTypes(List<StatsType> statsTypes) {
+        this.statsTypes = statsTypes;
+    }
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
index 8888a4b486..719ef0ab1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
@@ -63,6 +63,11 @@ public class StatisticsJob {
      */
     private final Set<Long> tblIds;
 
+    /**
+     * to be collected partition stats.
+     */
+    private final Map<Long, List<String>> tableIdToPartitionName;
+
     /**
      * to be collected column stats.
      */
@@ -85,10 +90,12 @@ public class StatisticsJob {
 
     public StatisticsJob(Long dbId,
                          Set<Long> tblIds,
+                         Map<Long, List<String>> tblIdToPartitionName,
                          Map<Long, List<String>> tableIdToColumnName,
                          Map<String, String> properties) {
         this.dbId = dbId;
         this.tblIds = tblIds;
+        this.tableIdToPartitionName = tblIdToPartitionName;
         this.tableIdToColumnName = tableIdToColumnName;
         this.properties = properties == null ? Maps.newHashMap() : properties;
     }
@@ -121,6 +128,10 @@ public class StatisticsJob {
         return tblIds;
     }
 
+    public Map<Long, List<String>> getTableIdToPartitionName() {
+        return tableIdToPartitionName;
+    }
+
     public Map<Long, List<String>> getTableIdToColumnName() {
         return tableIdToColumnName;
     }
@@ -246,11 +257,12 @@ public class StatisticsJob {
      * tableId: [t1]
      * tableIdToColumnName <t1, [c1,c2,c3]>
      */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) throws AnalysisException {
-        long dbId = analyzeStmt.getDbId();
-        Map<Long, List<String>> tableIdToColumnName = analyzeStmt.getTableIdToColumnName();
-        Set<Long> tblIds = analyzeStmt.getTblIds();
-        Map<String, String> properties = analyzeStmt.getProperties();
-        return new StatisticsJob(dbId, tblIds, tableIdToColumnName, properties);
+    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt stmt) throws AnalysisException {
+        long dbId = stmt.getDbId();
+        Set<Long> tblIds = stmt.getTblIds();
+        Map<Long, List<String>> tableIdToPartitionName = stmt.getTableIdToPartitionName();
+        Map<Long, List<String>> tableIdToColumnName = stmt.getTableIdToColumnName();
+        Map<String, String> properties = stmt.getProperties();
+        return new StatisticsJob(dbId, tblIds, tableIdToPartitionName, tableIdToColumnName, properties);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
index 23b90df198..f617003475 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java
@@ -22,20 +22,27 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.statistics.StatsCategory.Category;
+import org.apache.doris.statistics.StatsGranularity.Granularity;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 
@@ -56,6 +63,11 @@ public class StatisticsJobScheduler extends MasterDaemon {
     private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
     private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
+    /**
+     * if the table row count is greater than the value, use sampleSqlTask instead of SqlTask.
+     */
+    private static final int MIN_SAMPLE_ROWS = 200000;
+
     /**
      * Different statistics need to be collected for the jobs submitted by users.
      * if all statistics be collected at the same time, the cluster may be overburdened
@@ -128,145 +140,380 @@ public class StatisticsJobScheduler extends MasterDaemon {
      * @see SampleSQLStatisticsTask
      * @see SQLStatisticsTask
      */
-    private void divide(StatisticsJob statisticsJob) throws DdlException {
-        long jobId = statisticsJob.getId();
-        long dbId = statisticsJob.getDbId();
-        Database db = Catalog.getCurrentInternalCatalog().getDbOrDdlException(dbId);
-        Set<Long> tblIds = statisticsJob.getTblIds();
-        Map<Long, List<String>> tableIdToColumnName = statisticsJob.getTableIdToColumnName();
-        List<StatisticsTask> tasks = statisticsJob.getTasks();
-        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+    private void divide(StatisticsJob job) throws DdlException {
+        Database db = Catalog.getCurrentInternalCatalog().getDbOrDdlException(job.getDbId());
+        Set<Long> tblIds = job.getTblIds();
 
         for (Long tblId : tblIds) {
-            Table tbl = db.getTableOrDdlException(tblId);
-            long rowCount = tbl.getRowCount();
-            List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
-            List<String> columnNameList = tableIdToColumnName.get(tblId);
-
-            // step 1: generate data_size task
-            StatsCategoryDesc dataSizeCategory = getTblStatsCategoryDesc(dbId, tblId);
-            StatsGranularityDesc dataSizeGranularity = getTblStatsGranularityDesc(tblId);
-            MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
-                    dataSizeGranularity, dataSizeCategory, Collections.singletonList(StatsType.DATA_SIZE));
-            tasks.add(dataSizeTask);
-
-            // step 2: generate row_count task
-            KeysType keysType = ((OlapTable) tbl).getKeysType();
-            if (keysType == KeysType.DUP_KEYS) {
-                StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
-                StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
-                MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
-                        rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
-                tasks.add(metaTask);
+            Optional<Table> optionalTbl = db.getTable(tblId);
+            if (!optionalTbl.isPresent()) {
+                LOG.warn("Table(id={}) not found in the database {}", tblId, db.getFullName());
+                continue;
+            }
+            Table table = optionalTbl.get();
+            if (!table.isPartitioned()) {
+                getStatsTaskByTable(job, tblId);
             } else {
-                if (rowCount > backendIds.size() * COUNT_MAX_SCAN_PER_TASK) {
-                    // divide subtasks by partition
-                    for (Long partitionId : partitionIds) {
-                        StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
-                        StatsGranularityDesc rowCountGranularity = getPartitionStatsGranularityDesc(tblId, partitionId);
-                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
-                                rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
-                        tasks.add(sqlTask);
-                    }
-                } else {
-                    StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
-                    StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
-                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
-                            rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
-                    tasks.add(sqlTask);
+                getStatsTaskByPartition(job, tblId);
+            }
+        }
+    }
+
+    /**
+     * For non-partitioned table, dividing the job into several subtasks.
+     *
+     * @param job statistics job
+     * @param tableId table id
+     * @throws DdlException exception
+     */
+    private void getStatsTaskByTable(StatisticsJob job, long tableId) throws DdlException {
+        Database db = Catalog.getCurrentInternalCatalog().getDbOrDdlException(job.getDbId());
+        OlapTable table = (OlapTable) db.getTableOrDdlException(tableId);
+
+        Map<Long, List<String>> tblIdToColName = job.getTableIdToColumnName();
+        List<String> colNames = tblIdToColName.get(tableId);
+
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+        // step1: collect statistics by metadata
+        List<StatisticsDesc> descs = Lists.newArrayList();
+
+        // table data size
+        StatsCategory dsCategory = getTableStatsCategory(job.getDbId(), tableId);
+        StatsGranularity dsGranularity = getTableGranularity(tableId);
+        StatisticsDesc dsStatsDesc = new StatisticsDesc(dsCategory,
+                dsGranularity, Collections.singletonList(StatsType.DATA_SIZE));
+        descs.add(dsStatsDesc);
+
+        // table row count
+        if (table.getKeysType() == KeysType.DUP_KEYS) {
+            StatsCategory rcCategory = getTableStatsCategory(job.getDbId(), tableId);
+            StatsGranularity rcGranularity = getTableGranularity(tableId);
+            StatisticsDesc rcStatsDesc = new StatisticsDesc(rcCategory,
+                    rcGranularity, Collections.singletonList(StatsType.ROW_COUNT));
+            descs.add(rcStatsDesc);
+        }
+
+        // variable-length columns
+        List<String> strColNames = Lists.newArrayList();
+
+        // column max size and avg size
+        for (String colName : colNames) {
+            Column column = table.getColumn(colName);
+            if (column == null) {
+                LOG.info("column {} not found in table {}", colName, table.getName());
+                continue;
+            }
+            Type colType = column.getType();
+            if (colType.isStringType()) {
+                strColNames.add(colName);
+                continue;
+            }
+            StatsCategory colCategory = getColumnStatsCategory(job.getDbId(), tableId, colName);
+            StatsGranularity colGranularity = getTableGranularity(tableId);
+            StatisticsDesc colStatsDesc = new StatisticsDesc(colCategory,
+                    colGranularity, Arrays.asList(StatsType.MAX_SIZE, StatsType.AVG_SIZE));
+            descs.add(colStatsDesc);
+        }
+
+        // all meta statistics are collected in one task
+        MetaStatisticsTask metaStatsTask = new MetaStatisticsTask(job.getId(), descs);
+        job.getTasks().add(metaStatsTask);
+
+        long rowCount = table.getRowCount();
+
+        // step2: collect statistics by sql
+        // table row count (table model is AGGREGATE or UNIQUE)
+        if (table.getKeysType() != KeysType.DUP_KEYS) {
+            if (rowCount < backendIds.size() * COUNT_MAX_SCAN_PER_TASK) {
+                StatsCategory rcCategory = getTableStatsCategory(job.getDbId(), tableId);
+                StatsGranularity rcGranularity = getTableGranularity(tableId);
+                StatisticsDesc rcStatsDesc = new StatisticsDesc(rcCategory,
+                        rcGranularity, Collections.singletonList(StatsType.ROW_COUNT));
+                SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                        Collections.singletonList(rcStatsDesc));
+                job.getTasks().add(sqlTask);
+            } else {
+                // divide subtasks by tablet
+                Collection<Partition> partitions = table.getPartitions();
+                for (Partition partition : partitions) {
+                    Collection<Tablet> tablets = partition.getBaseIndex().getTablets();
+                    tablets.forEach(tablet -> {
+                        StatsCategory rcCategory = getTableStatsCategory(job.getDbId(), tableId);
+                        StatsGranularity rcGranularity = getTabletGranularity(tablet.getId());
+                        StatisticsDesc rcStatsDesc = new StatisticsDesc(rcCategory,
+                                rcGranularity, Collections.singletonList(StatsType.ROW_COUNT));
+                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                                Collections.singletonList(rcStatsDesc));
+                        job.getTasks().add(sqlTask);
+                    });
                 }
             }
+        }
+
+        // column max size, avg size
+        for (String colName : strColNames) {
+            StatsCategory colCategory = getColumnStatsCategory(job.getDbId(), tableId, colName);
+            StatsGranularity colGranularity = getTableGranularity(tableId);
+            getColumnSizeSqlTask(job, rowCount, colCategory, colGranularity);
+        }
+
+        // column num nulls
+        for (String colName : colNames) {
+            StatsCategory colCategory = getColumnStatsCategory(job.getDbId(), tableId, colName);
+            StatsGranularity colGranularity = getTableGranularity(tableId);
+            StatisticsDesc colStatsDesc = new StatisticsDesc(colCategory,
+                    colGranularity, Collections.singletonList(StatsType.NUM_NULLS));
+            SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                    Collections.singletonList(colStatsDesc));
+            job.getTasks().add(sqlTask);
+        }
 
-            // step 3: generate [min,max,ndv] task
-            if (rowCount > backendIds.size() * NDV_MAX_SCAN_PER_TASK) {
-                // divide subtasks by partition
-                columnNameList.forEach(columnName -> {
-                    for (Long partitionId : partitionIds) {
-                        StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
-                        StatsGranularityDesc columnGranularity = getPartitionStatsGranularityDesc(tblId, partitionId);
-                        List<StatsType> statsTypes = Arrays.asList(
-                                StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV);
-                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(
-                                jobId, columnGranularity, columnCategory, statsTypes);
-                        tasks.add(sqlTask);
-                    }
-                });
+        // column max value, min value and ndv
+        for (String colName : colNames) {
+            if (rowCount < backendIds.size() * NDV_MAX_SCAN_PER_TASK) {
+                StatsCategory colCategory = getColumnStatsCategory(job.getDbId(), tableId, colName);
+                StatsGranularity colGranularity = getTableGranularity(tableId);
+                StatisticsDesc colStatsDesc = new StatisticsDesc(colCategory,
+                        colGranularity, Arrays.asList(StatsType.MAX_VALUE, StatsType.MIN_VALUE, StatsType.NDV));
+                SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                        Collections.singletonList(colStatsDesc));
+                job.getTasks().add(sqlTask);
             } else {
-                for (String columnName : columnNameList) {
-                    StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
-                    StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
-                    List<StatsType> statsTypes = Arrays.asList(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV);
-                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(
-                            jobId, columnGranularity, columnCategory, statsTypes);
-                    tasks.add(sqlTask);
+                // for non-partitioned table system automatically
+                // generates a partition with the same name as the table name
+                Collection<Partition> partitions = table.getPartitions();
+                for (Partition partition : partitions) {
+                    List<Tablet> tablets = partition.getBaseIndex().getTablets();
+                    tablets.forEach(tablet -> {
+                        StatsCategory colCategory = getColumnStatsCategory(job.getDbId(), tableId, colName);
+                        StatsGranularity colGranularity = getTabletGranularity(tablet.getId());
+                        StatisticsDesc colStatsDesc = new StatisticsDesc(colCategory,
+                                colGranularity, Arrays.asList(StatsType.MAX_VALUE, StatsType.MIN_VALUE, StatsType.NDV));
+                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                                Collections.singletonList(colStatsDesc));
+                        job.getTasks().add(sqlTask);
+                    });
                 }
             }
+        }
+    }
+
+    /**
+     * If table is partitioned, dividing the job into several subtasks by partition.
+     *
+     * @param job statistics job
+     * @param tableId table id
+     * @throws DdlException exception
+     */
+    private void getStatsTaskByPartition(StatisticsJob job, long tableId) throws DdlException {
+        Database db = Catalog.getCurrentInternalCatalog().getDbOrDdlException(job.getDbId());
+        OlapTable table = (OlapTable) db.getTableOrDdlException(tableId);
+
+        Map<Long, List<String>> tblIdToColName = job.getTableIdToColumnName();
+        List<String> colNames = tblIdToColName.get(tableId);
+
+        Map<Long, List<String>> tblIdToPartitionName = job.getTableIdToPartitionName();
+        List<String> partitionNames = tblIdToPartitionName.get(tableId);
+
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
+
+        for (String partitionName : partitionNames) {
+            Partition partition = table.getPartition(partitionName);
+            if (partition == null) {
+                LOG.info("Partition {} not found in the table {}", partitionName, table.getName());
+                continue;
+            }
+
+            long partitionId = partition.getId();
+            long rowCount = partition.getBaseIndex().getRowCount();
+
+            // step1: collect statistics by metadata
+            List<StatisticsDesc> descs = Lists.newArrayList();
+
+            // partition data size
+            StatsCategory dsCategory = getPartitionStatsCategory(job.getDbId(), tableId, partitionName);
+            StatsGranularity dsGranularity = getPartitionGranularity(partitionId);
+            StatisticsDesc dsStatsDesc = new StatisticsDesc(dsCategory,
+                    dsGranularity, Collections.singletonList(StatsType.DATA_SIZE));
+            descs.add(dsStatsDesc);
 
-            // step 4: generate num_nulls task
-            for (String columnName : columnNameList) {
-                StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
-                StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
-                SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
-                        columnGranularity, columnCategory, Collections.singletonList(StatsType.NUM_NULLS));
-                tasks.add(sqlTask);
+            // partition row count
+            if (table.getKeysType() == KeysType.DUP_KEYS) {
+                StatsCategory rcCategory = getPartitionStatsCategory(job.getDbId(), tableId, partitionName);
+                StatsGranularity rcGranularity = getPartitionGranularity(partitionId);
+                StatisticsDesc rcStatsDesc = new StatisticsDesc(rcCategory,
+                        rcGranularity, Collections.singletonList(StatsType.ROW_COUNT));
+                descs.add(rcStatsDesc);
             }
 
-            // step 5: generate [max_col_lens, avg_col_lens] task
-            for (String columnName : columnNameList) {
-                StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
-                StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
-                List<StatsType> statsTypes = Arrays.asList(StatsType.MAX_SIZE, StatsType.AVG_SIZE);
-                Column column = tbl.getColumn(columnName);
+            // variable-length columns
+            List<String> strColNames = Lists.newArrayList();
+
+            // column max size and avg size
+            for (String colName : colNames) {
+                Column column = table.getColumn(colName);
+                if (column == null) {
+                    LOG.info("Column {} not found in the table {}", colName, table.getName());
+                    continue;
+                }
                 Type colType = column.getType();
                 if (colType.isStringType()) {
-                    SQLStatisticsTask sampleSqlTask = new SampleSQLStatisticsTask(
-                            jobId, columnGranularity, columnCategory, statsTypes);
-                    tasks.add(sampleSqlTask);
+                    strColNames.add(colName);
+                    continue;
+                }
+                StatsCategory colCategory = getColumnStatsCategory(job.getDbId(), tableId, partitionName, colName);
+                StatsGranularity colGranularity = getPartitionGranularity(partitionId);
+                StatisticsDesc colStatsDesc = new StatisticsDesc(colCategory,
+                        colGranularity, Arrays.asList(StatsType.MAX_SIZE, StatsType.AVG_SIZE));
+                descs.add(colStatsDesc);
+            }
+
+            // all meta statistics are collected in one task
+            MetaStatisticsTask metaStatsTask = new MetaStatisticsTask(job.getId(), descs);
+            job.getTasks().add(metaStatsTask);
+
+            // step2: collect statistics by sql
+            // partition row count (table model is AGGREGATE or UNIQUE)
+            if (table.getKeysType() != KeysType.DUP_KEYS) {
+                if (rowCount < backendIds.size() * COUNT_MAX_SCAN_PER_TASK) {
+                    StatsCategory rcCategory = getPartitionStatsCategory(job.getDbId(), tableId, partitionName);
+                    StatsGranularity rcGranularity = getPartitionGranularity(partitionId);
+                    StatisticsDesc rcStatsDesc = new StatisticsDesc(rcCategory,
+                            rcGranularity, Collections.singletonList(StatsType.ROW_COUNT));
+                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                            Collections.singletonList(rcStatsDesc));
+                    job.getTasks().add(sqlTask);
                 } else {
-                    MetaStatisticsTask metaTask = new MetaStatisticsTask(
-                            jobId, columnGranularity, columnCategory, statsTypes);
-                    tasks.add(metaTask);
+                    // divide subtasks by tablet
+                    List<Tablet> tablets = partition.getBaseIndex().getTablets();
+                    tablets.forEach(tablet -> {
+                        StatsCategory rcCategory = getPartitionStatsCategory(job.getDbId(), tableId, partitionName);
+                        StatsGranularity rcGranularity = getTabletGranularity(tablet.getId());
+                        StatisticsDesc rcStatsDesc = new StatisticsDesc(rcCategory,
+                                rcGranularity, Collections.singletonList(StatsType.ROW_COUNT));
+                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                                Collections.singletonList(rcStatsDesc));
+                        job.getTasks().add(sqlTask);
+                    });
                 }
             }
+
+            // column max size, avg size
+            for (String colName : strColNames) {
+                StatsCategory colCategory = getColumnStatsCategory(job.getDbId(), tableId, partitionName, colName);
+                StatsGranularity colGranularity = getPartitionGranularity(partitionId);
+                getColumnSizeSqlTask(job, rowCount, colCategory, colGranularity);
+            }
+
+            // column null nums
+            for (String colName : colNames) {
+                StatsCategory colCategory = getColumnStatsCategory(job.getDbId(), tableId, partitionName, colName);
+                StatsGranularity colGranularity = getPartitionGranularity(partitionId);
+                StatisticsDesc colStatsDesc = new StatisticsDesc(colCategory,
+                        colGranularity, Collections.singletonList(StatsType.NUM_NULLS));
+                SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                        Collections.singletonList(colStatsDesc));
+                job.getTasks().add(sqlTask);
+            }
+
+            // column max value, min value and ndv
+            for (String colName : colNames) {
+                if (rowCount < backendIds.size() * NDV_MAX_SCAN_PER_TASK) {
+                    StatsCategory colCategory = getColumnStatsCategory(job.getDbId(), tableId, partitionName, colName);
+                    StatsGranularity colGranularity = getPartitionGranularity(partitionId);
+                    StatisticsDesc colStatsDesc = new StatisticsDesc(colCategory,
+                            colGranularity, Arrays.asList(StatsType.MAX_VALUE, StatsType.MIN_VALUE, StatsType.NDV));
+                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                            Collections.singletonList(colStatsDesc));
+                    job.getTasks().add(sqlTask);
+                } else {
+                    // divide subtasks by tablet
+                    List<Tablet> tablets = partition.getBaseIndex().getTablets();
+                    tablets.forEach(tablet -> {
+                        StatsCategory colCategory = getColumnStatsCategory(job.getDbId(),
+                                tableId, partitionName, colName);
+                        StatsGranularity colGranularity = getTabletGranularity(tablet.getId());
+                        StatisticsDesc colStatsDesc = new StatisticsDesc(colCategory,
+                                colGranularity, Arrays.asList(StatsType.MAX_VALUE, StatsType.MIN_VALUE, StatsType.NDV));
+                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(job.getId(),
+                                Collections.singletonList(colStatsDesc));
+                        job.getTasks().add(sqlTask);
+                    });
+                }
+            }
+        }
+    }
+
+    private void getColumnSizeSqlTask(StatisticsJob job, long rowCount,
+                                      StatsCategory colCategory, StatsGranularity colGranularity) {
+        StatisticsDesc colStatsDesc = new StatisticsDesc(colCategory,
+                colGranularity, Arrays.asList(StatsType.MAX_SIZE, StatsType.AVG_SIZE));
+        SQLStatisticsTask sqlTask;
+        if (rowCount < MIN_SAMPLE_ROWS) {
+            sqlTask = new SQLStatisticsTask(job.getId(), Collections.singletonList(colStatsDesc));
+        } else {
+            sqlTask = new SampleSQLStatisticsTask(job.getId(), Collections.singletonList(colStatsDesc));
         }
+        job.getTasks().add(sqlTask);
+    }
+
+    private StatsCategory getTableStatsCategory(long dbId, long tableId) {
+        StatsCategory category = new StatsCategory();
+        category.setCategory(StatsCategory.Category.TABLE);
+        category.setDbId(dbId);
+        category.setTableId(tableId);
+        return category;
+    }
+
+    private StatsCategory getPartitionStatsCategory(long dbId, long tableId, String partitionName) {
+        StatsCategory category = new StatsCategory();
+        category.setCategory(Category.PARTITION);
+        category.setDbId(dbId);
+        category.setTableId(tableId);
+        category.setPartitionName(partitionName);
+        return category;
     }
 
-    private StatsCategoryDesc getTblStatsCategoryDesc(long dbId, long tableId) {
-        StatsCategoryDesc statsCategoryDesc = new StatsCategoryDesc();
-        statsCategoryDesc.setCategory(StatsCategoryDesc.StatsCategory.TABLE);
-        statsCategoryDesc.setDbId(dbId);
-        statsCategoryDesc.setTableId(tableId);
-        return statsCategoryDesc;
+    private StatsCategory getColumnStatsCategory(long dbId, long tableId, String columnName) {
+        StatsCategory category = new StatsCategory();
+        category.setDbId(dbId);
+        category.setTableId(tableId);
+        category.setColumnName(columnName);
+        category.setCategory(Category.COLUMN);
+        category.setColumnName(columnName);
+        return category;
     }
 
-    private StatsCategoryDesc getColStatsCategoryDesc(long dbId, long tableId, String columnName) {
-        StatsCategoryDesc statsCategoryDesc = new StatsCategoryDesc();
-        statsCategoryDesc.setDbId(dbId);
-        statsCategoryDesc.setTableId(tableId);
-        statsCategoryDesc.setCategory(StatsCategoryDesc.StatsCategory.COLUMN);
-        statsCategoryDesc.setColumnName(columnName);
-        return statsCategoryDesc;
+    private StatsCategory getColumnStatsCategory(long dbId, long tableId, String partitionName, String columnName) {
+        StatsCategory category = new StatsCategory();
+        category.setDbId(dbId);
+        category.setTableId(tableId);
+        category.setPartitionName(partitionName);
+        category.setColumnName(columnName);
+        category.setCategory(Category.COLUMN);
+        category.setColumnName(columnName);
+        return category;
     }
 
-    private StatsGranularityDesc getTblStatsGranularityDesc(long tableId) {
-        StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
-        statsGranularityDesc.setTableId(tableId);
-        statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.TABLE);
-        return statsGranularityDesc;
+    private StatsGranularity getTableGranularity(long tableId) {
+        StatsGranularity granularity = new StatsGranularity();
+        granularity.setTableId(tableId);
+        granularity.setGranularity(Granularity.TABLE);
+        return granularity;
     }
 
-    private StatsGranularityDesc getPartitionStatsGranularityDesc(long tableId, long partitionId) {
-        StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
-        statsGranularityDesc.setTableId(tableId);
-        statsGranularityDesc.setPartitionId(partitionId);
-        statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.PARTITION);
-        return statsGranularityDesc;
+    private StatsGranularity getPartitionGranularity(long partitionId) {
+        StatsGranularity granularity = new StatsGranularity();
+        granularity.setPartitionId(partitionId);
+        granularity.setGranularity(Granularity.PARTITION);
+        return granularity;
     }
 
-    private StatsGranularityDesc getTabletStatsGranularityDesc(long tableId) {
-        StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
-        statsGranularityDesc.setTableId(tableId);
-        statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.PARTITION);
-        return statsGranularityDesc;
+    private StatsGranularity getTabletGranularity(long tabletId) {
+        StatsGranularity granularity = new StatsGranularity();
+        granularity.setTabletId(tabletId);
+        granularity.setGranularity(Granularity.TABLET);
+        return granularity;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java
index ec5514af81..36a2c395ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java
@@ -19,60 +19,225 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AlterColumnStatsStmt;
 import org.apache.doris.analysis.AlterTableStatsStmt;
+import org.apache.doris.analysis.ShowColumnStatsStmt;
+import org.apache.doris.analysis.ShowTableStatsStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.StatisticsTaskResult.TaskResult;
+import org.apache.doris.statistics.StatsGranularity.Granularity;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.math.NumberUtils;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
 public class StatisticsManager {
-    private static final Logger LOG = LogManager.getLogger(StatisticsManager.class);
 
-    private Statistics statistics;
+    private final Statistics statistics;
 
     public StatisticsManager() {
         statistics = new Statistics();
     }
 
-    public void alterTableStatistics(AlterTableStatsStmt stmt)
-            throws AnalysisException {
-        TableIf table = validateTableName(stmt.getTableName());
-        statistics.updateTableStats(table.getId(), stmt.getStatsTypeToValue());
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    /**
+     * Alter table or partition stats. if partition name is not null, update partition stats.
+     *
+     * @param stmt alter table stats stmt
+     * @throws AnalysisException if table or partition not exist
+     */
+    public void alterTableStatistics(AlterTableStatsStmt stmt) throws AnalysisException {
+        Table table = validateTableName(stmt.getTableName());
+        List<String> partitionNames = stmt.getPartitionNames();
+        Map<StatsType, String> statsTypeToValue = stmt.getStatsTypeToValue();
+
+        if (partitionNames.isEmpty()) {
+            statistics.updateTableStats(table.getId(), statsTypeToValue);
+            return;
+        }
+
+        for (String partitionName : partitionNames) {
+            partitionName = validatePartitionName(table, partitionName);
+            statistics.updatePartitionStats(table.getId(), partitionName, statsTypeToValue);
+        }
     }
 
+    /**
+     * Alter column stats. if partition name is not null, update column of partition stats.
+     *
+     * @param stmt alter column stats stmt
+     * @throws AnalysisException if table, column or partition not exist
+     */
     public void alterColumnStatistics(AlterColumnStatsStmt stmt) throws AnalysisException {
-        TableIf table = validateTableName(stmt.getTableName());
-        String columnName = stmt.getColumnName();
-        Column column = table.getColumn(columnName);
-        if (column == null) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName());
+        Table table = validateTableName(stmt.getTableName());
+        String colName = stmt.getColumnName();
+        List<String> partitionNames = stmt.getPartitionNames();
+        Map<StatsType, String> statsTypeToValue = stmt.getStatsTypeToValue();
+
+        if ((partitionNames.isEmpty()) && table.isPartitioned()) {
+            throw new AnalysisException("Partitioned table must specify partition name.");
+        }
+
+        if (partitionNames.isEmpty()) {
+            Column column = validateColumn(table, colName);
+            Type colType = column.getType();
+            statistics.updateColumnStats(table.getId(), colName, colType, statsTypeToValue);
+            return;
+        }
+
+        for (String partitionName : partitionNames) {
+            validatePartitionName(table, partitionName);
+            Column column = validateColumn(table, colName);
+            Type colType = column.getType();
+            statistics.updateColumnStats(table.getId(), partitionName, colName, colType, statsTypeToValue);
+        }
+    }
+
+    /**
+     * Update statistics. there are three types of statistics: column, table and column.
+     *
+     * @param statsTaskResults statistics task results
+     * @throws AnalysisException if column, table or partition not exist
+     */
+    public void updateStatistics(List<StatisticsTaskResult> statsTaskResults) throws AnalysisException {
+        // tablet granularity stats(row count, max value, min value, ndv)
+        Map<StatsType, Map<TaskResult, List<String>>> tabletStats = Maps.newHashMap();
+
+        for (StatisticsTaskResult statsTaskResult : statsTaskResults) {
+            if (statsTaskResult != null) {
+                List<TaskResult> taskResults = statsTaskResult.getTaskResults();
+
+                for (TaskResult result : taskResults) {
+                    validateResult(result);
+                    long tblId = result.getTableId();
+                    Map<StatsType, String> statsTypeToValue = result.getStatsTypeToValue();
+
+                    if (result.getGranularity() == Granularity.TABLET) {
+                        statsTypeToValue.forEach((statsType, value) -> {
+                            if (tabletStats.containsKey(statsType)) {
+                                Map<TaskResult, List<String>> resultToValue = tabletStats.get(statsType);
+                                List<String> values = resultToValue.get(result);
+                                values.add(value);
+                            } else {
+                                Map<TaskResult, List<String>> resultToValue = Maps.newHashMap();
+                                List<String> values = Lists.newArrayList();
+                                values.add(value);
+                                resultToValue.put(result, values);
+                                tabletStats.put(statsType, resultToValue);
+                            }
+                        });
+                        continue;
+                    }
+
+                    switch (result.getCategory()) {
+                        case TABLE:
+                            statistics.updateTableStats(tblId, statsTypeToValue);
+                            break;
+                        case PARTITION:
+                            String partitionName = result.getPartitionName();
+                            statistics.updatePartitionStats(tblId, partitionName, statsTypeToValue);
+                            break;
+                        case COLUMN:
+                            updateColumnStats(result, statsTypeToValue);
+                            break;
+                        default:
+                            throw new AnalysisException("Unknown stats category: " + result.getCategory());
+                    }
+                }
+            }
+        }
+
+        // update tablet granularity stats
+        updateTabletStats(tabletStats);
+    }
+
+    private void updateColumnStats(TaskResult result, Map<StatsType, String> statsTypeToValue)
+            throws AnalysisException {
+        long dbId = result.getDbId();
+        long tblId = result.getTableId();
+        String partitionName = result.getPartitionName();
+        String colName = result.getColumnName();
+
+        Database db = Catalog.getCurrentInternalCatalog().getDbOrAnalysisException(dbId);
+        OlapTable table = (OlapTable) db.getTableOrAnalysisException(tblId);
+        Column column = table.getColumn(colName);
+        Type colType = column.getType();
+
+        switch (result.getGranularity()) {
+            case TABLE:
+                statistics.updateColumnStats(tblId, colName, colType, statsTypeToValue);
+                break;
+            case PARTITION:
+                statistics.updateColumnStats(tblId, partitionName, colName, colType, statsTypeToValue);
+                break;
+            default:
+                // The tablet granularity is handle separately
+                throw new AnalysisException("Unknown granularity: " + result.getGranularity());
         }
-        // match type and column value
-        statistics.updateColumnStats(table.getId(), columnName, column.getType(), stmt.getStatsTypeToValue());
     }
 
-    public List<List<String>> showTableStatsList(String dbName, String tableName)
+    private void updateTabletStats(Map<StatsType, Map<TaskResult, List<String>>> tabletStats)
             throws AnalysisException {
-        DatabaseIf<TableIf> db = Catalog.getCurrentCatalog().getCurrentDataSource().getDbOrAnalysisException(dbName);
+        for (Map.Entry<StatsType, Map<TaskResult, List<String>>> statsEntry : tabletStats.entrySet()) {
+            StatsType statsType = statsEntry.getKey();
+            Map<TaskResult, List<String>> resultToValue = statsEntry.getValue();
+
+            for (Map.Entry<TaskResult, List<String>> resultEntry : resultToValue.entrySet()) {
+                TaskResult result = resultEntry.getKey();
+                List<String> values = resultEntry.getValue();
+
+                switch (statsType) {
+                    case ROW_COUNT:
+                        updateTabletRowCount(result, values);
+                        break;
+                    case MAX_VALUE:
+                        updateTabletMaxValue(result, values);
+                        break;
+                    case MIN_VALUE:
+                        updateTabletMinValue(result, values);
+                        break;
+                    case NDV:
+                        updateTabletNDV(result, values);
+                        break;
+                    default:
+                        throw new AnalysisException("Unknown stats type: " + statsType);
+                }
+            }
+        }
+    }
+
+    /**
+     * Get the statistics of a table. if specified partition name, get the statistics of the partition.
+     *
+     * @param stmt statement
+     * @return partition or table statistics
+     * @throws AnalysisException statistics not exist
+     */
+    public List<List<String>> showTableStatsList(ShowTableStatsStmt stmt) throws AnalysisException {
+        String dbName = stmt.getDbName();
+        Database db = Catalog.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+        String tableName = stmt.getTableName();
         List<List<String>> result = Lists.newArrayList();
+
         if (tableName != null) {
-            TableIf table = db.getTableOrAnalysisException(tableName);
+            Table table = db.getTableOrAnalysisException(tableName);
             // check priv
             if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tableName,
                     PrivPredicate.SHOW)) {
@@ -81,12 +246,21 @@ public class StatisticsManager {
                         ConnectContext.get().getRemoteIP(),
                         dbName + ": " + tableName);
             }
-            // get stats
-            result.add(showTableStats(table));
+
+            List<String> partitionNames = stmt.getPartitionNames();
+
+            if (partitionNames.isEmpty()) {
+                result.add(showTableStats(table));
+            } else {
+                for (String partitionName : partitionNames) {
+                    validatePartitionName(table, partitionName);
+                    result.add(showTableStats(table, partitionName));
+                }
+            }
         } else {
-            for (TableIf table : db.getTables()) {
-                if (!Catalog.getCurrentCatalog().getAuth()
-                        .checkTblPriv(ConnectContext.get(), dbName, table.getName(), PrivPredicate.SHOW)) {
+            for (Table table : db.getTables()) {
+                if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, table.getName(),
+                        PrivPredicate.SHOW)) {
                     continue;
                 }
                 try {
@@ -99,9 +273,21 @@ public class StatisticsManager {
         return result;
     }
 
-    public List<List<String>> showColumnStatsList(TableName tableName) throws AnalysisException {
+    /**
+     * Get the column statistics of a table. if specified partition name,
+     * get the column statistics of the partition.
+     *
+     * @param stmt statement
+     * @return column statistics for  a partition or table
+     * @throws AnalysisException statistics not exist
+     */
+    public List<List<String>> showColumnStatsList(ShowColumnStatsStmt stmt) throws AnalysisException {
+        TableName tableName = stmt.getTableName();
+        List<String> partitionNames = stmt.getPartitionNames();
+
         // check meta
-        TableIf table = validateTableName(tableName);
+        Table table = validateTableName(tableName);
+
         // check priv
         if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), tableName.getDb(),
                 tableName.getTbl(), PrivPredicate.SHOW)) {
@@ -110,22 +296,20 @@ public class StatisticsManager {
                     ConnectContext.get().getRemoteIP(),
                     tableName.getDb() + ": " + tableName.getTbl());
         }
-        // get stats
-        List<List<String>> result = Lists.newArrayList();
-        Map<String, ColumnStats> nameToColumnStats = statistics.getColumnStats(table.getId());
-        if (nameToColumnStats == null) {
-            throw new AnalysisException("There is no column statistics in this table:" + table.getName());
+
+        if (partitionNames.isEmpty()) {
+            return showColumnStats(table.getId());
         }
-        for (Map.Entry<String, ColumnStats> entry : nameToColumnStats.entrySet()) {
-            List<String> row = Lists.newArrayList();
-            row.add(entry.getKey());
-            row.addAll(entry.getValue().getShowInfo());
-            result.add(row);
+
+        List<List<String>> result = Lists.newArrayList();
+        for (String partitionName : partitionNames) {
+            validatePartitionName(table, partitionName);
+            result.addAll(showColumnStats(table.getId(), partitionName));
         }
         return result;
     }
 
-    private List<String> showTableStats(TableIf table) throws AnalysisException {
+    private List<String> showTableStats(Table table) throws AnalysisException {
         TableStats tableStats = statistics.getTableStats(table.getId());
         if (tableStats == null) {
             throw new AnalysisException("There is no statistics in this table:" + table.getName());
@@ -136,52 +320,210 @@ public class StatisticsManager {
         return row;
     }
 
-    public void alterTableStatistics(StatisticsTaskResult taskResult) throws AnalysisException {
-        StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc();
-        validateTableAndColumn(categoryDesc);
-        long tblId = categoryDesc.getTableId();
-        Map<StatsType, String> statsTypeToValue = taskResult.getStatsTypeToValue();
-        statistics.updateTableStats(tblId, statsTypeToValue);
+    private List<String> showTableStats(Table table, String partitionName) throws AnalysisException {
+        Map<String, PartitionStats> partitionStats = statistics.getPartitionStats(table.getId(), partitionName);
+        PartitionStats partitionStat = partitionStats.get(partitionName);
+        if (partitionStat == null) {
+            throw new AnalysisException("There is no statistics in this partition:" + partitionName);
+        }
+        List<String> row = Lists.newArrayList();
+        row.add(partitionName);
+        row.addAll(partitionStat.getShowInfo());
+        return row;
     }
 
-    public void alterColumnStatistics(StatisticsTaskResult taskResult) throws AnalysisException {
-        StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc();
-        validateTableAndColumn(categoryDesc);
-        long dbId = categoryDesc.getDbId();
-        long tblId = categoryDesc.getTableId();
-        Database db = Catalog.getCurrentInternalCatalog().getDbOrAnalysisException(dbId);
-        Table table = db.getTableOrAnalysisException(tblId);
-        String columnName = categoryDesc.getColumnName();
-        Type columnType = table.getColumn(columnName).getType();
-        Map<StatsType, String> statsTypeToValue = taskResult.getStatsTypeToValue();
-        statistics.updateColumnStats(tblId, columnName, columnType, statsTypeToValue);
+    private List<List<String>> showColumnStats(long tableId) throws AnalysisException {
+        List<List<String>> result = Lists.newArrayList();
+        Map<String, ColumnStats> columnStats = statistics.getColumnStats(tableId);
+        columnStats.forEach((key, stats) -> {
+            List<String> row = Lists.newArrayList();
+            row.add(key);
+            row.addAll(stats.getShowInfo());
+            result.add(row);
+        });
+        return result;
     }
 
-    private TableIf validateTableName(TableName dbTableName) throws AnalysisException {
+    private List<List<String>> showColumnStats(long tableId, String partitionName) throws AnalysisException {
+        List<List<String>> result = Lists.newArrayList();
+        Map<String, ColumnStats> columnStats = statistics.getColumnStats(tableId, partitionName);
+        columnStats.forEach((key, stats) -> {
+            List<String> row = Lists.newArrayList();
+            row.add(key);
+            row.addAll(stats.getShowInfo());
+            result.add(row);
+        });
+        return result;
+    }
+
+    private void updateTabletRowCount(TaskResult result, List<String> values) throws AnalysisException {
+        long statsValue = values.stream().filter(NumberUtils::isCreatable)
+                .mapToLong(Long::parseLong).sum();
+
+        Map<StatsType, String> statsTypeToValue = Maps.newHashMap();
+        statsTypeToValue.put(StatsType.ROW_COUNT, String.valueOf(statsValue));
+
+        if (result.getCategory() == StatsCategory.Category.TABLE) {
+            statistics.updateTableStats(result.getTableId(), statsTypeToValue);
+        } else if (result.getCategory() == StatsCategory.Category.PARTITION) {
+            statistics.updatePartitionStats(result.getTableId(), result.getPartitionName(), statsTypeToValue);
+        }
+    }
+
+    private void updateTabletMaxValue(TaskResult result, List<String> values) throws AnalysisException {
+        Column column = getNotNullColumn(result);
+        Type type = column.getType();
+        String maxValue = getNumericMaxOrMinValue(values, type, true);
+
+        Map<StatsType, String> statsTypeToValue = Maps.newHashMap();
+        statsTypeToValue.put(StatsType.MAX_VALUE, maxValue);
+
+        updateTabletGranularityStats(result, type, statsTypeToValue);
+    }
+
+    private void updateTabletMinValue(TaskResult result, List<String> values) throws AnalysisException {
+        Column column = getNotNullColumn(result);
+        Type type = column.getType();
+        String minValue = getNumericMaxOrMinValue(values, type, false);
+
+        Map<StatsType, String> statsTypeToValue = Maps.newHashMap();
+        statsTypeToValue.put(StatsType.MIN_VALUE, minValue);
+
+        updateTabletGranularityStats(result, type, statsTypeToValue);
+    }
+
+    private void updateTabletNDV(TaskResult result, List<String> values) throws AnalysisException {
+        double statsValue = values.stream().filter(NumberUtils::isCreatable)
+                .mapToLong(Long::parseLong).sum();
+
+        Map<StatsType, String> statsTypeToValue = Maps.newHashMap();
+        statsTypeToValue.put(StatsType.NDV, String.valueOf(statsValue));
+
+        Column column = getNotNullColumn(result);
+        Type type = column.getType();
+        updateTabletGranularityStats(result, type, statsTypeToValue);
+    }
+
+    private void updateTabletGranularityStats(TaskResult result, Type columnType,
+            Map<StatsType, String> statsTypeToValue) throws AnalysisException {
+        if (result.getCategory() == StatsCategory.Category.TABLE) {
+            statistics.updateColumnStats(result.getTableId(),
+                    result.getColumnName(), columnType, statsTypeToValue);
+        } else if (result.getCategory() == StatsCategory.Category.PARTITION) {
+            statistics.updateColumnStats(result.getTableId(), result.getPartitionName(),
+                    result.getColumnName(), columnType, statsTypeToValue);
+        }
+    }
+
+    private Table validateTableName(TableName dbTableName) throws AnalysisException {
         String dbName = dbTableName.getDb();
         String tableName = dbTableName.getTbl();
-
-        DatabaseIf db = Catalog.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+        Database db = Catalog.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
         return db.getTableOrAnalysisException(tableName);
     }
 
-    private void validateTableAndColumn(StatsCategoryDesc categoryDesc) throws AnalysisException {
-        long dbId = categoryDesc.getDbId();
-        long tblId = categoryDesc.getTableId();
-        String columnName = categoryDesc.getColumnName();
+    /**
+     * Partition name is optional, if partition name is not null, it will be validated.
+     */
+    private String validatePartitionName(Table table, String partitionName) throws AnalysisException {
+        if (!table.isPartitioned() && !Strings.isNullOrEmpty(partitionName)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_ON_NONPARTITIONED,
+                    partitionName, table.getName());
+        }
 
-        DatabaseIf db = Catalog.getCurrentInternalCatalog().getDbOrAnalysisException(dbId);
-        TableIf table = db.getTableOrAnalysisException(tblId);
-        if (!Strings.isNullOrEmpty(columnName)) {
-            Column column = table.getColumn(columnName);
-            if (column == null) {
-                throw new AnalysisException("Column " + columnName + " does not exist in table " + table.getName());
-            }
+        if (!Strings.isNullOrEmpty(partitionName) && table.getPartition(partitionName) == null) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_PARTITION,
+                    partitionName, table.getName());
         }
+
+        return partitionName;
     }
 
-    public Statistics getStatistics() {
-        return statistics;
+    private Column validateColumn(Table table, String columnName) throws AnalysisException {
+        Column column = table.getColumn(columnName);
+        if (column == null) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName());
+        }
+        return column;
+    }
+
+    private void validateResult(TaskResult result) throws AnalysisException {
+        Database db = Catalog.getCurrentInternalCatalog().getDbOrAnalysisException(result.getDbId());
+        Table table = db.getTableOrAnalysisException(result.getTableId());
+
+        if (!Strings.isNullOrEmpty(result.getPartitionName())) {
+            validatePartitionName(table, result.getPartitionName());
+        }
+
+        if (!Strings.isNullOrEmpty(result.getColumnName())) {
+            validateColumn(table, result.getColumnName());
+        }
+
+        if (result.getCategory() == null) {
+            throw new AnalysisException("Category is null.");
+        }
+
+        if (result.getGranularity() == null) {
+            throw new AnalysisException("Granularity is null.");
+        }
+
+        Map<StatsType, String> statsTypeToValue = result.getStatsTypeToValue();
+        if (statsTypeToValue == null || statsTypeToValue.isEmpty()) {
+            throw new AnalysisException("StatsTypeToValue is empty.");
+        }
+    }
+
+    private Column getNotNullColumn(TaskResult result) throws AnalysisException {
+        Database db = Catalog.getCurrentInternalCatalog().getDbOrAnalysisException(result.getDbId());
+        Table table = db.getTableOrAnalysisException(result.getTableId());
+        Column column = table.getColumn(result.getColumnName());
+        if (column == null) {
+            throw new AnalysisException("Column " + result.getColumnName() + " does not exist");
+        }
+        return column;
     }
 
+    /**
+     * Get the max/min value of the column.
+     *
+     * @param values String List of values
+     * @param type column type
+     * @param maxOrMin true for max, false for min
+     * @return the max/min value of the column.
+     */
+    private String getNumericMaxOrMinValue(List<String> values, Type type, boolean maxOrMin) {
+        if (type.isFixedPointType()) {
+            long result = 0L;
+            for (String value : values) {
+                if (NumberUtils.isCreatable(value)) {
+                    long temp = Long.parseLong(value);
+                    if (maxOrMin) {
+                        result = Math.max(result, temp);
+                    } else {
+                        result = Math.min(result, temp);
+                    }
+                }
+            }
+            return String.valueOf(result);
+        }
+
+        if (type.isFloatingPointType()) {
+            double result = 0.0;
+            for (String value : values) {
+                if (NumberUtils.isCreatable(value)) {
+                    double temp = Double.parseDouble(value);
+                    if (maxOrMin) {
+                        result = Math.max(result, temp);
+                    } else {
+                        result = Math.min(result, temp);
+                    }
+                }
+            }
+            return String.valueOf(result);
+        }
+
+        // is not numeric type
+        values.sort(Comparator.naturalOrder());
+        return values.size() > 0 ? values.get(values.size() - 1) : null;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
index fda2c28c77..7d3b9d0f5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java
@@ -19,7 +19,10 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.statistics.StatisticsTaskResult.TaskResult;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -49,23 +52,16 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
 
     protected long id = Catalog.getCurrentCatalog().getNextId();
     protected long jobId;
-    protected StatsGranularityDesc granularityDesc;
-    protected StatsCategoryDesc categoryDesc;
-    protected List<StatsType> statsTypeList;
+    protected List<StatisticsDesc> statsDescs;
     protected TaskState taskState = TaskState.PENDING;
 
     protected final long createTime = System.currentTimeMillis();
     protected long startTime = -1L;
     protected long finishTime = -1L;
 
-    public StatisticsTask(long jobId,
-                          StatsGranularityDesc granularityDesc,
-                          StatsCategoryDesc categoryDesc,
-                          List<StatsType> statsTypeList) {
+    public StatisticsTask(long jobId, List<StatisticsDesc> statsDescs) {
         this.jobId = jobId;
-        this.granularityDesc = granularityDesc;
-        this.categoryDesc = categoryDesc;
-        this.statsTypeList = statsTypeList;
+        this.statsDescs = statsDescs;
     }
 
     public long getId() {
@@ -80,16 +76,8 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
         return jobId;
     }
 
-    public StatsGranularityDesc getGranularityDesc() {
-        return granularityDesc;
-    }
-
-    public StatsCategoryDesc getCategoryDesc() {
-        return categoryDesc;
-    }
-
-    public List<StatsType> getStatsTypeList() {
-        return statsTypeList;
+    public List<StatisticsDesc> getStatsDescs() {
+        return statsDescs;
     }
 
     public TaskState getTaskState() {
@@ -150,4 +138,35 @@ public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
         LOG.info("Statistics job(id={}) state changed from {} to {}", id, taskState, newState);
         taskState = newState;
     }
+
+    protected void checkStatisticsDesc() throws DdlException {
+        for (StatisticsDesc statsDesc : statsDescs) {
+            if (statsDesc == null) {
+                throw new DdlException("StatisticsDesc is null.");
+            }
+
+            if (statsDesc.getStatsCategory() == null) {
+                throw new DdlException("Category is null.");
+            }
+
+            if (statsDesc.getStatsGranularity() == null) {
+                throw new DdlException("Granularity is null.");
+            }
+
+            Preconditions.checkState(statsDesc.getStatsCategory().getDbId() > 0L);
+            Preconditions.checkState(statsDesc.getStatsCategory().getTableId() > 0L);
+        }
+    }
+
+    protected TaskResult createNewTaskResult(StatsCategory category, StatsGranularity granularity) {
+        TaskResult result = new TaskResult();
+        result.setDbId(category.getDbId());
+        result.setTableId(category.getTableId());
+        result.setPartitionName(category.getPartitionName());
+        result.setColumnName(category.getColumnName());
+        result.setCategory(category.getCategory());
+        result.setGranularity(granularity.getGranularity());
+        result.setStatsTypeToValue(Maps.newHashMap());
+        return result;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java
index 94f4c50934..ea5fb6ed34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java
@@ -17,29 +17,116 @@
 
 package org.apache.doris.statistics;
 
+
+import org.apache.doris.statistics.StatsCategory.Category;
+import org.apache.doris.statistics.StatsGranularity.Granularity;
+
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class StatisticsTaskResult {
-    private final StatsGranularityDesc granularityDesc;
-    private final StatsCategoryDesc categoryDesc;
-    private final Map<StatsType, String> statsTypeToValue;
-
-    public StatisticsTaskResult(StatsGranularityDesc granularityDesc, StatsCategoryDesc categoryDesc,
-                                Map<StatsType, String> statsTypeToValue) {
-        this.granularityDesc = granularityDesc;
-        this.categoryDesc = categoryDesc;
-        this.statsTypeToValue = statsTypeToValue;
+    private List<TaskResult> taskResults;
+
+    public StatisticsTaskResult(List<TaskResult> taskResults) {
+        this.taskResults = taskResults;
     }
 
-    public StatsGranularityDesc getGranularityDesc() {
-        return granularityDesc;
+    public List<TaskResult> getTaskResults() {
+        return taskResults;
     }
 
-    public StatsCategoryDesc getCategoryDesc() {
-        return categoryDesc;
+    public void setTaskResults(List<TaskResult> taskResults) {
+        this.taskResults = taskResults;
     }
 
-    public Map<StatsType, String> getStatsTypeToValue() {
-        return statsTypeToValue;
+    public static class TaskResult {
+        private long dbId = -1L;
+        private long tableId = -1L;
+        private String partitionName = "";
+        private String columnName = "";
+
+        private Category category;
+        private Granularity granularity;
+        private Map<StatsType, String> statsTypeToValue;
+
+        public long getDbId() {
+            return dbId;
+        }
+
+        public void setDbId(long dbId) {
+            this.dbId = dbId;
+        }
+
+        public long getTableId() {
+            return tableId;
+        }
+
+        public void setTableId(long tableId) {
+            this.tableId = tableId;
+        }
+
+        public String getPartitionName() {
+            return partitionName;
+        }
+
+        public void setPartitionName(String partitionName) {
+            this.partitionName = partitionName;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public void setColumnName(String columnName) {
+            this.columnName = columnName;
+        }
+
+        public Category getCategory() {
+            return category;
+        }
+
+        public void setCategory(Category category) {
+            this.category = category;
+        }
+
+        public Granularity getGranularity() {
+            return granularity;
+        }
+
+        public void setGranularity(Granularity granularity) {
+            this.granularity = granularity;
+        }
+
+        public Map<StatsType, String> getStatsTypeToValue() {
+            return statsTypeToValue;
+        }
+
+        public void setStatsTypeToValue(Map<StatsType, String> statsTypeToValue) {
+            this.statsTypeToValue = statsTypeToValue;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TaskResult that = (TaskResult) o;
+            return dbId == that.dbId
+                    && tableId == that.tableId
+                    && partitionName.equals(that.partitionName)
+                    && columnName.equals(that.columnName)
+                    && category == that.category
+                    && granularity == that.granularity;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dbId, tableId, partitionName,
+                    columnName, category, granularity);
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
index 4b644e77be..d69226390e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java
@@ -26,7 +26,6 @@ import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.statistics.StatisticsJob.JobState;
 import org.apache.doris.statistics.StatisticsTask.TaskState;
-import org.apache.doris.statistics.StatsCategoryDesc.StatsCategory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -44,8 +43,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-/*
-Schedule statistics task
+/**
+ * Schedule statistics task
  */
 public class StatisticsTaskScheduler extends MasterDaemon {
     private static final Logger LOG = LogManager.getLogger(StatisticsTaskScheduler.class);
@@ -71,11 +70,12 @@ public class StatisticsTaskScheduler extends MasterDaemon {
             for (StatisticsTask task : tasks) {
                 queue.remove();
                 long jobId = task.getJobId();
-                StatisticsJob statisticsJob = statisticsJobs.get(jobId);
 
                 if (checkJobIsValid(jobId)) {
                     // step2: execute task and save task result
                     Future<StatisticsTaskResult> future = executor.submit(task);
+                    StatisticsJob statisticsJob = statisticsJobs.get(jobId);
+
                     if (updateTaskAndJobState(task, statisticsJob)) {
                         Map<Long, Future<StatisticsTaskResult>> taskInfo = Maps.newHashMap();
                         taskInfo.put(task.getId(), future);
@@ -114,7 +114,7 @@ public class StatisticsTaskScheduler extends MasterDaemon {
      * Update task and job state
      *
      * @param task statistics task
-     * @param job  statistics job
+     * @param job statistics job
      * @return true if update task and job state successfully.
      */
     private boolean updateTaskAndJobState(StatisticsTask task, StatisticsJob job) {
@@ -147,41 +147,40 @@ public class StatisticsTaskScheduler extends MasterDaemon {
 
         resultMap.forEach((jobId, taskMapList) -> {
             if (checkJobIsValid(jobId)) {
-                String errorMsg = "";
                 StatisticsJob statisticsJob = jobManager.getIdToStatisticsJob().get(jobId);
                 Map<String, String> properties = statisticsJob.getProperties();
                 long timeout = Long.parseLong(properties.get(AnalyzeStmt.CBO_STATISTICS_TASK_TIMEOUT_SEC));
 
+                // For tasks with tablet granularity,
+                // we need aggregate calculations to get the results of the statistics,
+                // so we need to put all the tasks together and handle the results together.
+                List<StatisticsTaskResult> taskResults = Lists.newArrayList();
+
                 for (Map<Long, Future<StatisticsTaskResult>> taskInfos : taskMapList) {
-                    for (Map.Entry<Long, Future<StatisticsTaskResult>> taskInfo : taskInfos.entrySet()) {
-                        Long taskId = taskInfo.getKey();
-                        Future<StatisticsTaskResult> future = taskInfo.getValue();
+                    taskInfos.forEach((taskId, future) -> {
+                        String errorMsg = "";
 
                         try {
                             StatisticsTaskResult taskResult = future.get(timeout, TimeUnit.SECONDS);
-                            StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc();
-                            StatsCategory category = categoryDesc.getCategory();
-                            if (category == StatsCategory.TABLE) {
-                                // update table statistics
-                                statsManager.alterTableStatistics(taskResult);
-                            } else if (category == StatsCategory.COLUMN) {
-                                // update column statistics
-                                statsManager.alterColumnStatistics(taskResult);
-                            }
-                        } catch (AnalysisException | TimeoutException | ExecutionException
-                                | InterruptedException | CancellationException e) {
+                            taskResults.add(taskResult);
+                        } catch (TimeoutException | ExecutionException | InterruptedException
+                                | CancellationException e) {
                             errorMsg = e.getMessage();
-                            LOG.info("Failed to update statistics. jobId: {}, taskId: {}, e: {}", jobId, taskId, e);
+                            LOG.info("Failed to get statistics. jobId: {}, taskId: {}, e: {}", jobId, taskId, e);
                         }
 
                         try {
-                            // update the task and job info
                             statisticsJob.updateJobInfoByTaskId(taskId, errorMsg);
                         } catch (DdlException e) {
-                            LOG.info("Failed to update statistics job info. jobId: {}, taskId: {}, e: {}",
-                                    jobId, taskId, e);
+                            LOG.info("Failed to update statistics job info. jobId: {}, e: {}", jobId, e);
                         }
-                    }
+                    });
+                }
+
+                try {
+                    statsManager.updateStatistics(taskResults);
+                } catch (AnalysisException e) {
+                    LOG.info("Failed to update statistics. jobId: {}, e: {}", jobId, e);
                 }
             }
         });
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategory.java
similarity index 67%
rename from fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java
rename to fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategory.java
index 1de8d80913..28db7070d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategoryDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsCategory.java
@@ -17,22 +17,31 @@
 
 package org.apache.doris.statistics;
 
-public class StatsCategoryDesc {
-    public enum StatsCategory {
+/**
+ * Describes the basic statistics information.
+ */
+public class StatsCategory {
+    /**
+     * The category of the statistics.
+     */
+    public enum Category {
         TABLE,
+        PARTITION,
         COLUMN
     }
 
-    private StatsCategory category;
+    private Category category;
     private long dbId;
     private long tableId;
+    private String partitionName;
     private String columnName;
+    private String statsValue;
 
-    public StatsCategory getCategory() {
+    public Category getCategory() {
         return this.category;
     }
 
-    public void setCategory(StatsCategory category) {
+    public void setCategory(Category category) {
         this.category = category;
     }
 
@@ -52,6 +61,14 @@ public class StatsCategoryDesc {
         this.tableId = tableId;
     }
 
+    public String getPartitionName() {
+        return partitionName;
+    }
+
+    public void setPartitionName(String partitionName) {
+        this.partitionName = partitionName;
+    }
+
     public String getColumnName() {
         return this.columnName;
     }
@@ -59,4 +76,12 @@ public class StatsCategoryDesc {
     public void setColumnName(String columnName) {
         this.columnName = columnName;
     }
+
+    public String getStatsValue() {
+        return statsValue;
+    }
+
+    public void setStatsValue(String statsValue) {
+        this.statsValue = statsValue;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularity.java
similarity index 80%
rename from fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java
rename to fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularity.java
index 30aff6a66c..6fa40341e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularityDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsGranularity.java
@@ -17,28 +17,31 @@
 
 package org.apache.doris.statistics;
 
-public class StatsGranularityDesc {
-    public enum StatsGranularity {
+/**
+ * The granularity of the statistics.
+ */
+public class StatsGranularity {
+    public enum Granularity {
         TABLE,
         PARTITION,
         TABLET
     }
 
-    private StatsGranularity granularity;
+    private Granularity granularity;
     private long tableId;
     private long partitionId;
     private long tabletId;
 
-    public StatsGranularity getGranularity() {
-        return this.granularity;
+    public Granularity getGranularity() {
+        return granularity;
     }
 
-    public void setGranularity(StatsGranularity granularity) {
+    public void setGranularity(Granularity granularity) {
         this.granularity = granularity;
     }
 
     public long getTableId() {
-        return this.tableId;
+        return tableId;
     }
 
     public void setTableId(long tableId) {
@@ -46,7 +49,7 @@ public class StatsGranularityDesc {
     }
 
     public long getPartitionId() {
-        return this.partitionId;
+        return partitionId;
     }
 
     public void setPartitionId(long partitionId) {
@@ -54,7 +57,7 @@ public class StatsGranularityDesc {
     }
 
     public long getTabletId() {
-        return this.tabletId;
+        return tabletId;
     }
 
     public void setTabletId(long tabletId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java
index 93b1104e5c..cb501ad819 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java
@@ -25,9 +25,7 @@ public enum StatsType {
     MAX_SIZE("max_size"),
     NUM_NULLS("num_nulls"),
     MIN_VALUE("min_value"),
-    MAX_VALUE("max_value"),
-    MAX_COL_LENS("max_col_lens"),
-    AVG_COL_LENS("avg_col_lens");
+    MAX_VALUE("max_value");
     private final String value;
 
     StatsType(String value) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
index 08ef2cc6be..96d83cb61d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.Util;
@@ -32,22 +33,18 @@ import java.util.function.Predicate;
 /**
  * There are the statistics of table.
  * The table stats are mainly used to provide input for the Optimizer's cost model.
- * <p>
  * The description of table stats are following:
- * 1. @rowCount: The row count of table.
- * 2. @dataSize: The data size of table.
- * 3. @nameToColumnStats: <@String columnName, @ColumnStats columnStats>
- *      Each column in the Table will have corresponding @ColumnStats.
- *      Those @ColumnStats are recorded in @nameToColumnStats form of MAP.
- *      This facilitates the optimizer to quickly find the corresponding
- *      @ColumnStats based on the column name.
- *
- * @rowCount: The row count of table.
- * @dataSize: The data size of table.
- * <p>
- * The granularity of the statistics is whole table.
- * For example:
- * "@rowCount = 1000" means that the row count is 1000 in the whole table.
+ *     - @rowCount: The row count of table.
+ *     - @dataSize: The data size of table.
+ *     - @nameToColumnStats: <@String columnName, @ColumnStats columnStats>
+ * <p>Each column in the Table will have corresponding @ColumnStats.
+ * Those @ColumnStats are recorded in @nameToColumnStats form of MAP.
+ * This facilitates the optimizer to quickly find the corresponding:
+ *     - @ColumnStats based on the column name.
+ *     - @rowCount: The row count of table.
+ *     - @dataSize: The data size of table.
+ * <p>The granularity of the statistics is whole table.
+ * For example: "@rowCount = 1000" means that the row count is 1000 in the whole table.
  */
 public class TableStats {
 
@@ -59,51 +56,195 @@ public class TableStats {
 
     private long rowCount = -1;
     private long dataSize = -1;
-    private Map<String, ColumnStats> nameToColumnStats = Maps.newConcurrentMap();
+    private final Map<String, PartitionStats> nameToPartitionStats = Maps.newConcurrentMap();
+    private final Map<String, ColumnStats> nameToColumnStats = Maps.newConcurrentMap();
+
+    public long getRowCount() {
+        if  (rowCount == -1) {
+            return nameToPartitionStats.values().stream()
+                    .filter(partitionStats -> partitionStats.getRowCount() != -1)
+                    .mapToLong(PartitionStats::getRowCount).sum();
+        }
+        return rowCount;
+    }
+
+    public void setRowCount(long rowCount) {
+        this.rowCount = rowCount;
+    }
+
+    public long getDataSize() {
+        if (dataSize == -1) {
+            return nameToPartitionStats.values().stream()
+                    .filter(partitionStats -> partitionStats.getDataSize() != -1)
+                    .mapToLong(PartitionStats::getDataSize).sum();
+        }
+        return dataSize;
+    }
+
+    public Map<String, PartitionStats> getNameToPartitionStats() {
+        return nameToPartitionStats;
+    }
+
+    public Map<String, ColumnStats> getNameToColumnStats() {
+        if (nameToColumnStats.isEmpty()) {
+            return getAggPartitionColStats();
+        }
+        return nameToColumnStats;
+    }
+
+    public void setDataSize(long dataSize) {
+        this.dataSize = dataSize;
+    }
 
     public void updateTableStats(Map<StatsType, String> statsTypeToValue) throws AnalysisException {
         for (Map.Entry<StatsType, String> entry : statsTypeToValue.entrySet()) {
-            StatsType statsType = entry.getKey();
-            if (statsType == ROW_COUNT) {
+            if (entry.getKey() == ROW_COUNT) {
                 rowCount = Util.getLongPropertyOrDefault(entry.getValue(), rowCount,
                         DESIRED_ROW_COUNT_PRED, ROW_COUNT + " should >= -1");
-            } else if (statsType == DATA_SIZE) {
+            } else if (entry.getKey() == DATA_SIZE) {
                 dataSize = Util.getLongPropertyOrDefault(entry.getValue(), dataSize,
                         DESIRED_DATA_SIZE_PRED, DATA_SIZE + " should >= -1");
             }
         }
     }
 
+    public void updatePartitionStats(String partitionName, Map<StatsType, String> statsTypeToValue)
+            throws AnalysisException {
+        PartitionStats partitionStats = getNotNullPartitionStats(partitionName);
+        partitionStats.updatePartitionStats(statsTypeToValue);
+    }
+
     public void updateColumnStats(String columnName, Type columnType, Map<StatsType, String> statsTypeToValue)
             throws AnalysisException {
+        ColumnStats columnStats = getNotNullColumnStats(columnName);
+        columnStats.updateStats(columnType, statsTypeToValue);
+    }
+
+    /**
+     * If partition stats is not exist, create a new one.
+     *
+     * @param partitionName partition name
+     * @return @PartitionStats
+     */
+    private PartitionStats getNotNullPartitionStats(String partitionName) {
+        PartitionStats partitionStat = nameToPartitionStats.get(partitionName);
+        if (partitionStat == null) {
+            partitionStat = new PartitionStats();
+            nameToPartitionStats.put(partitionName, partitionStat);
+        }
+        return partitionStat;
+    }
+
+    /**
+     * If column stats is not exist, create a new one.
+     *
+     * @param columnName column name
+     * @return @ColumnStats
+     */
+    public ColumnStats getNotNullColumnStats(String columnName) {
         ColumnStats columnStats = nameToColumnStats.get(columnName);
         if (columnStats == null) {
             columnStats = new ColumnStats();
             nameToColumnStats.put(columnName, columnStats);
         }
-        columnStats.updateStats(columnType, statsTypeToValue);
+        return columnStats;
     }
 
     public List<String> getShowInfo() {
         List<String> result = Lists.newArrayList();
-        result.add(Long.toString(rowCount));
-        result.add(Long.toString(dataSize));
+        result.add(Long.toString(getRowCount()));
+        result.add(Long.toString(getDataSize()));
         return result;
     }
 
-    public Map<String, ColumnStats> getNameToColumnStats() {
-        return nameToColumnStats;
+    public List<String> getShowInfo(String partitionName) {
+        PartitionStats partitionStats = nameToPartitionStats.get(partitionName);
+        return partitionStats.getShowInfo();
     }
 
-    public long getRowCount() {
-        return rowCount;
-    }
+    private Map<String, ColumnStats> getAggPartitionColStats() {
+        Map<String, ColumnStats> aggColumnStats = Maps.newConcurrentMap();
+        for (PartitionStats partitionStats : nameToPartitionStats.values()) {
+            partitionStats.getNameToColumnStats().forEach((colName, columnStats) -> {
+                if (!aggColumnStats.containsKey(colName)) {
+                    aggColumnStats.put(colName, columnStats);
+                } else {
+                    ColumnStats tblColStats = aggColumnStats.get(colName);
+                    aggPartitionColumnStats(tblColStats, columnStats);
+                }
+            });
+        }
 
-    public long getDataSize() {
-        return dataSize;
+        return aggColumnStats;
     }
 
-    public void setRowCount(long rowCount) {
-        this.rowCount = rowCount;
+    private void aggPartitionColumnStats(ColumnStats leftStats, ColumnStats rightStats) {
+        if (leftStats.getNdv() == -1) {
+            if (rightStats.getNdv() != -1) {
+                leftStats.setNdv(rightStats.getNdv());
+            }
+        } else {
+            if (rightStats.getNdv() != -1) {
+                long ndv = leftStats.getNdv() + rightStats.getNdv();
+                leftStats.setNdv(ndv);
+            }
+        }
+
+        if (leftStats.getAvgSize() == -1) {
+            if (rightStats.getAvgSize() != -1) {
+                leftStats.setAvgSize(rightStats.getAvgSize());
+            }
+        } else {
+            if (rightStats.getAvgSize() != -1) {
+                float avgSize = (leftStats.getAvgSize() + rightStats.getAvgSize()) / 2;
+                leftStats.setAvgSize(avgSize);
+            }
+        }
+
+        if (leftStats.getMaxSize() == -1) {
+            if (rightStats.getMaxSize() != -1) {
+                leftStats.setMaxSize(rightStats.getMaxSize());
+            }
+        } else {
+            if (rightStats.getMaxSize() != -1) {
+                long maxSize = Math.max(leftStats.getMaxSize(), rightStats.getMaxSize());
+                leftStats.setMaxSize(maxSize);
+            }
+        }
+
+        if (leftStats.getNumNulls() == -1) {
+            if (rightStats.getNumNulls() != -1) {
+                leftStats.setNumNulls(rightStats.getNumNulls());
+            }
+        } else {
+            if (rightStats.getNumNulls() != -1) {
+                long numNulls = leftStats.getNumNulls() + rightStats.getNumNulls();
+                leftStats.setNumNulls(numNulls);
+            }
+        }
+
+        if (leftStats.getMinValue() == null) {
+            if (rightStats.getMinValue() != null) {
+                leftStats.setMinValue(rightStats.getMinValue());
+            }
+        } else {
+            if (rightStats.getMinValue() != null) {
+                LiteralExpr minValue = leftStats.getMinValue().compareTo(rightStats.getMinValue()) > 0
+                        ? leftStats.getMinValue() : rightStats.getMinValue();
+                leftStats.setMinValue(minValue);
+            }
+        }
+
+        if (leftStats.getMaxValue() == null) {
+            if (rightStats.getMaxValue() != null) {
+                leftStats.setMaxValue(rightStats.getMaxValue());
+            }
+        } else {
+            if (rightStats.getMaxValue() != null) {
+                LiteralExpr maxValue = leftStats.getMaxValue().compareTo(rightStats.getMaxValue()) > 0
+                        ? leftStats.getMaxValue() : rightStats.getMaxValue();
+                leftStats.setMaxValue(maxValue);
+            }
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org