Posted to commits@flink.apache.org by go...@apache.org on 2022/08/04 02:32:24 UTC

[flink] branch master updated: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8236644816a [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution
8236644816a is described below

commit 8236644816a8070499ff4327b7ee71cbb4660afa
Author: godfreyhe <go...@163.com>
AuthorDate: Mon Jul 11 19:53:39 2022 +0800

    [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution
    
    This closes #20363
---
 .../flink/table/api/internal/AnalyzeTableUtil.java | 383 ++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   |  10 +
 .../operations/ddl/AnalyzeTableOperation.java      |  84 +++
 .../catalog/stats/CatalogColumnStatistics.java     |  29 ++
 .../stats/CatalogColumnStatisticsDataBinary.java   |  29 ++
 .../stats/CatalogColumnStatisticsDataBoolean.java  |  29 ++
 .../stats/CatalogColumnStatisticsDataDate.java     |  32 ++
 .../stats/CatalogColumnStatisticsDataDouble.java   |  42 ++
 .../stats/CatalogColumnStatisticsDataLong.java     |  32 ++
 .../stats/CatalogColumnStatisticsDataString.java   |  32 ++
 .../catalog/stats/CatalogTableStatistics.java      |  38 ++
 .../org/apache/flink/table/catalog/stats/Date.java |  24 +
 .../operations/SqlToOperationConverter.java        | 214 ++++++++
 .../table/planner/factories/TestValuesCatalog.java |   3 +
 .../runtime/batch/sql/AnalyzeTableITCase.java      | 577 +++++++++++++++++++++
 .../runtime/stream/sql/AnalyzeTableITCase.java     |  63 +++
 .../runtime/batch/sql/TableSourceITCase.scala      |   2 +-
 17 files changed, 1622 insertions(+), 1 deletion(-)
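
For context: the new statement collects a table-level row count (and optionally
column-level statistics) and writes them back into the catalog. A minimal usage
sketch against the Table API follows; table and column names are hypothetical,
and the exact accepted syntax is defined by the SqlAnalyzeTable parser rule,
which is not part of this diff:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class AnalyzeTableExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // ... register MyTable first (omitted); then:
            // row count only
            tableEnv.executeSql("ANALYZE TABLE MyTable COMPUTE STATISTICS");
            // row count plus column statistics for selected physical columns
            tableEnv.executeSql(
                    "ANALYZE TABLE MyTable COMPUTE STATISTICS FOR COLUMNS a, b");
            // for a partitioned table, every partition key must be specified
            tableEnv.executeSql(
                    "ANALYZE TABLE MyTable PARTITION (p = 1) COMPUTE STATISTICS FOR ALL COLUMNS");
        }
    }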

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
new file mode 100644
index 00000000000..37471ed52ae
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Utility class for executing {@code ANALYZE TABLE} statements. */
+@Internal
+public class AnalyzeTableUtil {
+
+    private AnalyzeTableUtil() {}
+
+    public static TableResultInternal analyzeTable(
+            TableEnvironmentImpl tableEnv, AnalyzeTableOperation operation)
+            throws TableNotExistException, PartitionNotExistException, TablePartitionedException {
+        List<Column> columns = operation.getColumns();
+        // the TableIdentifier has been validated before
+        Catalog catalog =
+                tableEnv.getCatalogManager()
+                        .getCatalog(operation.getTableIdentifier().getCatalogName())
+                        .orElseThrow(() -> new TableException("This should not happen."));
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (operation.getPartitionSpecs().isPresent()) {
+            List<CatalogPartitionSpec> targetPartitions = operation.getPartitionSpecs().get();
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =
+                        generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+                Tuple2<CatalogTableStatistics, CatalogColumnStatistics> result =
+                        executeSqlAndGenerateStatistics(tableEnv, columns, statSql);
+                CatalogTableStatistics tableStat = result.f0;
+                catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+                CatalogColumnStatistics columnStat = result.f1;
+                if (columnStat != null) {
+                    catalog.alterPartitionColumnStatistics(
+                            objectPath, partitionSpec, columnStat, false);
+                }
+            }
+        } else {
+            String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+            Tuple2<CatalogTableStatistics, CatalogColumnStatistics> result =
+                    executeSqlAndGenerateStatistics(tableEnv, columns, statSql);
+            CatalogTableStatistics tableStat = result.f0;
+            catalog.alterTableStatistics(objectPath, tableStat, false);
+            CatalogColumnStatistics columnStat = result.f1;
+            if (columnStat != null) {
+                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            }
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    private static Tuple2<CatalogTableStatistics, CatalogColumnStatistics>
+            executeSqlAndGenerateStatistics(
+                    TableEnvironmentImpl tableEnv, List<Column> columns, String statSql) {
+        TableResult tableResult = tableEnv.executeSql(statSql);
+        List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+        Preconditions.checkArgument(result.size() == 1);
+        Row row = result.get(0);
+        CatalogTableStatistics tableStat = convertToTableStatistics(row);
+        CatalogColumnStatistics columnStat = null;
+        if (!columns.isEmpty()) {
+            columnStat = convertToColumnStatistics(row, columns);
+        }
+        return new Tuple2<>(tableStat, columnStat);
+    }
+
+    private static String generateAnalyzeSql(
+            ObjectIdentifier tableIdentifier,
+            @Nullable CatalogPartitionSpec partitionSpec,
+            List<Column> columns) {
+        String partitionFilter;
+        if (partitionSpec != null) {
+            partitionFilter =
+                    " WHERE "
+                            + partitionSpec.getPartitionSpec().entrySet().stream()
+                                    .map(e -> e.getKey() + "=" + e.getValue())
+                                    .collect(Collectors.joining(" AND "));
+        } else {
+            partitionFilter = "";
+        }
+
+        final String columnStatsSelects;
+        if (columns.isEmpty()) {
+            columnStatsSelects = "";
+        } else {
+            columnStatsSelects = ", " + getColumnStatsSelects(columns);
+        }
+
+        return String.format(
+                "SELECT COUNT(1) AS %s %s FROM %s %s",
+                getRowCountColumn(), columnStatsSelects, tableIdentifier, partitionFilter);
+    }
+
+    private static String getColumnStatsSelects(List<Column> columns) {
+        return columns.stream()
+                .flatMap(
+                        f -> {
+                            String c = f.getName();
+                            List<String> columnStatSelect = new ArrayList<>();
+                            String computeNullCount =
+                                    String.format(
+                                            "(COUNT(1) - COUNT(`%s`)) AS %s",
+                                            c, getNullCountColumn(c));
+                            columnStatSelect.add(computeNullCount);
+
+                            String computeNdv =
+                                    String.format(
+                                            "APPROX_COUNT_DISTINCT(`%s`) AS %s",
+                                            c, getNdvColumn(c));
+
+                            switch (f.getDataType().getLogicalType().getTypeRoot()) {
+                                case BOOLEAN:
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS TRUE) AS %s",
+                                                    c, c, getTrueCountColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS FALSE) AS %s",
+                                                    c, c, getFalseCountColumn(c)));
+                                    break;
+                                case TINYINT:
+                                case SMALLINT:
+                                case INTEGER:
+                                case FLOAT:
+                                case DATE:
+                                case TIME_WITHOUT_TIME_ZONE:
+                                case BIGINT:
+                                case DOUBLE:
+                                case DECIMAL:
+                                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format("MAX(`%s`) AS %s", c, getMaxColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format("MIN(`%s`) AS %s", c, getMinColumn(c)));
+                                    break;
+                                case CHAR:
+                                case VARCHAR:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "AVG(CAST(CHAR_LENGTH(`%s`) AS DOUBLE)) AS %s",
+                                                    c, getAvgLenColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "MAX(CAST(CHAR_LENGTH(`%s`) AS BIGINT)) AS %s",
+                                                    c, getMaxLenColumn(c)));
+                                    break;
+                                default:
+                                    break;
+                            }
+                            return columnStatSelect.stream();
+                        })
+                .collect(Collectors.joining(", "));
+    }
+
+    private static CatalogTableStatistics convertToTableStatistics(Row row) {
+        Long rowCount = row.getFieldAs(getRowCountColumn());
+        return new CatalogTableStatistics(rowCount, -1, -1, -1);
+    }
+
+    private static CatalogColumnStatistics convertToColumnStatistics(
+            Row row, List<Column> columns) {
+        Preconditions.checkArgument(!columns.isEmpty());
+        Map<String, CatalogColumnStatisticsDataBase> columnStatMap = new HashMap<>();
+        for (Column column : columns) {
+            CatalogColumnStatisticsDataBase columnStat = convertToColumnStatisticsData(row, column);
+            if (columnStat != null) {
+                columnStatMap.put(column.getName(), columnStat);
+            }
+        }
+        return new CatalogColumnStatistics(columnStatMap);
+    }
+
+    private static CatalogColumnStatisticsDataBase convertToColumnStatisticsData(
+            Row row, Column column) {
+        String c = column.getName();
+        Long nullCount = row.getFieldAs(getNullCountColumn(c));
+        switch (column.getDataType().getLogicalType().getTypeRoot()) {
+            case BOOLEAN:
+                Long trueCount = row.getFieldAs(getTrueCountColumn(c));
+                Long falseCount = row.getFieldAs(getFalseCountColumn(c));
+                return new CatalogColumnStatisticsDataBoolean(trueCount, falseCount, nullCount);
+            case TINYINT:
+                Byte maxByte = row.getFieldAs(getMaxColumn(c));
+                Byte minByte = row.getFieldAs(getMinColumn(c));
+                Long ndvByte = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minByte != null ? minByte.longValue() : null,
+                        maxByte != null ? maxByte.longValue() : null,
+                        ndvByte,
+                        nullCount);
+            case SMALLINT:
+                Short maxShort = row.getFieldAs(getMaxColumn(c));
+                Short minShort = row.getFieldAs(getMinColumn(c));
+                Long ndvShort = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minShort != null ? minShort.longValue() : null,
+                        maxShort != null ? maxShort.longValue() : null,
+                        ndvShort,
+                        nullCount);
+            case INTEGER:
+                Integer maxInt = row.getFieldAs(getMaxColumn(c));
+                Integer minInt = row.getFieldAs(getMinColumn(c));
+                Long ndvInt = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minInt != null ? minInt.longValue() : null,
+                        maxInt != null ? maxInt.longValue() : null,
+                        ndvInt,
+                        nullCount);
+            case BIGINT:
+                Long ndvLong = row.getFieldAs(getNdvColumn(c));
+                Long maxLong = row.getFieldAs(getMaxColumn(c));
+                Long minLong = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(minLong, maxLong, ndvLong, nullCount);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                Long ndvTs = row.getFieldAs(getNdvColumn(c));
+                LocalDateTime maxTs = row.getFieldAs(getMaxColumn(c));
+                LocalDateTime minTs = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTs != null ? minTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        maxTs != null ? maxTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        ndvTs,
+                        nullCount);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                Long ndvTsLtz = row.getFieldAs(getNdvColumn(c));
+                Instant maxTsLtz = row.getFieldAs(getMaxColumn(c));
+                Instant minTsLtz = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTsLtz != null ? minTsLtz.toEpochMilli() : null,
+                        maxTsLtz != null ? maxTsLtz.toEpochMilli() : null,
+                        ndvTsLtz,
+                        nullCount);
+            case FLOAT:
+                Long ndvFloat = row.getFieldAs(getNdvColumn(c));
+                Float maxFloat = row.getFieldAs(getMaxColumn(c));
+                Float minFloat = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minFloat != null ? minFloat.doubleValue() : null,
+                        maxFloat != null ? maxFloat.doubleValue() : null,
+                        ndvFloat,
+                        nullCount);
+            case DOUBLE:
+                Long ndvDouble = row.getFieldAs(getNdvColumn(c));
+                Double maxDouble = row.getFieldAs(getMaxColumn(c));
+                Double minDouble = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDouble, maxDouble, ndvDouble, nullCount);
+            case DECIMAL:
+                Long ndvDecimal = row.getFieldAs(getNdvColumn(c));
+                BigDecimal maxDecimal = row.getFieldAs(getMaxColumn(c));
+                BigDecimal minDecimal = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDecimal != null ? minDecimal.doubleValue() : null,
+                        maxDecimal != null ? maxDecimal.doubleValue() : null,
+                        ndvDecimal,
+                        nullCount);
+            case DATE:
+                Long ndvDate = row.getFieldAs(getNdvColumn(c));
+                LocalDate maxDate = row.getFieldAs(getMaxColumn(c));
+                LocalDate minDate = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDate(
+                        minDate != null ? new Date(minDate.toEpochDay()) : null,
+                        maxDate != null ? new Date(maxDate.toEpochDay()) : null,
+                        ndvDate,
+                        nullCount);
+            case TIME_WITHOUT_TIME_ZONE:
+                Long ndvTime = row.getFieldAs(getNdvColumn(c));
+                LocalTime maxTime = row.getFieldAs(getMaxColumn(c));
+                LocalTime minTime = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTime != null ? minTime.toNanoOfDay() : null,
+                        maxTime != null ? maxTime.toNanoOfDay() : null,
+                        ndvTime,
+                        nullCount);
+            case CHAR:
+            case VARCHAR:
+                Long ndvString = row.getFieldAs(getNdvColumn(c));
+                Double avgLen = row.getFieldAs(getAvgLenColumn(c));
+                Long maxLen = row.getFieldAs(getMaxLenColumn(c));
+                return new CatalogColumnStatisticsDataString(maxLen, avgLen, ndvString, nullCount);
+            case BINARY:
+            case VARBINARY:
+                return new CatalogColumnStatisticsDataBinary(null, null, nullCount);
+            default:
+                return null;
+        }
+    }
+
+    private static String getRowCountColumn() {
+        return "rowCount";
+    }
+
+    private static String getNullCountColumn(String column) {
+        return String.format("%s_nullCount", column);
+    }
+
+    private static String getNdvColumn(String column) {
+        return String.format("%s_ndv", column);
+    }
+
+    private static String getTrueCountColumn(String column) {
+        return String.format("%s_trueCount", column);
+    }
+
+    private static String getFalseCountColumn(String column) {
+        return String.format("%s_falseCount", column);
+    }
+
+    private static String getMaxColumn(String column) {
+        return String.format("%s_max", column);
+    }
+
+    private static String getMinColumn(String column) {
+        return String.format("%s_min", column);
+    }
+
+    private static String getAvgLenColumn(String column) {
+        return String.format("%s_avgLen", column);
+    }
+
+    private static String getMaxLenColumn(String column) {
+        return String.format("%s_maxLen", column);
+    }
+}
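
Given the generateAnalyzeSql/getColumnStatsSelects logic above, the statistics
for one partition of a hypothetical table with an INT column `a` and a VARCHAR
column `s` would be gathered by a single query of roughly this shape (column
aliases follow the get*Column helpers at the bottom of the file; the generated
statement is emitted on one line):

    SELECT COUNT(1) AS rowCount,
           (COUNT(1) - COUNT(`a`)) AS a_nullCount,
           APPROX_COUNT_DISTINCT(`a`) AS a_ndv,
           MAX(`a`) AS a_max,
           MIN(`a`) AS a_min,
           (COUNT(1) - COUNT(`s`)) AS s_nullCount,
           APPROX_COUNT_DISTINCT(`s`) AS s_ndv,
           AVG(CAST(CHAR_LENGTH(`s`) AS DOUBLE)) AS s_avgLen,
           MAX(CAST(CHAR_LENGTH(`s`) AS BIGINT)) AS s_maxLen
    FROM MyTable
    WHERE p=1

The single result row is then converted into CatalogTableStatistics and
CatalogColumnStatistics and written back through the Catalog API.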
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index d4e10108d38..df95c584654 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -132,6 +132,7 @@ import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
 import org.apache.flink.table.operations.ddl.AlterViewOperation;
 import org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterViewRenameOperation;
+import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
 import org.apache.flink.table.operations.ddl.CompilePlanOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
@@ -1364,6 +1365,15 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                             true,
                             compileAndExecutePlanOperation.getOperation());
             return (TableResultInternal) compiledPlan.execute();
+        } else if (operation instanceof AnalyzeTableOperation) {
+            if (isStreamingMode) {
+                throw new TableException("ANALYZE TABLE is not supported for streaming mode now");
+            }
+            try {
+                return AnalyzeTableUtil.analyzeTable(this, (AnalyzeTableOperation) operation);
+            } catch (Exception e) {
+                throw new TableException("Failed to execute ANALYZE TABLE command", e);
+            }
         } else if (operation instanceof NopOperation) {
             return TableResultImpl.TABLE_RESULT_OK;
         } else {
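
The new branch above routes AnalyzeTableOperation to AnalyzeTableUtil and
rejects it eagerly outside batch mode. A small sketch of the observable
behavior (table name hypothetical):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment streamEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // throws TableException("ANALYZE TABLE is not supported in streaming mode yet")
    streamEnv.executeSql("ANALYZE TABLE MyTable COMPUTE STATISTICS");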
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java
new file mode 100644
index 00000000000..614820ce49a
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Operation to describe an {@code ANALYZE TABLE} statement. */
+public class AnalyzeTableOperation implements Operation {
+    private final ObjectIdentifier tableIdentifier;
+    private final @Nullable List<CatalogPartitionSpec> partitionSpecs;
+    private final List<Column> columns;
+
+    public AnalyzeTableOperation(
+            ObjectIdentifier tableIdentifier,
+            @Nullable List<CatalogPartitionSpec> partitionSpecs,
+            List<Column> columns) {
+        this.tableIdentifier = tableIdentifier;
+        this.partitionSpecs = partitionSpecs;
+        this.columns = Objects.requireNonNull(columns, "columns is null");
+    }
+
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
+    }
+
+    /**
+     * Returns {@code Optional.empty()} if the table is not a partitioned table; otherwise returns
+     * the given partition specs.
+     */
+    public Optional<List<CatalogPartitionSpec>> getPartitionSpecs() {
+        return Optional.ofNullable(partitionSpecs);
+    }
+
+    public List<Column> getColumns() {
+        return columns;
+    }
+
+    @Override
+    public String asSummaryString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ANALYZE TABLE ").append(tableIdentifier.toString());
+        if (partitionSpecs != null) {
+            sb.append(" PARTITION(")
+                    .append(
+                            partitionSpecs.stream()
+                                    .map(p -> p.getPartitionSpec().toString())
+                                    .collect(Collectors.joining(",")))
+                    .append(")");
+        }
+        sb.append(" COMPUTE STATISTICS");
+        if (!columns.isEmpty()) {
+            sb.append(" FOR COLUMNS ")
+                    .append(columns.stream().map(Column::getName).collect(Collectors.joining(",")));
+        }
+
+        return sb.toString();
+    }
+}
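
For a concrete picture of asSummaryString above: an operation on a hypothetical
table mydb.MyTable with partition spec {p=1} and columns a and b would render
roughly as

    ANALYZE TABLE mycat.mydb.MyTable PARTITION({p=1}) COMPUTE STATISTICS FOR COLUMNS a,b

(the exact identifier rendering depends on ObjectIdentifier.toString()).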
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
index f8b2b215e6b..e5ff3f3df8a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -73,4 +74,32 @@ public class CatalogColumnStatistics {
         }
         return new CatalogColumnStatistics(copy, new HashMap<>(this.properties));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CatalogColumnStatistics that = (CatalogColumnStatistics) o;
+        return columnStatisticsData.equals(that.columnStatisticsData)
+                && properties.equals(that.properties);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(columnStatisticsData, properties);
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogColumnStatistics{"
+                + "columnStatisticsData="
+                + columnStatisticsData
+                + ", properties="
+                + properties
+                + '}';
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
index 4c52e800d43..eb6042541a1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /** Column statistics value of binary type. */
 public class CatalogColumnStatisticsDataBinary extends CatalogColumnStatisticsDataBase {
@@ -54,4 +55,32 @@ public class CatalogColumnStatisticsDataBinary extends CatalogColumnStatisticsDa
         return new CatalogColumnStatisticsDataBinary(
                 maxLength, avgLength, getNullCount(), new HashMap<>(getProperties()));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CatalogColumnStatisticsDataBinary that = (CatalogColumnStatisticsDataBinary) o;
+        return Objects.equals(maxLength, that.maxLength)
+                && Objects.equals(avgLength, that.avgLength);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(maxLength, avgLength);
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogColumnStatisticsDataBinary{"
+                + "maxLength="
+                + maxLength
+                + ", avgLength="
+                + avgLength
+                + '}';
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
index 98b3f5388a3..8e5d7ebf445 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /** Column statistics value of boolean type. */
 public class CatalogColumnStatisticsDataBoolean extends CatalogColumnStatisticsDataBase {
@@ -54,4 +55,32 @@ public class CatalogColumnStatisticsDataBoolean extends CatalogColumnStatisticsD
         return new CatalogColumnStatisticsDataBoolean(
                 trueCount, falseCount, getNullCount(), new HashMap<>(getProperties()));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CatalogColumnStatisticsDataBoolean that = (CatalogColumnStatisticsDataBoolean) o;
+        return Objects.equals(trueCount, that.trueCount)
+                && Objects.equals(falseCount, that.falseCount);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(trueCount, falseCount);
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogColumnStatisticsDataBoolean{"
+                + "trueCount="
+                + trueCount
+                + ", falseCount="
+                + falseCount
+                + '}';
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
index 0b6e80727e1..2bc72ead7d8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /** Column statistics value of date type. */
 public class CatalogColumnStatisticsDataDate extends CatalogColumnStatisticsDataBase {
@@ -63,4 +64,35 @@ public class CatalogColumnStatisticsDataDate extends CatalogColumnStatisticsData
         return new CatalogColumnStatisticsDataDate(
                 min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CatalogColumnStatisticsDataDate that = (CatalogColumnStatisticsDataDate) o;
+        return Objects.equals(min, that.min)
+                && Objects.equals(max, that.max)
+                && Objects.equals(ndv, that.ndv);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(min, max, ndv);
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogColumnStatisticsDataDate{"
+                + "min="
+                + min
+                + ", max="
+                + max
+                + ", ndv="
+                + ndv
+                + '}';
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
index f026dfed331..025954e9ceb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /** Column statistics value of double type. */
 public class CatalogColumnStatisticsDataDouble extends CatalogColumnStatisticsDataBase {
@@ -63,4 +64,45 @@ public class CatalogColumnStatisticsDataDouble extends CatalogColumnStatisticsDa
         return new CatalogColumnStatisticsDataDouble(
                 min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CatalogColumnStatisticsDataDouble that = (CatalogColumnStatisticsDataDouble) o;
+        return doubleCompare(min, that.min)
+                && doubleCompare(max, that.max)
+                && Objects.equals(ndv, that.ndv);
+    }
+
+    private boolean doubleCompare(Double d1, Double d2) {
+        if (d1 == null && d2 == null) {
+            return true;
+        } else if (d1 == null || d2 == null) {
+            return false;
+        } else {
+            return Math.abs(d1 - d2) < 1e-6;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(min, max, ndv);
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogColumnStatisticsDataDouble{"
+                + "min="
+                + min
+                + ", max="
+                + max
+                + ", ndv="
+                + ndv
+                + '}';
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
index 490893ecc5c..b1422f84ed4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /** Column statistics value of long type. */
 public class CatalogColumnStatisticsDataLong extends CatalogColumnStatisticsDataBase {
@@ -63,4 +64,35 @@ public class CatalogColumnStatisticsDataLong extends CatalogColumnStatisticsData
         return new CatalogColumnStatisticsDataLong(
                 min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CatalogColumnStatisticsDataLong that = (CatalogColumnStatisticsDataLong) o;
+        return Objects.equals(min, that.min)
+                && Objects.equals(max, that.max)
+                && Objects.equals(ndv, that.ndv);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(min, max, ndv);
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogColumnStatisticsDataLong{"
+                + "min="
+                + min
+                + ", max="
+                + max
+                + ", ndv="
+                + ndv
+                + '}';
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
index 539c0338135..44af143b728 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /** Column statistics value of string type. */
 public class CatalogColumnStatisticsDataString extends CatalogColumnStatisticsDataBase {
@@ -68,4 +69,35 @@ public class CatalogColumnStatisticsDataString extends CatalogColumnStatisticsDa
         return new CatalogColumnStatisticsDataString(
                 maxLength, avgLength, ndv, getNullCount(), new HashMap<>(getProperties()));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CatalogColumnStatisticsDataString that = (CatalogColumnStatisticsDataString) o;
+        return Objects.equals(maxLength, that.maxLength)
+                && Objects.equals(avgLength, that.avgLength)
+                && Objects.equals(ndv, that.ndv);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(maxLength, avgLength, ndv);
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogColumnStatisticsDataString{"
+                + "maxLength="
+                + maxLength
+                + ", avgLength="
+                + avgLength
+                + ", ndv="
+                + ndv
+                + '}';
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
index 7c100b8b01a..ab65c1f0f6e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /** Statistics for a non-partitioned table or a partition of a partitioned table. */
 @PublicEvolving
@@ -93,4 +94,41 @@ public class CatalogTableStatistics {
                 this.rawDataSize,
                 new HashMap<>(this.properties));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CatalogTableStatistics that = (CatalogTableStatistics) o;
+        return rowCount == that.rowCount
+                && fileCount == that.fileCount
+                && totalSize == that.totalSize
+                && rawDataSize == that.rawDataSize
+                && properties.equals(that.properties);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowCount, fileCount, totalSize, rawDataSize, properties);
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogTableStatistics{"
+                + "rowCount="
+                + rowCount
+                + ", fileCount="
+                + fileCount
+                + ", totalSize="
+                + totalSize
+                + ", rawDataSize="
+                + rawDataSize
+                + ", properties="
+                + properties
+                + '}';
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
index fbd250ae86d..1030ac497e7 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.catalog.stats;
 
+import java.util.Objects;
+
 /** Class representing a date value in statistics. */
 public class Date {
     private final long daysSinceEpoch;
@@ -33,4 +35,26 @@ public class Date {
     public Date copy() {
         return new Date(daysSinceEpoch);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Date date = (Date) o;
+        return daysSinceEpoch == date.daysSinceEpoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(daysSinceEpoch);
+    }
+
+    @Override
+    public String toString() {
+        return "Date{" + "daysSinceEpoch=" + daysSinceEpoch + '}';
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 25e2e53d03e..895534124ed 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -34,6 +34,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterView;
 import org.apache.flink.sql.parser.ddl.SqlAlterViewAs;
 import org.apache.flink.sql.parser.ddl.SqlAlterViewProperties;
 import org.apache.flink.sql.parser.ddl.SqlAlterViewRename;
+import org.apache.flink.sql.parser.ddl.SqlAnalyzeTable;
 import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
 import org.apache.flink.sql.parser.ddl.SqlCompilePlan;
 import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
@@ -81,6 +82,7 @@ import org.apache.flink.sql.parser.dql.SqlShowPartitions;
 import org.apache.flink.sql.parser.dql.SqlShowTables;
 import org.apache.flink.sql.parser.dql.SqlShowViews;
 import org.apache.flink.sql.parser.dql.SqlUnloadModule;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
@@ -98,6 +100,7 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.CatalogViewImpl;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ManagedTableListener;
@@ -106,7 +109,15 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
 import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
@@ -153,6 +164,7 @@ import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
 import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
 import org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterViewRenameOperation;
+import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
 import org.apache.flink.table.operations.ddl.CompilePlanOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
@@ -172,7 +184,9 @@ import org.apache.flink.table.planner.utils.Expander;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
 import org.apache.flink.table.resource.ResourceType;
 import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.calcite.rel.RelRoot;
@@ -187,7 +201,12 @@ import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
 
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -347,6 +366,8 @@ public class SqlToOperationConverter {
                     converter.convertCompileAndExecutePlan((SqlCompileAndExecutePlan) validated));
         } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
             return Optional.of(converter.convertSqlQuery(validated));
+        } else if (validated instanceof SqlAnalyzeTable) {
+            return Optional.of(converter.convertAnalyzeTable((SqlAnalyzeTable) validated));
         } else {
             return Optional.empty();
         }
@@ -1247,6 +1268,199 @@ public class SqlToOperationConverter {
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent() || optionalCatalogTable.get().isTemporary()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s doesn't exist or is a temporary table.", tableIdentifier));
+        }
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed.");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!partitions.keySet().equals(new HashSet<>(table.getPartitionKeys()))) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+                                        + "The given partition keys: [%s] are not match the target partition keys: [%s].",
+                                String.join(",", partitions.keySet()),
+                                String.join(",", table.getPartitionKeys())));
+            }
+
+            try {
+                targetPartitionSpecs = getPartitionSpecs(tableIdentifier, schema, partitions);
+            } catch (Exception e) {
+                throw new ValidationException(e.getMessage(), e);
+            }
+        } else if (!partitions.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid ANALYZE TABLE statement. Table: %s is not a partition table, while partition values are given.",
+                            tableIdentifier));
+        }
+
+        String[] columns = analyzeTable.getColumnNames();
+        List<Column> targetColumns;
+        if (analyzeTable.isAllColumns()) {
+            Preconditions.checkArgument(columns.length == 0);
+            // computed columns and metadata columns will be ignored
+            targetColumns =
+                    schema.getColumns().stream()
+                            .filter(Column::isPhysical)
+                            .collect(Collectors.toList());
+        } else if (columns.length > 0) {
+            targetColumns =
+                    Arrays.stream(columns)
+                            .map(
+                                    c -> {
+                                        Optional<Column> colOpt = schema.getColumn(c);
+                                        if (!colOpt.isPresent()) {
+                                            throw new ValidationException(
+                                                    String.format(
+                                                            "Column: %s does not exist in the table: %s.",
+                                                            c, tableIdentifier));
+                                        }
+                                        Column col = colOpt.get();
+                                        if (col instanceof Column.ComputedColumn) {
+                                            throw new ValidationException(
+                                                    String.format(
+                                                            "Column: %s is a computed column, ANALYZE TABLE does not support computed column.",
+                                                            c));
+                                        } else if (col instanceof Column.MetadataColumn) {
+                                            throw new ValidationException(
+                                                    String.format(
+                                                            "Column: %s is a metadata column, ANALYZE TABLE does not support metadata column.",
+                                                            c));
+                                        } else if (col instanceof Column.PhysicalColumn) {
+                                            return col;
+                                        } else {
+                                            throw new ValidationException(
+                                                    "Unknown column class: "
+                                                            + col.getClass().getSimpleName());
+                                        }
+                                    })
+                            .collect(Collectors.toList());
+        } else {
+            targetColumns = Collections.emptyList();
+        }
+
+        return new AnalyzeTableOperation(tableIdentifier, targetPartitionSpecs, targetColumns);
+    }
+
+    private List<CatalogPartitionSpec> getPartitionSpecs(
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            LinkedHashMap<String, String> partitions)
+            throws TableNotPartitionedException, TableNotExistException {
+        List<Expression> filters = new ArrayList<>();
+        for (Map.Entry<String, String> entry : partitions.entrySet()) {
+            if (entry.getValue() != null) {
+                CallExpression call =
+                        CallExpression.temporary(
+                                FunctionIdentifier.of("="),
+                                BuiltInFunctionDefinitions.EQUALS,
+                                Arrays.asList(
+                                        getPartitionKeyExpr(schema, entry.getKey()),
+                                        getPartitionValueExpr(
+                                                schema, entry.getKey(), entry.getValue())),
+                                DataTypes.BOOLEAN());
+                filters.add(call);
+            }
+        }
+        if (filters.isEmpty()) {
+            return catalogManager
+                    .getCatalog(tableIdentifier.getCatalogName())
+                    .get()
+                    .listPartitions(tableIdentifier.toObjectPath());
+        } else {
+            return catalogManager
+                    .getCatalog(tableIdentifier.getCatalogName())
+                    .get()
+                    .listPartitionsByFilter(tableIdentifier.toObjectPath(), filters);
+        }
+    }
+
+    private FieldReferenceExpression getPartitionKeyExpr(
+            ResolvedSchema schema, String partitionKey) {
+        int fieldIndex = schema.getColumnNames().indexOf(partitionKey);
+        if (fieldIndex < 0) {
+            throw new ValidationException(
+                    String.format(
+                            "Partition: %s does not exist in the schema: %s",
+                            partitionKey, schema.getColumnNames()));
+        }
+        return new FieldReferenceExpression(
+                partitionKey, schema.getColumnDataTypes().get(fieldIndex), 0, fieldIndex);
+    }
+
+    private ValueLiteralExpression getPartitionValueExpr(
+            ResolvedSchema schema, String partitionKey, String partitionValue) {
+        int fieldIndex = schema.getColumnNames().indexOf(partitionKey);
+        if (fieldIndex < 0) {
+            throw new ValidationException(
+                    String.format(
+                            "Partition key: %s does not exist in the schema: %s",
+                            partitionKey, schema.getColumnNames()));
+        }
+        DataType dataType = schema.getColumnDataTypes().get(fieldIndex);
+        if (partitionValue == null) {
+            return new ValueLiteralExpression(null, dataType.nullable());
+        }
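+        // Parse the string representation according to the logical type root of the column.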
+        Object value;
+        switch (dataType.getLogicalType().getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                value = partitionValue;
+                break;
+            case TINYINT:
+                value = Byte.valueOf(partitionValue);
+                break;
+            case SMALLINT:
+                value = Short.valueOf(partitionValue);
+                break;
+            case INTEGER:
+                value = Integer.valueOf(partitionValue);
+                break;
+            case BIGINT:
+                value = Long.valueOf(partitionValue);
+                break;
+            case FLOAT:
+                value = Float.valueOf(partitionValue);
+                break;
+            case DOUBLE:
+                value = Double.valueOf(partitionValue);
+                break;
+            case DECIMAL:
+                value = new BigDecimal(partitionValue);
+                break;
+            case DATE:
+                value = Date.valueOf(partitionValue);
+                break;
+            case TIME_WITHOUT_TIME_ZONE:
+                value = Time.valueOf(partitionValue);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                value = Timestamp.valueOf(partitionValue);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported partition value type: " + dataType.getLogicalType());
+        }
+        return new ValueLiteralExpression(value, dataType.notNull());
+    }
+
     private void validateTableConstraint(SqlTableConstraint constraint) {
         if (constraint.isUnique()) {
             throw new UnsupportedOperationException("UNIQUE constraint is not supported yet");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
index 69ee1955488..badd3ecb004 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.planner.utils.FilterUtils;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DoubleType;
@@ -123,6 +124,8 @@ public class TestValuesCatalog extends GenericInMemoryCatalog {
             return Double.valueOf(value);
         } else if (type instanceof IntType) {
             return Integer.valueOf(value);
+        } else if (type instanceof BigIntType) {
+            return Long.valueOf(value);
         } else if (type instanceof VarCharType) {
             return value;
         } else {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
new file mode 100644
index 00000000000..2ba4e0b4bf0
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for `ANALYZE TABLE`. */
+public class AnalyzeTableITCase extends BatchTestBase {
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    @Override
+    public void before() throws Exception {
+        super.before();
+        tEnv = tEnv();
+        Catalog catalog = new TestValuesCatalog("cat", "db", true);
+        tEnv.registerCatalog("cat", catalog);
+        tEnv.useCatalog("cat");
+        tEnv.useDatabase("db");
+        String dataId1 = TestValuesTableFactory.registerData(TestData.fullDataTypesData());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE NonPartitionTable (\n"
+                                + "  `a` BOOLEAN,\n"
+                                + "  `b` TINYINT,\n"
+                                + "  `c` SMALLINT,\n"
+                                + "  `d` INT,\n"
+                                + "  `e` BIGINT,\n"
+                                + "  `f` FLOAT,\n"
+                                + "  `g` DOUBLE,\n"
+                                + "  `h` DECIMAL(5, 2),\n"
+                                + "  `x` DECIMAL(30, 10),\n"
+                                + "  `i` VARCHAR(5),\n"
+                                + "  `j` CHAR(5),\n"
+                                + "  `k` DATE,\n"
+                                + "  `l` TIME(0),\n"
+                                + "  `m` TIMESTAMP(9),\n"
+                                + "  `n` TIMESTAMP(9) WITH LOCAL TIME ZONE,\n"
+                                + "  `o` ARRAY<BIGINT>,\n"
+                                + "  `p` ROW<f1 BIGINT, f2 STRING, f3 DOUBLE>,\n"
+                                + "  `q` MAP<STRING, INT>\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'bounded' = 'true'\n"
+                                + ")",
+                        dataId1));
+
+        String dataId2 = TestValuesTableFactory.registerData(TestData.data5());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE PartitionTable (\n"
+                                + "  `a` INT,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` INT,\n"
+                                + "  `d` VARCHAR,\n"
+                                + "  `e` BIGINT\n"
+                                + ") partitioned by (e, a)\n"
+                                + " WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'partition-list' = 'e:1,a:1;e:1,a:2;e:1,a:4;e:1,a:5;e:2,a:2;e:2,a:3;e:2,a:4;e:2,a:5;e:3,a:3;e:3,a:5;',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'disable-lookup' = 'true',\n"
+                                + "  'bounded' = 'true'\n"
+                                + ")",
+                        dataId2));
+        createPartition(catalog, "db", "PartitionTable", "e=1,a=1");
+        createPartition(catalog, "db", "PartitionTable", "e=1,a=2");
+        createPartition(catalog, "db", "PartitionTable", "e=1,a=4");
+        createPartition(catalog, "db", "PartitionTable", "e=1,a=5");
+        createPartition(catalog, "db", "PartitionTable", "e=2,a=2");
+        createPartition(catalog, "db", "PartitionTable", "e=2,a=3");
+        createPartition(catalog, "db", "PartitionTable", "e=2,a=4");
+        createPartition(catalog, "db", "PartitionTable", "e=2,a=5");
+        createPartition(catalog, "db", "PartitionTable", "e=3,a=3");
+        createPartition(catalog, "db", "PartitionTable", "e=3,a=5");
+
+        String dataId3 = TestValuesTableFactory.registerData(TestData.smallData5());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE NonPartitionTable2 (\n"
+                                + "  `a` INT,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` INT,\n"
+                                + "  `d` VARCHAR METADATA VIRTUAL,\n"
+                                + "  `e` BIGINT METADATA,\n"
+                                + "  `f` as a + 1\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'disable-lookup' = 'true',\n"
+                                + "  'readable-metadata'='d:varchar,e:bigint',\n"
+                                + "  'bounded' = 'true'\n"
+                                + ")",
+                        dataId3));
+
+        String dataId4 = TestValuesTableFactory.registerData(TestData.smallData5());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE PartitionTable2 (\n"
+                                + "  `a` INT,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` INT,\n"
+                                + "  `d` VARCHAR METADATA VIRTUAL,\n"
+                                + "  `e` BIGINT METADATA,\n"
+                                + "  `f` as a + 1\n"
+                                + ") partitioned by (a)\n"
+                                + " WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'partition-list' = 'a:1;a:2;',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'disable-lookup' = 'true',\n"
+                                + "  'readable-metadata'='d:varchar,e:bigint',\n"
+                                + "  'bounded' = 'true'\n"
+                                + ")",
+                        dataId4));
+        createPartition(catalog, "db", "PartitionTable2", "a=1");
+        createPartition(catalog, "db", "PartitionTable2", "a=2");
+    }
+
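+    /** Registers an empty partition for the given table under the given partition spec. */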
+    private void createPartition(Catalog catalog, String db, String table, String partitionSpecs)
+            throws Exception {
+        catalog.createPartition(
+                new ObjectPath(db, table),
+                createCatalogPartitionSpec(partitionSpecs),
+                new CatalogPartitionImpl(new HashMap<>(), ""),
+                false);
+    }
+
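+    /** Parses a spec string such as {@code "e=1,a=2"} into a {@link CatalogPartitionSpec}. */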
+    private CatalogPartitionSpec createCatalogPartitionSpec(String partitionSpecs) {
+        Map<String, String> partitionSpec = new HashMap<>();
+        for (String partition : partitionSpecs.split(",")) {
+            String[] items = partition.split("=");
+            Preconditions.checkArgument(
+                    items.length == 2, "Partition key and value should be joined with '='");
+            partitionSpec.put(items[0], items[1]);
+        }
+        return new CatalogPartitionSpec(partitionSpec);
+    }
+
+    @Test
+    public void testNonPartitionTableNotExisted() {
+        assertThatThrownBy(
+                        () -> tEnv.executeSql("analyze table not_exist_table compute statistics"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Table `cat`.`db`.`not_exist_table` doesn't exist");
+    }
+
+    @Test
+    public void testNonPartitionTableWithoutColumns() throws Exception {
+        tEnv.executeSql("analyze table NonPartitionTable compute statistics");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path))
+                .isEqualTo(new CatalogColumnStatistics(new HashMap<>()));
+    }
+
+    @Test
+    public void testNonPartitionTableWithColumnsNotExisted() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table NonPartitionTable compute statistics for columns not_existed_column"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column: not_existed_column does not exist in the table: `cat`.`db`.`NonPartitionTable`");
+    }
+
+    @Test
+    public void testNonPartitionTableWithComputedColumn() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table NonPartitionTable2 compute statistics for columns f"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column: f is a computed column, ANALYZE TABLE does not support computed column");
+    }
+
+    @Test
+    public void testNonPartitionTableWithVirtualMetadataColumn() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table NonPartitionTable2 compute statistics for columns d"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column: d is a metadata column, ANALYZE TABLE does not support metadata columns");
+    }
+
+    @Test
+    public void testNonPartitionTableWithMetadataColumn() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table NonPartitionTable2 compute statistics for columns e"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column: e is a metadata column, ANALYZE TABLE does not support metadata columns");
+    }
+
+    @Test
+    public void testNonPartitionTableWithPartition() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table NonPartitionTable PARTITION(a) compute statistics"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Invalid ANALYZE TABLE statement. Table: `cat`.`db`.`NonPartitionTable` is not a partition table, while partition values are given");
+    }
+
+    @Test
+    public void testNonPartitionTableWithPartialColumns() throws Exception {
+        tEnv.executeSql("analyze table NonPartitionTable compute statistics for columns f, a, d");
+        ObjectPath path1 = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path1))
+                .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData1 = new HashMap<>();
+        columnStatisticsData1.put("a", new CatalogColumnStatisticsDataBoolean(2L, 2L, 1L));
+        columnStatisticsData1.put(
+                "f", new CatalogColumnStatisticsDataDouble(-1.123d, 3.4d, 4L, 1L));
+        columnStatisticsData1.put(
+                "d",
+                new CatalogColumnStatisticsDataLong(
+                        (long) Integer.MIN_VALUE, (long) Integer.MAX_VALUE, 4L, 1L));
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path1))
+                .isEqualTo(new CatalogColumnStatistics(columnStatisticsData1));
+
+        tEnv.executeSql("analyze table NonPartitionTable2 compute statistics for columns a, b, c");
+        ObjectPath path2 = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable2");
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData2 = new HashMap<>();
+        columnStatisticsData2.put("a", new CatalogColumnStatisticsDataLong(1L, 2L, 2L, 0L));
+        columnStatisticsData2.put("b", new CatalogColumnStatisticsDataLong(1L, 3L, 3L, 0L));
+        columnStatisticsData2.put("c", new CatalogColumnStatisticsDataLong(0L, 2L, 3L, 0L));
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path2))
+                .isEqualTo(new CatalogColumnStatistics(columnStatisticsData2));
+    }
+
+    @Test
+    public void testNonPartitionTableWithAllColumns() throws Exception {
+        tEnv.executeSql("analyze table NonPartitionTable compute statistics for all columns");
+        ObjectPath path1 = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path1))
+                .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData1 = new HashMap<>();
+        // boolean
+        columnStatisticsData1.put("a", new CatalogColumnStatisticsDataBoolean(2L, 2L, 1L));
+        // byte
+        columnStatisticsData1.put(
+                "b",
+                new CatalogColumnStatisticsDataLong(
+                        (long) Byte.MIN_VALUE, (long) Byte.MAX_VALUE, 4L, 1L));
+        // short
+        columnStatisticsData1.put(
+                "c",
+                new CatalogColumnStatisticsDataLong(
+                        (long) Short.MIN_VALUE, (long) Short.MAX_VALUE, 4L, 1L));
+        // int
+        columnStatisticsData1.put(
+                "d",
+                new CatalogColumnStatisticsDataLong(
+                        (long) Integer.MIN_VALUE, (long) Integer.MAX_VALUE, 4L, 1L));
+        // long
+        columnStatisticsData1.put(
+                "e", new CatalogColumnStatisticsDataLong(Long.MIN_VALUE, Long.MAX_VALUE, 4L, 1L));
+        // float
+        columnStatisticsData1.put(
+                "f", new CatalogColumnStatisticsDataDouble(-1.123d, 3.4d, 4L, 1L));
+        // double
+        columnStatisticsData1.put(
+                "g", new CatalogColumnStatisticsDataDouble(-1.123d, 3.4d, 4L, 1L));
+        // DECIMAL(5, 2)
+        columnStatisticsData1.put("h", new CatalogColumnStatisticsDataDouble(5.1d, 8.12d, 4L, 1L));
+        // DECIMAL(30, 10)
+        columnStatisticsData1.put(
+                "x",
+                new CatalogColumnStatisticsDataDouble(
+                        1234567891012345.1d, 812345678910123451.0123456789d, 4L, 1L));
+        // varchar
+        columnStatisticsData1.put("i", new CatalogColumnStatisticsDataString(4L, 2.5d, 4L, 1L));
+        // char
+        columnStatisticsData1.put("j", new CatalogColumnStatisticsDataString(4L, 2.5d, 4L, 1L));
+        // date
+        columnStatisticsData1.put(
+                "k", new CatalogColumnStatisticsDataDate(new Date(-365), new Date(18383), 4L, 1L));
+        // time
+        columnStatisticsData1.put(
+                "l", new CatalogColumnStatisticsDataLong(123000000L, 84203000000000L, 4L, 1L));
+        // timestamp
+        columnStatisticsData1.put(
+                "m", new CatalogColumnStatisticsDataLong(-31536000L, 1588375403L, 4L, 1L));
+        // timestamp with local timezone
+        columnStatisticsData1.put(
+                "n", new CatalogColumnStatisticsDataLong(-31535999877L, 1588375403000L, 4L, 1L));
+
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path1))
+                .isEqualTo(new CatalogColumnStatistics(columnStatisticsData1));
+
+        tEnv.executeSql("analyze table NonPartitionTable2 compute statistics for all columns");
+        ObjectPath path2 = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable2");
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData2 = new HashMap<>();
+        columnStatisticsData2.put("a", new CatalogColumnStatisticsDataLong(1L, 2L, 2L, 0L));
+        columnStatisticsData2.put("b", new CatalogColumnStatisticsDataLong(1L, 3L, 3L, 0L));
+        columnStatisticsData2.put("c", new CatalogColumnStatisticsDataLong(0L, 2L, 3L, 0L));
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path2))
+                .isEqualTo(new CatalogColumnStatistics(columnStatisticsData2));
+    }
+
+    @Test
+    public void testPartitionTableWithoutPartition() {
+        assertThatThrownBy(() -> tEnv.executeSql("analyze table PartitionTable compute statistics"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+                                + "The given partition keys: [] are not match the target partition keys: [e,a]");
+    }
+
+    @Test
+    public void testPartitionTableWithPartitionKeyNotExisted() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table PartitionTable PARTITION(d) compute statistics"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+                                + "The given partition keys: [d] are not match the target partition keys: [e,a]");
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table PartitionTable PARTITION(e=1) compute statistics"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+                                + "The given partition keys: [e] are not match the target partition keys: [e,a]");
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table PartitionTable PARTITION(e=1,d) compute statistics"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+                                + "The given partition keys: [e,d] are not match the target partition keys: [e,a]");
+    }
+
+    @Test
+    public void testPartitionTableWithPartitionValueNotExisted() throws Exception {
+        tEnv.executeSql("analyze table PartitionTable partition(e=10,a) compute statistics");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path))
+                .isEqualTo(new CatalogColumnStatistics(new HashMap<>()));
+    }
+
+    @Test
+    public void testPartitionTableWithColumnsNotExisted() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table PartitionTable partition(e, a) compute statistics for columns not_existed_column"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column: not_existed_column does not exist in the table: `cat`.`db`.`PartitionTable`");
+    }
+
+    @Test
+    public void testPartitionTableWithVirtualMetadataColumn() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table PartitionTable2 PARTITION(a) compute statistics for columns d"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column: d is a metadata column, ANALYZE TABLE does not support metadata columns");
+    }
+
+    @Test
+    public void testPartitionTableWithMetadataColumn() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table PartitionTable2 PARTITION(a) compute statistics for columns e"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column: e is a metadata column, ANALYZE TABLE does not support metadata columns");
+    }
+
+    @Test
+    public void testPartitionTableWithComputedColumn() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "analyze table PartitionTable2 PARTITION(a) compute statistics for columns f"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column: f is a computed column, ANALYZE TABLE does not support computed column");
+    }
+
+    @Test
+    public void testPartitionTableWithoutColumns() throws Exception {
+        // Strict order is not required
+        tEnv.executeSql("analyze table PartitionTable partition(a, e) compute statistics");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+        assertPartitionStatistics(path, "e=1,a=1", 1L);
+        assertPartitionStatistics(path, "e=1,a=2", 1L);
+        assertPartitionStatistics(path, "e=1,a=4", 2L);
+        assertPartitionStatistics(path, "e=1,a=5", 1L);
+        assertPartitionStatistics(path, "e=2,a=2", 1L);
+        assertPartitionStatistics(path, "e=2,a=3", 2L);
+        assertPartitionStatistics(path, "e=2,a=4", 2L);
+        assertPartitionStatistics(path, "e=2,a=5", 2L);
+        assertPartitionStatistics(path, "e=3,a=3", 1L);
+        assertPartitionStatistics(path, "e=3,a=5", 2L);
+
+        tEnv.executeSql(
+                "analyze table PartitionTable2 partition(a) compute statistics for all columns");
+        ObjectPath path2 = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable2");
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData2 = new HashMap<>();
+        columnStatisticsData2.put("a", new CatalogColumnStatisticsDataLong(1L, 1L, 1L, 0L));
+        columnStatisticsData2.put("b", new CatalogColumnStatisticsDataLong(1L, 1L, 1L, 0L));
+        columnStatisticsData2.put("c", new CatalogColumnStatisticsDataLong(0L, 0L, 1L, 0L));
+        assertPartitionStatistics(
+                path2, "a=1", 1L, new CatalogColumnStatistics(columnStatisticsData2));
+
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData3 = new HashMap<>();
+        columnStatisticsData3.put("a", new CatalogColumnStatisticsDataLong(2L, 2L, 1L, 0L));
+        columnStatisticsData3.put("b", new CatalogColumnStatisticsDataLong(2L, 3L, 2L, 0L));
+        columnStatisticsData3.put("c", new CatalogColumnStatisticsDataLong(1L, 2L, 2L, 0L));
+        assertPartitionStatistics(
+                path2, "a=2", 2L, new CatalogColumnStatistics(columnStatisticsData3));
+    }
+
+    @Test
+    public void testPartitionTableWithFullPartitionPath() throws Exception {
+        tEnv.executeSql(
+                "analyze table PartitionTable partition(e=2, a=5) compute statistics for all columns");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+        assertPartitionStatistics(path, "e=1,a=1", -1L);
+        assertPartitionStatistics(path, "e=1,a=2", -1L);
+        assertPartitionStatistics(path, "e=1,a=4", -1L);
+        assertPartitionStatistics(path, "e=1,a=5", -1L);
+        assertPartitionStatistics(path, "e=2,a=2", -1L);
+        assertPartitionStatistics(path, "e=2,a=3", -1L);
+        assertPartitionStatistics(path, "e=2,a=4", -1L);
+        assertPartitionStatistics(path, "e=3,a=3", -1L);
+        assertPartitionStatistics(path, "e=3,a=5", -1L);
+
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData = new HashMap<>();
+        columnStatisticsData.put("a", new CatalogColumnStatisticsDataLong(5L, 5L, 1L, 0L));
+        columnStatisticsData.put("b", new CatalogColumnStatisticsDataLong(14L, 15L, 2L, 0L));
+        columnStatisticsData.put("c", new CatalogColumnStatisticsDataLong(13L, 14L, 2L, 0L));
+        columnStatisticsData.put("d", new CatalogColumnStatisticsDataString(3L, 3.0, 2L, 0L));
+        columnStatisticsData.put("e", new CatalogColumnStatisticsDataLong(2L, 2L, 1L, 0L));
+        assertPartitionStatistics(
+                path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData));
+    }
+
+    @Test
+    public void testPartitionTableWithPartialPartitionPath() throws Exception {
+        tEnv.executeSql("analyze table PartitionTable partition(e=2, a) compute statistics");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+        assertPartitionStatistics(path, "e=1,a=1", -1L);
+        assertPartitionStatistics(path, "e=1,a=2", -1L);
+        assertPartitionStatistics(path, "e=1,a=4", -1L);
+        assertPartitionStatistics(path, "e=1,a=5", -1L);
+        assertPartitionStatistics(path, "e=2,a=2", 1L);
+        assertPartitionStatistics(path, "e=2,a=3", 2L);
+        assertPartitionStatistics(path, "e=2,a=4", 2L);
+        assertPartitionStatistics(path, "e=2,a=5", 2L);
+        assertPartitionStatistics(path, "e=3,a=3", -1L);
+        assertPartitionStatistics(path, "e=3,a=5", -1L);
+    }
+
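+    /** Asserts the partition's row count and that no column statistics have been recorded. */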
+    private void assertPartitionStatistics(ObjectPath path, String partitionSpec, long rowCount)
+            throws Exception {
+        assertPartitionStatistics(
+                path, partitionSpec, rowCount, new CatalogColumnStatistics(new HashMap<>()));
+    }
+
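+    /** Asserts both the partition's row count and its recorded column statistics. */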
+    private void assertPartitionStatistics(
+            ObjectPath path,
+            String partitionSpec,
+            long rowCount,
+            CatalogColumnStatistics columnStats)
+            throws Exception {
+        CatalogPartitionSpec spec = createCatalogPartitionSpec(partitionSpec);
+        assertThat(
+                        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                                .get()
+                                .getPartitionStatistics(path, spec))
+                .isEqualTo(new CatalogTableStatistics(rowCount, -1, -1L, -1L));
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path))
+                .isEqualTo(new CatalogColumnStatistics(new HashMap<>()));
+        assertThat(
+                        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                                .get()
+                                .getPartitionColumnStatistics(path, spec))
+                .isEqualTo(columnStats);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AnalyzeTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AnalyzeTableITCase.java
new file mode 100644
index 00000000000..647acd0ac0c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AnalyzeTableITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for `ANALYZE TABLE`. */
+public class AnalyzeTableITCase extends StreamingTestBase {
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    @Override
+    public void before() throws Exception {
+        super.before();
+        tEnv = tEnv();
+        String dataId1 = TestValuesTableFactory.registerData(TestData.smallData3());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE MyTable (\n"
+                                + "  `a` INT,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` VARCHAR\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'bounded' = 'true'\n"
+                                + ")",
+                        dataId1));
+    }
+
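+    /** ANALYZE TABLE is a batch-only statement and is expected to fail fast in streaming mode. */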
+    @Test
+    public void testAnalyzeTable() {
+        assertThatThrownBy(() -> tEnv.executeSql("analyze table MyTable compute statistics"))
+                .isInstanceOf(TableException.class)
+                .hasMessageContaining("ANALYZE TABLE is not supported for streaming mode now");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index d96d9e2623d..e5877c913a3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.runtime.batch.sql
 
-import org.apache.flink.table.api.config.TableConfigOptions
+import org.apache.flink.table.catalog.ObjectPath
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}