Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/26 08:05:17 UTC

[GitHub] [flink] godfreyhe opened a new pull request, #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

godfreyhe opened a new pull request, #20363:
URL: https://github.com/apache/flink/pull/20363

   
   ## What is the purpose of the change
   
   *Support "ANALYZE TABLE" execution, see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386481 for more details*
   
   
   ## Brief change log
   
     - *Add equals/hashCode/toString methods for catalog statistics classes*
     - *Convert SqlAnalyzeTable to AnalyzeTableOperation, and then convert it to a SELECT statement*
     - *Submit the SELECT query, collect the execution result, and update the statistics in the catalog*
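
As a rough illustration of the last two items, the sketch below builds the kind of aggregate SELECT statement that an ANALYZE TABLE on a non-partitioned table could be rewritten to. It is loosely modelled on the `generateAnalyzeSql` diff quoted later in this thread, but it is self-contained: the class name `AnalyzeSqlSketch`, the `Kind` enum, and the table and column names are assumptions made for the example, not code from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified sketch; not the code from the PR. */
public class AnalyzeSqlSketch {

    /** Simplified stand-in for Flink's logical type roots (an assumption). */
    public enum Kind { BOOLEAN, NUMERIC, STRING, OTHER }

    /** Builds one aggregate query returning the row count plus per-column statistics. */
    public static String buildStatsQuery(String table, Map<String, Kind> columns) {
        List<String> selects = new ArrayList<>();
        selects.add("COUNT(1) AS rowCount");
        for (Map.Entry<String, Kind> e : columns.entrySet()) {
            String c = e.getKey();
            // Null count = all rows minus the rows where the column is non-null.
            selects.add(String.format("(COUNT(1) - COUNT(`%s`)) AS %s_nullCount", c, c));
            switch (e.getValue()) {
                case BOOLEAN:
                    selects.add(String.format(
                            "COUNT(`%s`) FILTER (WHERE `%s` IS TRUE) AS %s_trueCount", c, c, c));
                    selects.add(String.format(
                            "COUNT(`%s`) FILTER (WHERE `%s` IS FALSE) AS %s_falseCount", c, c, c));
                    break;
                case NUMERIC:
                    selects.add(String.format("APPROX_COUNT_DISTINCT(`%s`) AS %s_ndv", c, c));
                    selects.add(String.format("MAX(`%s`) AS %s_max", c, c));
                    selects.add(String.format("MIN(`%s`) AS %s_min", c, c));
                    break;
                case STRING:
                    selects.add(String.format("APPROX_COUNT_DISTINCT(`%s`) AS %s_ndv", c, c));
                    selects.add(String.format(
                            "AVG(CAST(CHAR_LENGTH(`%s`) AS DOUBLE)) AS %s_avgLen", c, c));
                    selects.add(String.format(
                            "MAX(CAST(CHAR_LENGTH(`%s`) AS BIGINT)) AS %s_maxLen", c, c));
                    break;
                default:
                    // Other types only get the null count in this sketch.
                    break;
            }
        }
        return "SELECT " + String.join(", ", selects) + " FROM " + table;
    }

    public static void main(String[] args) {
        Map<String, Kind> columns = new LinkedHashMap<>();
        columns.put("a", Kind.NUMERIC);
        columns.put("c", Kind.STRING);
        System.out.println(buildStatsQuery("MyTable", columns));
    }
}
```

The query returns a single row, so the caller can read each aliased field from that row and hand the values to the catalog's statistics update methods.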
   
   
   ## Verifying this change
   
   
   
   This change added tests and can be verified as follows:
   
     - *Added AnalyzeTableITCase to verify the execution result for non-partitioned and partitioned tables*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] godfreyhe commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935155784


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {
+                throw new ValidationException(
+                        String.format(
+                                "For partition table, all partition keys should be specified explicitly. "
+                                        + "The given partition keys: [%s] are not match the target partition keys: [%s]",
+                                String.join(",", partitions.keySet()),
+                                String.join(",", table.getPartitionKeys())));
+            }
+
+            try {
+                targetPartitionSpecs = getPartitionSpecs(tableIdentifier, schema, partitions);
+            } catch (Exception e) {
+                throw new ValidationException(e.getMessage(), e);
+            }
+        } else if (!partitions.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table: %s is not a partition table, while partition values is given",
+                            tableIdentifier));
+        }
+
+        List<String> origColumns =
+                ((RowType) schema.toPhysicalRowDataType().getLogicalType()).getFieldNames();
+        String[] columns = analyzeTable.getColumnNames();
+        List<String> targetColumns;
+        if (analyzeTable.isAllColumns()) {

Review Comment:
   Computed columns and metadata columns will be excluded. I will throw a clearer exception when the given columns contain a computed column or a metadata column.
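
A minimal sketch of the validation described in this comment, assuming a simplified column model. The `ColumnKind` enum, the `resolveTargetColumns` helper, and the error messages are hypothetical and only illustrate the intended behaviour: when all columns are requested, only physical columns are analyzed, and an explicitly named computed or metadata column is rejected with a descriptive error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of column selection for ANALYZE TABLE; not the PR's code. */
public class AnalyzeColumnValidationSketch {

    /** Simplified stand-in for the column categories in a table schema (an assumption). */
    public enum ColumnKind { PHYSICAL, COMPUTED, METADATA }

    public static List<String> resolveTargetColumns(
            Map<String, ColumnKind> schema, boolean allColumns, List<String> requested) {
        List<String> targets = new ArrayList<>();
        if (allColumns) {
            // FOR ALL COLUMNS: silently skip computed and metadata columns.
            for (Map.Entry<String, ColumnKind> e : schema.entrySet()) {
                if (e.getValue() == ColumnKind.PHYSICAL) {
                    targets.add(e.getKey());
                }
            }
            return targets;
        }
        for (String name : requested) {
            ColumnKind kind = schema.get(name);
            if (kind == null) {
                throw new IllegalArgumentException("Column '" + name + "' does not exist.");
            }
            if (kind != ColumnKind.PHYSICAL) {
                // Explicitly requested non-physical columns fail with a specific message.
                throw new IllegalArgumentException(
                        "Column '" + name + "' is a " + kind.name().toLowerCase()
                                + " column; ANALYZE TABLE only supports physical columns.");
            }
            targets.add(name);
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, ColumnKind> schema = new LinkedHashMap<>();
        schema.put("a", ColumnKind.PHYSICAL);
        schema.put("b", ColumnKind.COMPUTED);
        schema.put("c", ColumnKind.METADATA);
        System.out.println(resolveTargetColumns(schema, true, Collections.emptyList()));
        try {
            resolveTargetColumns(schema, false, Arrays.asList("a", "b"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```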





[GitHub] [flink] godfreyhe commented on pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on PR #20363:
URL: https://github.com/apache/flink/pull/20363#issuecomment-1203595125

   @flinkbot run azure




[GitHub] [flink] lincoln-lil commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935503783


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AnalyzeTableITCase.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for `ANALYZE TABLE`. */
+public class AnalyzeTableITCase extends StreamingTestBase {
+
+    TableEnvironment tEnv;
+
+    @BeforeEach
+    @Override
+    public void before() throws Exception {
+        super.before();
+        tEnv = tEnv();
+        String dataId1 = TestValuesTableFactory.registerData(TestData.smallData3());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE MyTable (\n"
+                                + "  `a` INT,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` VARCHAR\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'data-id' = '%s',\n"
+                                + "  'bounded' = 'true'\n"
+                                + ")",
+                        dataId1));
+    }
+
+    @Test
+    public void testAnalyzeTable() throws Exception {
+        assertThatThrownBy(() -> tEnv.executeSql("analyze table MyTable compute statistics"))
+                .isInstanceOf(TableException.class)
+                .hasMessageContaining("AnalyzeTable is not supported for streaming mode now");

Review Comment:
   'AnalyzeTable' -> 'ANALYZE TABLE'





[GitHub] [flink] godfreyhe commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935101215


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {

Review Comment:
   Strict order is not required; I will use a HashSet for the comparison.
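
The difference between the two comparisons can be seen in a small standalone example. The class name and the partition key names `dt` and `region` are made up for illustration; the point is only that `List.equals` is order-sensitive while `Set.equals` is not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;

/** Hypothetical sketch comparing ordered and unordered partition key checks. */
public class PartitionKeyCompareSketch {

    /** Order-sensitive check, as in the quoted diff line. */
    public static boolean sameKeysOrdered(
            LinkedHashMap<String, String> given, List<String> declared) {
        return new ArrayList<>(given.keySet()).equals(declared);
    }

    /** Order-insensitive check: both sides are compared as sets. */
    public static boolean sameKeysUnordered(
            LinkedHashMap<String, String> given, List<String> declared) {
        return new HashSet<>(given.keySet()).equals(new HashSet<>(declared));
    }

    public static void main(String[] args) {
        // Keys written by the user in a different order than the table declares them.
        LinkedHashMap<String, String> given = new LinkedHashMap<>();
        given.put("region", "eu");
        given.put("dt", "2022-07-26");
        List<String> declared = Arrays.asList("dt", "region");

        System.out.println("ordered:   " + sameKeysOrdered(given, declared));
        System.out.println("unordered: " + sameKeysUnordered(given, declared));
    }
}
```

With the set-based check, a statement that lists every partition key in a different order is accepted, while a missing or extra key is still rejected.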





[GitHub] [flink] godfreyhe commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935221469


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));

Review Comment:
   throw exception in `orElse`
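
One reading of this note: `Optional.orElse` takes a value, so its argument expression is evaluated before the call, even when the Optional is non-empty. In the quoted line that means `catalog.listPartitions(objectPath)` runs, and may throw, even if partition specs were given. The sketch below shows the effect with a hypothetical `listPartitions` stand-in that counts its invocations; it is not a statement of how the PR was finally changed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of eager vs. deferred fallback evaluation with Optional. */
public class OrElseEagerSketch {

    /** Counts how often the fallback lookup is actually executed. */
    public static final AtomicInteger CALLS = new AtomicInteger();

    /** Stand-in for a catalog lookup that may be expensive or may fail (an assumption). */
    public static List<String> listPartitions() {
        CALLS.incrementAndGet();
        return Arrays.asList("dt=2022-07-01", "dt=2022-07-02");
    }

    /** The fallback is computed before orElse is called, whether or not it is needed. */
    public static List<String> eager(Optional<List<String>> given) {
        return given.orElse(listPartitions());
    }

    /** The fallback is computed only when the Optional is empty. */
    public static List<String> deferred(Optional<List<String>> given) {
        return given.orElseGet(OrElseEagerSketch::listPartitions);
    }

    public static void main(String[] args) {
        Optional<List<String>> given = Optional.of(Arrays.asList("dt=2022-07-26"));
        eager(given);
        System.out.println("lookups after eager call:    " + CALLS.get());
        deferred(given);
        System.out.println("lookups after deferred call: " + CALLS.get());
    }
}
```

If the fallback throws checked exceptions, as a catalog lookup typically does, a lambda passed to `orElseGet` cannot propagate them directly; an explicit `isPresent()` branch is a simple alternative.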





[GitHub] [flink] godfreyhe commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935097024


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =
+                        generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+                TableResult tableResult = executeSql(statSql);
+                List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+                Preconditions.checkArgument(result.size() == 1);
+                Row row = result.get(0);
+                CatalogTableStatistics tableStat = convertToTableStatistics(row);
+                catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+                if (!columns.isEmpty()) {
+                    CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                    catalog.alterPartitionColumnStatistics(
+                            objectPath, partitionSpec, columnStat, false);
+                }
+            }
+        } else {
+            String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+            TableResult tableResult = executeSql(statSql);
+            List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+            Preconditions.checkArgument(result.size() == 1);
+            Row row = result.get(0);
+            CatalogTableStatistics tableStat = convertToTableStatistics(row);
+            catalog.alterTableStatistics(objectPath, tableStat, false);
+            if (!columns.isEmpty()) {
+                CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            }
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    private String generateAnalyzeSql(
+            ObjectIdentifier tableIdentifier,
+            @Nullable CatalogPartitionSpec partitionSpec,
+            List<Column> columns) {
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        Preconditions.checkArgument(
+                optionalCatalogTable.isPresent(), tableIdentifier + " does not exist");
+
+        String partitionFilter;
+        if (partitionSpec != null) {
+            partitionFilter =
+                    " WHERE "
+                            + partitionSpec.getPartitionSpec().entrySet().stream()
+                                    .map(e -> e.getKey() + "=" + e.getValue())
+                                    .collect(Collectors.joining(" AND "));
+        } else {
+            partitionFilter = "";
+        }
+
+        final String columnStatsSelects;
+        if (columns.isEmpty()) {
+            columnStatsSelects = "";
+        } else {
+            columnStatsSelects = ", " + getColumnStatsSelects(columns);
+        }
+
+        return String.format(
+                "SELECT COUNT(1) AS %s %s FROM %s %s",
+                getRowCountColumn(), columnStatsSelects, tableIdentifier, partitionFilter);
+    }
+
+    private String getColumnStatsSelects(List<Column> columns) {
+        return columns.stream()
+                .flatMap(
+                        f -> {
+                            String c = f.getName();
+                            List<String> columnStatSelect = new ArrayList<>();
+                            String computeNullCount =
+                                    String.format(
+                                            "(COUNT(1) - COUNT(`%s`)) AS %s",
+                                            c, getNullCountColumn(c));
+                            columnStatSelect.add(computeNullCount);
+
+                            String computeNdv =
+                                    String.format(
+                                            "APPROX_COUNT_DISTINCT(`%s`) AS %s",
+                                            c, getNdvColumn(c));
+
+                            switch (f.getDataType().getLogicalType().getTypeRoot()) {
+                                case BOOLEAN:
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS TRUE) AS %s",
+                                                    c, c, getTrueCountColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS FALSE) AS %s",
+                                                    c, c, getFalseCountColumn(c)));
+                                    break;
+                                case TINYINT:
+                                case SMALLINT:
+                                case INTEGER:
+                                case FLOAT:
+                                case DATE:
+                                case TIME_WITHOUT_TIME_ZONE:
+                                case BIGINT:
+                                case DOUBLE:
+                                case DECIMAL:
+                                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format("MAX(`%s`) AS %s", c, getMaxColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format("MIN(`%s`) AS %s", c, getMinColumn(c)));
+                                    break;
+                                case CHAR:
+                                case VARCHAR:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "AVG(CAST(CHAR_LENGTH(`%s`) AS DOUBLE)) AS %s",
+                                                    c, getAvgLenColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "MAX(CAST(CHAR_LENGTH(`%s`) AS BIGINT)) AS %s",
+                                                    c, getMaxLenColumn(c)));
+                                    break;
+                                default:
+                                    break;
+                            }
+                            return columnStatSelect.stream();
+                        })
+                .collect(Collectors.joining(", "));
+    }
+
+    private String getRowCountColumn() {
+        return "rowCount";
+    }
+
+    private String getNullCountColumn(String column) {
+        return String.format("%s_nullCount", column);
+    }
+
+    private String getNdvColumn(String column) {
+        return String.format("%s_ndv", column);
+    }
+
+    private String getTrueCountColumn(String column) {
+        return String.format("%s_trueCount", column);
+    }
+
+    private String getFalseCountColumn(String column) {
+        return String.format("%s_falseCount", column);
+    }
+
+    private String getMaxColumn(String column) {
+        return String.format("%s_max", column);
+    }
+
+    private String getMinColumn(String column) {
+        return String.format("%s_min", column);
+    }
+
+    private String getAvgLenColumn(String column) {
+        return String.format("%s_avgLen", column);
+    }
+
+    private String getMaxLenColumn(String column) {
+        return String.format("%s_maxLen", column);
+    }
+
+    private CatalogTableStatistics convertToTableStatistics(Row row) {
+        Long rowCount = row.getFieldAs(getRowCountColumn());
+        return new CatalogTableStatistics(rowCount, -1, -1, -1);
+    }
+
+    private CatalogColumnStatistics convertToColumnStatistics(Row row, List<Column> columns) {
+        Preconditions.checkArgument(!columns.isEmpty());
+        Map<String, CatalogColumnStatisticsDataBase> columnStatMap = new HashMap<>();
+        for (Column column : columns) {
+            CatalogColumnStatisticsDataBase columnStat = convertToColumnStatisticsData(row, column);
+            if (columnStat != null) {
+                columnStatMap.put(column.getName(), columnStat);
+            }
+        }
+        return new CatalogColumnStatistics(columnStatMap);
+    }
+
+    private CatalogColumnStatisticsDataBase convertToColumnStatisticsData(Row row, Column column) {
+        String c = column.getName();
+        Long nullCount = row.getFieldAs(getNullCountColumn(c));
+        switch (column.getDataType().getLogicalType().getTypeRoot()) {
+            case BOOLEAN:
+                Long trueCount = row.getFieldAs(getTrueCountColumn(c));
+                Long falseCount = row.getFieldAs(getFalseCountColumn(c));
+                return new CatalogColumnStatisticsDataBoolean(trueCount, falseCount, nullCount);
+            case TINYINT:
+                Byte maxByte = row.getFieldAs(getMaxColumn(c));
+                Byte minByte = row.getFieldAs(getMinColumn(c));
+                Long ndvByte = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minByte != null ? minByte.longValue() : null,
+                        maxByte != null ? maxByte.longValue() : null,
+                        ndvByte,
+                        nullCount);
+            case SMALLINT:
+                Short maxShort = row.getFieldAs(getMaxColumn(c));
+                Short minShort = row.getFieldAs(getMinColumn(c));
+                Long ndvShort = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minShort != null ? minShort.longValue() : null,
+                        maxShort != null ? maxShort.longValue() : null,
+                        ndvShort,
+                        nullCount);
+            case INTEGER:
+                Integer maxInt = row.getFieldAs(getMaxColumn(c));
+                Integer minInt = row.getFieldAs(getMinColumn(c));
+                Long ndvInt = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minInt != null ? minInt.longValue() : null,
+                        maxInt != null ? maxInt.longValue() : null,
+                        ndvInt,
+                        nullCount);
+            case BIGINT:
+                Long ndvLong = row.getFieldAs(getNdvColumn(c));
+                Long maxLong = row.getFieldAs(getMaxColumn(c));
+                Long minLong = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(minLong, maxLong, ndvLong, nullCount);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                Long ndvTs = row.getFieldAs(getNdvColumn(c));
+                LocalDateTime maxTs = row.getFieldAs(getMaxColumn(c));
+                LocalDateTime minTs = row.getFieldAs(getMinColumn(c));
+
+                return new CatalogColumnStatisticsDataLong(
+                        minTs != null ? minTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        maxTs != null ? maxTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        ndvTs,
+                        nullCount);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                Long ndvTsLtz = row.getFieldAs(getNdvColumn(c));
+                Instant maxTsLtz = row.getFieldAs(getMaxColumn(c));
+                Instant minTsLtz = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTsLtz != null ? minTsLtz.toEpochMilli() : null,
+                        maxTsLtz != null ? maxTsLtz.toEpochMilli() : null,
+                        ndvTsLtz,
+                        nullCount);
+            case FLOAT:
+                Long ndvFloat = row.getFieldAs(getNdvColumn(c));
+                Float maxFloat = row.getFieldAs(getMaxColumn(c));
+                Float minFloat = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minFloat != null ? minFloat.doubleValue() : null,
+                        maxFloat != null ? maxFloat.doubleValue() : null,
+                        ndvFloat,
+                        nullCount);
+            case DOUBLE:
+                Long ndvDouble = row.getFieldAs(getNdvColumn(c));
+                Double maxDouble = row.getFieldAs(getMaxColumn(c));
+                Double minDouble = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDouble, maxDouble, ndvDouble, nullCount);
+            case DECIMAL:
+                Long ndvDecimal = row.getFieldAs(getNdvColumn(c));
+                BigDecimal maxDecimal = row.getFieldAs(getMaxColumn(c));
+                BigDecimal minDecimal = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDecimal != null ? minDecimal.doubleValue() : null,
+                        maxDecimal != null ? maxDecimal.doubleValue() : null,
+                        ndvDecimal,
+                        nullCount);
+            case DATE:
+                Long ndvDate = row.getFieldAs(getNdvColumn(c));
+                LocalDate maxDate = row.getFieldAs(getMaxColumn(c));
+                LocalDate minDate = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDate(
+                        minDate != null ? new Date(minDate.toEpochDay()) : null,
+                        maxDate != null ? new Date(maxDate.toEpochDay()) : null,
+                        ndvDate,
+                        nullCount);
+            case TIME_WITHOUT_TIME_ZONE:
+                Long ndvTime = row.getFieldAs(getNdvColumn(c));
+                LocalTime maxTime = row.getFieldAs(getMaxColumn(c));
+                LocalTime minTime = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTime != null ? minTime.toNanoOfDay() : null,
+                        maxTime != null ? maxTime.toNanoOfDay() : null,
+                        ndvTime,
+                        nullCount);
+            case CHAR:
+            case VARCHAR:
+                Long ndvString = row.getFieldAs(getNdvColumn(c));
+                Double avgLen = row.getFieldAs(getAvgLenColumn(c));
+                Long maxLen = row.getFieldAs(getMaxLenColumn(c));
+                return new CatalogColumnStatisticsDataString(maxLen, avgLen, ndvString, nullCount);
+            case BINARY:

Review Comment:
   good catch





[GitHub] [flink] lsyldliu commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r933363174


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Operation to describe an {@code ANALYZE TABLE} statement. */
+public class AnalyzeTableOperation implements Operation {
+    private final ObjectIdentifier tableIdentifier;
+    private final @Nullable List<CatalogPartitionSpec> partitionSpecs;
+    private final List<String> columns;
+
+    public AnalyzeTableOperation(
+            ObjectIdentifier tableIdentifier,
+            @Nullable List<CatalogPartitionSpec> partitionSpecs,
+            List<String> columns) {
+        this.tableIdentifier = tableIdentifier;
+        this.partitionSpecs = partitionSpecs;
+        this.columns = Objects.requireNonNull(columns, "columns is null");
+    }
+
+    public ObjectIdentifier getTableIdentifier() {
+        return tableIdentifier;
+    }
+
+    /**
+     * Returns Optional.empty() if the table is not a partition table, else returns the given
+     * partition specs.
+     */
+    public Optional<List<CatalogPartitionSpec>> getPartitionSpecs() {
+        return Optional.ofNullable(partitionSpecs);
+    }
+
+    public List<String> getColumns() {
+        return columns;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "ANALYZE TABLE";

Review Comment:
   It would be better to also print the `tableIdentifier`.
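
A minimal sketch of what this suggestion could look like. It uses a plain `String` as a stand-in for Flink's `ObjectIdentifier`; the class name `AnalyzeTableSummarySketch` and the exact output format are assumptions for illustration, not the PR's code.

```java
import java.util.Objects;

/** Hypothetical stand-in for AnalyzeTableOperation; only the summary logic is shown. */
class AnalyzeTableSummarySketch {
    // Stand-in for ObjectIdentifier, e.g. "cat.db.tbl" (assumption).
    private final String tableIdentifier;

    AnalyzeTableSummarySketch(String tableIdentifier) {
        this.tableIdentifier =
                Objects.requireNonNull(tableIdentifier, "tableIdentifier is null");
    }

    /** Includes the table identifier so the summary says which table is analyzed. */
    String asSummaryString() {
        return "ANALYZE TABLE " + tableIdentifier;
    }
}
```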



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {
+                throw new ValidationException(
+                        String.format(
+                                "For partition table, all partition keys should be specified explicitly. "
+                                        + "The given partition keys: [%s] are not match the target partition keys: [%s]",
+                                String.join(",", partitions.keySet()),
+                                String.join(",", table.getPartitionKeys())));
+            }
+
+            try {
+                targetPartitionSpecs = getPartitionSpecs(tableIdentifier, schema, partitions);
+            } catch (Exception e) {
+                throw new ValidationException(e.getMessage(), e);
+            }
+        } else if (!partitions.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table: %s is not a partition table, while partition values is given",
+                            tableIdentifier));
+        }
+
+        List<String> origColumns =
+                ((RowType) schema.toPhysicalRowDataType().getLogicalType()).getFieldNames();
+        String[] columns = analyzeTable.getColumnNames();
+        List<String> targetColumns;
+        if (analyzeTable.isAllColumns()) {
+            Preconditions.checkArgument(columns.length == 0);
+            targetColumns = origColumns;
+        } else if (columns.length > 0) {
+            targetColumns =
+                    Arrays.stream(columns)
+                            .peek(
+                                    c -> {
+                                        if (!origColumns.contains(c)) {
+                                            throw new ValidationException(
+                                                    String.format(
+                                                            "Column: %s does not exist in the table: %s",
+                                                            c, tableIdentifier));
+                                        }
+                                    })
+                            .collect(Collectors.toList());
+        } else {
+            targetColumns = new ArrayList<>();

Review Comment:
   Use `Collections.emptyList()` instead?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =
+                        generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+                TableResult tableResult = executeSql(statSql);
+                List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+                Preconditions.checkArgument(result.size() == 1);
+                Row row = result.get(0);
+                CatalogTableStatistics tableStat = convertToTableStatistics(row);
+                catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+                if (!columns.isEmpty()) {
+                    CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                    catalog.alterPartitionColumnStatistics(
+                            objectPath, partitionSpec, columnStat, false);
+                }
+            }
+        } else {
+            String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+            TableResult tableResult = executeSql(statSql);
+            List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+            Preconditions.checkArgument(result.size() == 1);
+            Row row = result.get(0);
+            CatalogTableStatistics tableStat = convertToTableStatistics(row);
+            catalog.alterTableStatistics(objectPath, tableStat, false);
+            if (!columns.isEmpty()) {
+                CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            }
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    private String generateAnalyzeSql(
+            ObjectIdentifier tableIdentifier,
+            @Nullable CatalogPartitionSpec partitionSpec,
+            List<Column> columns) {
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        Preconditions.checkArgument(
+                optionalCatalogTable.isPresent(), tableIdentifier + " does not exist");
+
+        String partitionFilter;
+        if (partitionSpec != null) {
+            partitionFilter =
+                    " WHERE "
+                            + partitionSpec.getPartitionSpec().entrySet().stream()
+                                    .map(e -> e.getKey() + "=" + e.getValue())
+                                    .collect(Collectors.joining(" AND "));
+        } else {
+            partitionFilter = "";
+        }
+
+        final String columnStatsSelects;
+        if (columns.isEmpty()) {
+            columnStatsSelects = "";
+        } else {
+            columnStatsSelects = ", " + getColumnStatsSelects(columns);
+        }
+
+        return String.format(
+                "SELECT COUNT(1) AS %s %s FROM %s %s",
+                getRowCountColumn(), columnStatsSelects, tableIdentifier, partitionFilter);
+    }
+
+    private String getColumnStatsSelects(List<Column> columns) {
+        return columns.stream()
+                .flatMap(
+                        f -> {
+                            String c = f.getName();
+                            List<String> columnStatSelect = new ArrayList<>();
+                            String computeNullCount =
+                                    String.format(
+                                            "(COUNT(1) - COUNT(`%s`)) AS %s",
+                                            c, getNullCountColumn(c));
+                            columnStatSelect.add(computeNullCount);
+
+                            String computeNdv =
+                                    String.format(
+                                            "APPROX_COUNT_DISTINCT(`%s`) AS %s",
+                                            c, getNdvColumn(c));
+
+                            switch (f.getDataType().getLogicalType().getTypeRoot()) {
+                                case BOOLEAN:
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS TRUE) AS %s",
+                                                    c, c, getTrueCountColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS FALSE) AS %s",
+                                                    c, c, getFalseCountColumn(c)));
+                                    break;
+                                case TINYINT:
+                                case SMALLINT:
+                                case INTEGER:
+                                case FLOAT:
+                                case DATE:
+                                case TIME_WITHOUT_TIME_ZONE:
+                                case BIGINT:
+                                case DOUBLE:
+                                case DECIMAL:
+                                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format("MAX(`%s`) AS %s", c, getMaxColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format("MIN(`%s`) AS %s", c, getMinColumn(c)));
+                                    break;
+                                case CHAR:
+                                case VARCHAR:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "AVG(CAST(CHAR_LENGTH(`%s`) AS DOUBLE)) AS %s",
+                                                    c, getAvgLenColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "MAX(CAST(CHAR_LENGTH(`%s`) AS BIGINT)) AS %s",
+                                                    c, getMaxLenColumn(c)));
+                                    break;
+                                default:
+                                    break;
+                            }
+                            return columnStatSelect.stream();
+                        })
+                .collect(Collectors.joining(", "));
+    }
+
+    private String getRowCountColumn() {
+        return "rowCount";
+    }
+
+    private String getNullCountColumn(String column) {

Review Comment:
   ditto.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {
+                throw new ValidationException(
+                        String.format(
+                                "For partition table, all partition keys should be specified explicitly. "
+                                        + "The given partition keys: [%s] are not match the target partition keys: [%s]",
+                                String.join(",", partitions.keySet()),
+                                String.join(",", table.getPartitionKeys())));
+            }
+
+            try {
+                targetPartitionSpecs = getPartitionSpecs(tableIdentifier, schema, partitions);
+            } catch (Exception e) {
+                throw new ValidationException(e.getMessage(), e);
+            }
+        } else if (!partitions.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table: %s is not a partition table, while partition values is given",
+                            tableIdentifier));
+        }
+
+        List<String> origColumns =
+                ((RowType) schema.toPhysicalRowDataType().getLogicalType()).getFieldNames();
+        String[] columns = analyzeTable.getColumnNames();
+        List<String> targetColumns;
+        if (analyzeTable.isAllColumns()) {
+            Preconditions.checkArgument(columns.length == 0);
+            targetColumns = origColumns;
+        } else if (columns.length > 0) {

Review Comment:
   What is the behavior when the user specifies a computed column or a metadata column?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {
+                throw new ValidationException(
+                        String.format(
+                                "For partition table, all partition keys should be specified explicitly. "
+                                        + "The given partition keys: [%s] are not match the target partition keys: [%s]",
+                                String.join(",", partitions.keySet()),
+                                String.join(",", table.getPartitionKeys())));
+            }
+
+            try {
+                targetPartitionSpecs = getPartitionSpecs(tableIdentifier, schema, partitions);
+            } catch (Exception e) {
+                throw new ValidationException(e.getMessage(), e);
+            }
+        } else if (!partitions.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table: %s is not a partition table, while partition values is given",
+                            tableIdentifier));
+        }
+
+        List<String> origColumns =
+                ((RowType) schema.toPhysicalRowDataType().getLogicalType()).getFieldNames();
+        String[] columns = analyzeTable.getColumnNames();
+        List<String> targetColumns;
+        if (analyzeTable.isAllColumns()) {

Review Comment:
   What is the behavior if the table contains computed columns or metadata columns? Should they be excluded?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;

Review Comment:
   ```suggestion
           List<CatalogPartitionSpec> targetPartitionSpecs = Collections.emptyList();
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Operation to describe an {@code ANALYZE TABLE} statement. */
+public class AnalyzeTableOperation implements Operation {

Review Comment:
   Isn't this a `ModifyOperation`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();

Review Comment:
   Why not use the `getResolvedTable()` method directly?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");

Review Comment:
   Please also print the view identifier.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Operation to describe an {@code ANALYZE TABLE} statement. */
+public class AnalyzeTableOperation implements Operation {
+    private final ObjectIdentifier tableIdentifier;
+    private final @Nullable List<CatalogPartitionSpec> partitionSpecs;
+    private final List<String> columns;
+
+    public AnalyzeTableOperation(
+            ObjectIdentifier tableIdentifier,
+            @Nullable List<CatalogPartitionSpec> partitionSpecs,

Review Comment:
   If the user doesn't specify a partition, could we pass an empty list here? I think we can then remove the `@Nullable` annotation.
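
A minimal sketch of the empty-list convention, assuming a `Map<String, String>` as a stand-in for `CatalogPartitionSpec`. The class name `NonNullPartitionSpecsSketch` is hypothetical and the code is not taken from the PR.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: an empty list, not null, means "no partition specified". */
class NonNullPartitionSpecsSketch {
    // Each Map<String, String> stands in for a CatalogPartitionSpec (assumption).
    private final List<Map<String, String>> partitionSpecs;

    NonNullPartitionSpecsSketch(List<Map<String, String>> partitionSpecs) {
        // Reject null up front so no @Nullable field or Optional wrapper is needed.
        this.partitionSpecs =
                Collections.unmodifiableList(
                        Objects.requireNonNull(partitionSpecs, "partitionSpecs is null"));
    }

    /** Never returns null; callers check isEmpty() instead of unwrapping an Optional. */
    List<Map<String, String>> getPartitionSpecs() {
        return partitionSpecs;
    }
}
```

With this convention an empty list no longer distinguishes "not a partitioned table" from "no matching partition", so the caller would have to decide that some other way, for example through the `table.isPartitioned()` check already present in the quoted `analyzeTable` method.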



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =

Review Comment:
   The catalog may be null here; should we check it?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =

Review Comment:
   Why not return the `ResolvedCatalogTable` directly?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()

Review Comment:
   I think the `AnalyzeTableOperation` could wrap the needed `List<Column>` directly?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));

Review Comment:
   Is `orElse()` redundant here? Don't we already collect all partition specs in `SqlToOperationConverter`?
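
   A side note on the quoted line, independent of whether the fallback is needed at all: the argument of `Optional.orElse` is evaluated eagerly, so `catalog.listPartitions(objectPath)` would run even when the operation already carries partition specs. The sketch below illustrates this with plain JDK types. `listPartitions()` here is a hypothetical stand-in, not the Flink `Catalog` API, and the partition strings are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

class OrElseDemo {
    // Counts how often the (hypothetical) expensive catalog lookup runs.
    static final AtomicInteger lookups = new AtomicInteger();

    // Stand-in for catalog.listPartitions(objectPath); not the Flink API.
    static List<String> listPartitions() {
        lookups.incrementAndGet();
        return Arrays.asList("dt=2022-07-25", "dt=2022-07-26");
    }

    public static void main(String[] args) {
        Optional<List<String>> specs = Optional.of(Arrays.asList("dt=2022-07-26"));

        // orElse: the argument is computed before the call, although specs is present.
        List<String> eager = specs.orElse(listPartitions());
        System.out.println(eager + ", lookups after orElse: " + lookups.get());

        // orElseGet: the supplier is only invoked when specs is empty.
        List<String> lazy = specs.orElseGet(OrElseDemo::listPartitions);
        System.out.println(lazy + ", lookups after orElseGet: " + lookups.get());
    }
}
```

   If the fallback is kept, `orElseGet` would avoid the extra lookup. Judging by the `throws` clause in the diff, the real call may throw checked exceptions, in which case it could not be passed as a plain `Supplier` and an explicit `isPresent()` branch might be simpler.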



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =

Review Comment:
   For the partitioned and non-partitioned cases, I think the following code could be extracted into a utility method so that it can be reused.
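
   One possible shape for such a helper, as a rough sketch only: the shared steps (run the statistics query, require exactly one row) live in one method, and the part that differs (which `alter...Statistics` call to make) is passed in as a callback. All names below (`AnalyzeHelperSketch`, `analyze`, `executor`, `statsWriter`) are hypothetical, and the executor is a fake that stands in for `executeSql(...).collect()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class AnalyzeHelperSketch {
    // Shared step: run the statistics query and require exactly one result row,
    // then hand that row to the caller-supplied writer.
    static <R> void analyze(
            String sql, Function<String, List<R>> executor, Consumer<R> statsWriter) {
        List<R> result = executor.apply(sql);
        if (result.size() != 1) {
            throw new IllegalStateException("Expected exactly one row, got " + result.size());
        }
        statsWriter.accept(result.get(0));
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        // Fake executor: always returns a single row holding a row count.
        Function<String, List<Long>> fakeExecutor = sql -> Arrays.asList(42L);

        // Partitioned table: one call per partition, the writer targets the partition.
        for (String partition : Arrays.asList("dt=1", "dt=2")) {
            analyze(
                    "SELECT ... WHERE " + partition,
                    fakeExecutor,
                    row -> written.add(partition + " -> rowCount=" + row));
        }

        // Non-partitioned table: same helper, the writer targets the table.
        analyze("SELECT ...", fakeExecutor, row -> written.add("table -> rowCount=" + row));

        written.forEach(System.out::println);
    }
}
```

   In the real method the callback would wrap the catalog calls from the diff, which may need extra handling for their checked exceptions.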



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =
+                        generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+                TableResult tableResult = executeSql(statSql);
+                List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+                Preconditions.checkArgument(result.size() == 1);
+                Row row = result.get(0);
+                CatalogTableStatistics tableStat = convertToTableStatistics(row);
+                catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+                if (!columns.isEmpty()) {
+                    CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                    catalog.alterPartitionColumnStatistics(
+                            objectPath, partitionSpec, columnStat, false);
+                }
+            }
+        } else {
+            String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+            TableResult tableResult = executeSql(statSql);
+            List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+            Preconditions.checkArgument(result.size() == 1);
+            Row row = result.get(0);
+            CatalogTableStatistics tableStat = convertToTableStatistics(row);
+            catalog.alterTableStatistics(objectPath, tableStat, false);
+            if (!columns.isEmpty()) {
+                CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            }
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    private String generateAnalyzeSql(
+            ObjectIdentifier tableIdentifier,
+            @Nullable CatalogPartitionSpec partitionSpec,
+            List<Column> columns) {
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        Preconditions.checkArgument(

Review Comment:
   Isn't this check redundant?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {

Review Comment:
   Why do we need to copy the key set into a new List here?
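
   For context, a likely reason, shown as a minimal JDK-only sketch: under the Collections contract a `Set` never equals a `List`, so comparing `partitions.keySet()` with a list of partition keys directly would always be `false`. Copying the keys into a list gives an element-wise, order-sensitive comparison. This assumes `table.getPartitionKeys()` returns a `List<String>`; the class name, method names, and sample keys below are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;

class KeySetEqualsDemo {
    // Direct comparison: a Set is never equal to a List, whatever the elements are.
    static boolean compareDirectly(LinkedHashMap<String, String> partitions, List<String> keys) {
        return partitions.keySet().equals(keys);
    }

    // Copying the keys into a List compares element by element, in insertion order.
    static boolean compareAsList(LinkedHashMap<String, String> partitions, List<String> keys) {
        return new ArrayList<>(partitions.keySet()).equals(keys);
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
        partitions.put("dt", "2022-07-26");
        partitions.put("region", "eu");
        List<String> partitionKeys = Arrays.asList("dt", "region");

        System.out.println(compareDirectly(partitions, partitionKeys)); // false
        System.out.println(compareAsList(partitions, partitionKeys)); // true
        System.out.println(compareAsList(partitions, Arrays.asList("region", "dt"))); // false
    }
}
```

   If that is the intent, a short code comment next to the copy would make it explicit that the order of the partition keys is being checked.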



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =
+                        generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+                TableResult tableResult = executeSql(statSql);
+                List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+                Preconditions.checkArgument(result.size() == 1);
+                Row row = result.get(0);
+                CatalogTableStatistics tableStat = convertToTableStatistics(row);
+                catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+                if (!columns.isEmpty()) {
+                    CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                    catalog.alterPartitionColumnStatistics(
+                            objectPath, partitionSpec, columnStat, false);
+                }
+            }
+        } else {
+            String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+            TableResult tableResult = executeSql(statSql);
+            List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+            Preconditions.checkArgument(result.size() == 1);
+            Row row = result.get(0);
+            CatalogTableStatistics tableStat = convertToTableStatistics(row);
+            catalog.alterTableStatistics(objectPath, tableStat, false);
+            if (!columns.isEmpty()) {
+                CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            }
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    private String generateAnalyzeSql(
+            ObjectIdentifier tableIdentifier,
+            @Nullable CatalogPartitionSpec partitionSpec,
+            List<Column> columns) {
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        Preconditions.checkArgument(
+                optionalCatalogTable.isPresent(), tableIdentifier + " does not exist");
+
+        String partitionFilter;
+        if (partitionSpec != null) {
+            partitionFilter =
+                    " WHERE "
+                            + partitionSpec.getPartitionSpec().entrySet().stream()
+                                    .map(e -> e.getKey() + "=" + e.getValue())
+                                    .collect(Collectors.joining(" AND "));
+        } else {
+            partitionFilter = "";
+        }
+
+        final String columnStatsSelects;
+        if (columns.isEmpty()) {
+            columnStatsSelects = "";
+        } else {
+            columnStatsSelects = ", " + getColumnStatsSelects(columns);
+        }
+
+        return String.format(
+                "SELECT COUNT(1) AS %s %s FROM %s %s",
+                getRowCountColumn(), columnStatsSelects, tableIdentifier, partitionFilter);
+    }
+
+    private String getColumnStatsSelects(List<Column> columns) {
+        return columns.stream()
+                .flatMap(
+                        f -> {
+                            String c = f.getName();
+                            List<String> columnStatSelect = new ArrayList<>();
+                            String computeNullCount =
+                                    String.format(
+                                            "(COUNT(1) - COUNT(`%s`)) AS %s",
+                                            c, getNullCountColumn(c));
+                            columnStatSelect.add(computeNullCount);
+
+                            String computeNdv =
+                                    String.format(
+                                            "APPROX_COUNT_DISTINCT(`%s`) AS %s",
+                                            c, getNdvColumn(c));
+
+                            switch (f.getDataType().getLogicalType().getTypeRoot()) {
+                                case BOOLEAN:
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS TRUE) AS %s",
+                                                    c, c, getTrueCountColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS FALSE) AS %s",
+                                                    c, c, getFalseCountColumn(c)));
+                                    break;
+                                case TINYINT:
+                                case SMALLINT:
+                                case INTEGER:
+                                case FLOAT:
+                                case DATE:
+                                case TIME_WITHOUT_TIME_ZONE:
+                                case BIGINT:
+                                case DOUBLE:
+                                case DECIMAL:
+                                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format("MAX(`%s`) AS %s", c, getMaxColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format("MIN(`%s`) AS %s", c, getMinColumn(c)));
+                                    break;
+                                case CHAR:
+                                case VARCHAR:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "AVG(CAST(CHAR_LENGTH(`%s`) AS DOUBLE)) AS %s",
+                                                    c, getAvgLenColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "MAX(CAST(CHAR_LENGTH(`%s`) AS BIGINT)) AS %s",
+                                                    c, getMaxLenColumn(c)));
+                                    break;
+                                default:
+                                    break;
+                            }
+                            return columnStatSelect.stream();
+                        })
+                .collect(Collectors.joining(", "));
+    }
+
+    private String getRowCountColumn() {
+        return "rowCount";
+    }
+
+    private String getNullCountColumn(String column) {
+        return String.format("%s_nullCount", column);
+    }
+
+    private String getNdvColumn(String column) {
+        return String.format("%s_ndv", column);
+    }
+
+    private String getTrueCountColumn(String column) {
+        return String.format("%s_trueCount", column);
+    }
+
+    private String getFalseCountColumn(String column) {
+        return String.format("%s_falseCount", column);
+    }
+
+    private String getMaxColumn(String column) {
+        return String.format("%s_max", column);
+    }
+
+    private String getMinColumn(String column) {
+        return String.format("%s_min", column);
+    }
+
+    private String getAvgLenColumn(String column) {
+        return String.format("%s_avgLen", column);
+    }
+
+    private String getMaxLenColumn(String column) {
+        return String.format("%s_maxLen", column);
+    }
+
+    private CatalogTableStatistics convertToTableStatistics(Row row) {
+        Long rowCount = row.getFieldAs(getRowCountColumn());
+        return new CatalogTableStatistics(rowCount, -1, -1, -1);
+    }
+
+    private CatalogColumnStatistics convertToColumnStatistics(Row row, List<Column> columns) {
+        Preconditions.checkArgument(!columns.isEmpty());
+        Map<String, CatalogColumnStatisticsDataBase> columnStatMap = new HashMap<>();
+        for (Column column : columns) {
+            CatalogColumnStatisticsDataBase columnStat = convertToColumnStatisticsData(row, column);
+            if (columnStat != null) {
+                columnStatMap.put(column.getName(), columnStat);
+            }
+        }
+        return new CatalogColumnStatistics(columnStatMap);
+    }
+
+    private CatalogColumnStatisticsDataBase convertToColumnStatisticsData(Row row, Column column) {
+        String c = column.getName();
+        Long nullCount = row.getFieldAs(getNullCountColumn(c));
+        switch (column.getDataType().getLogicalType().getTypeRoot()) {
+            case BOOLEAN:
+                Long trueCount = row.getFieldAs(getTrueCountColumn(c));
+                Long falseCount = row.getFieldAs(getFalseCountColumn(c));
+                return new CatalogColumnStatisticsDataBoolean(trueCount, falseCount, nullCount);
+            case TINYINT:
+                Byte maxByte = row.getFieldAs(getMaxColumn(c));
+                Byte minByte = row.getFieldAs(getMinColumn(c));
+                Long ndvByte = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minByte != null ? minByte.longValue() : null,
+                        maxByte != null ? maxByte.longValue() : null,
+                        ndvByte,
+                        nullCount);
+            case SMALLINT:
+                Short maxShort = row.getFieldAs(getMaxColumn(c));
+                Short minShort = row.getFieldAs(getMinColumn(c));
+                Long ndvShort = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minShort != null ? minShort.longValue() : null,
+                        maxShort != null ? maxShort.longValue() : null,
+                        ndvShort,
+                        nullCount);
+            case INTEGER:
+                Integer maxInt = row.getFieldAs(getMaxColumn(c));
+                Integer minInt = row.getFieldAs(getMinColumn(c));
+                Long ndvInt = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minInt != null ? minInt.longValue() : null,
+                        maxInt != null ? maxInt.longValue() : null,
+                        ndvInt,
+                        nullCount);
+            case BIGINT:
+                Long ndvLong = row.getFieldAs(getNdvColumn(c));
+                Long maxLong = row.getFieldAs(getMaxColumn(c));
+                Long minLong = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(minLong, maxLong, ndvLong, nullCount);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                Long ndvTs = row.getFieldAs(getNdvColumn(c));
+                LocalDateTime maxTs = row.getFieldAs(getMaxColumn(c));
+                LocalDateTime minTs = row.getFieldAs(getMinColumn(c));
+
+                return new CatalogColumnStatisticsDataLong(
+                        minTs != null ? minTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        maxTs != null ? maxTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        ndvTs,
+                        nullCount);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                Long ndvTsLtz = row.getFieldAs(getNdvColumn(c));
+                Instant maxTsLtz = row.getFieldAs(getMaxColumn(c));
+                Instant minTsLtz = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTsLtz != null ? minTsLtz.toEpochMilli() : null,
+                        maxTsLtz != null ? maxTsLtz.toEpochMilli() : null,
+                        ndvTsLtz,
+                        nullCount);
+            case FLOAT:
+                Long ndvFloat = row.getFieldAs(getNdvColumn(c));
+                Float maxFloat = row.getFieldAs(getMaxColumn(c));
+                Float minFloat = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minFloat != null ? minFloat.doubleValue() : null,
+                        maxFloat != null ? maxFloat.doubleValue() : null,
+                        ndvFloat,
+                        nullCount);
+            case DOUBLE:
+                Long ndvDouble = row.getFieldAs(getNdvColumn(c));
+                Double maxDouble = row.getFieldAs(getMaxColumn(c));
+                Double minDouble = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDouble, maxDouble, ndvDouble, nullCount);
+            case DECIMAL:
+                Long ndvDecimal = row.getFieldAs(getNdvColumn(c));
+                BigDecimal maxDecimal = row.getFieldAs(getMaxColumn(c));
+                BigDecimal minDecimal = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDecimal != null ? minDecimal.doubleValue() : null,
+                        maxDecimal != null ? maxDecimal.doubleValue() : null,
+                        ndvDecimal,
+                        nullCount);
+            case DATE:
+                Long ndvDate = row.getFieldAs(getNdvColumn(c));
+                LocalDate maxDate = row.getFieldAs(getMaxColumn(c));
+                LocalDate minDate = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDate(
+                        minDate != null ? new Date(minDate.toEpochDay()) : null,
+                        maxDate != null ? new Date(maxDate.toEpochDay()) : null,
+                        ndvDate,
+                        nullCount);
+            case TIME_WITHOUT_TIME_ZONE:
+                Long ndvTime = row.getFieldAs(getNdvColumn(c));
+                LocalTime maxTime = row.getFieldAs(getMaxColumn(c));
+                LocalTime minTime = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTime != null ? minTime.toNanoOfDay() : null,
+                        maxTime != null ? maxTime.toNanoOfDay() : null,
+                        ndvTime,
+                        nullCount);
+            case CHAR:
+            case VARCHAR:
+                Long ndvString = row.getFieldAs(getNdvColumn(c));
+                Double avgLen = row.getFieldAs(getAvgLenColumn(c));
+                Long maxLen = row.getFieldAs(getMaxLenColumn(c));
+                return new CatalogColumnStatisticsDataString(maxLen, avgLen, ndvString, nullCount);
+            case BINARY:

Review Comment:
   > This is not consistent with types in `getColumnStatsSelects`
   
   +10086



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =
+                        generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+                TableResult tableResult = executeSql(statSql);
+                List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+                Preconditions.checkArgument(result.size() == 1);
+                Row row = result.get(0);
+                CatalogTableStatistics tableStat = convertToTableStatistics(row);
+                catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+                if (!columns.isEmpty()) {
+                    CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                    catalog.alterPartitionColumnStatistics(
+                            objectPath, partitionSpec, columnStat, false);
+                }
+            }
+        } else {
+            String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+            TableResult tableResult = executeSql(statSql);
+            List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+            Preconditions.checkArgument(result.size() == 1);
+            Row row = result.get(0);
+            CatalogTableStatistics tableStat = convertToTableStatistics(row);
+            catalog.alterTableStatistics(objectPath, tableStat, false);
+            if (!columns.isEmpty()) {
+                CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            }
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    private String generateAnalyzeSql(
+            ObjectIdentifier tableIdentifier,
+            @Nullable CatalogPartitionSpec partitionSpec,
+            List<Column> columns) {
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        Preconditions.checkArgument(
+                optionalCatalogTable.isPresent(), tableIdentifier + " does not exist");
+
+        String partitionFilter;
+        if (partitionSpec != null) {
+            partitionFilter =
+                    " WHERE "
+                            + partitionSpec.getPartitionSpec().entrySet().stream()
+                                    .map(e -> e.getKey() + "=" + e.getValue())
+                                    .collect(Collectors.joining(" AND "));
+        } else {
+            partitionFilter = "";
+        }
+
+        final String columnStatsSelects;
+        if (columns.isEmpty()) {
+            columnStatsSelects = "";
+        } else {
+            columnStatsSelects = ", " + getColumnStatsSelects(columns);
+        }
+
+        return String.format(
+                "SELECT COUNT(1) AS %s %s FROM %s %s",
+                getRowCountColumn(), columnStatsSelects, tableIdentifier, partitionFilter);
+    }
+
+    private String getColumnStatsSelects(List<Column> columns) {
+        return columns.stream()
+                .flatMap(
+                        f -> {
+                            String c = f.getName();
+                            List<String> columnStatSelect = new ArrayList<>();
+                            String computeNullCount =
+                                    String.format(
+                                            "(COUNT(1) - COUNT(`%s`)) AS %s",
+                                            c, getNullCountColumn(c));
+                            columnStatSelect.add(computeNullCount);
+
+                            String computeNdv =
+                                    String.format(
+                                            "APPROX_COUNT_DISTINCT(`%s`) AS %s",
+                                            c, getNdvColumn(c));
+
+                            switch (f.getDataType().getLogicalType().getTypeRoot()) {
+                                case BOOLEAN:
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS TRUE) AS %s",
+                                                    c, c, getTrueCountColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS FALSE) AS %s",
+                                                    c, c, getFalseCountColumn(c)));
+                                    break;
+                                case TINYINT:
+                                case SMALLINT:
+                                case INTEGER:
+                                case FLOAT:
+                                case DATE:
+                                case TIME_WITHOUT_TIME_ZONE:
+                                case BIGINT:
+                                case DOUBLE:
+                                case DECIMAL:
+                                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format("MAX(`%s`) AS %s", c, getMaxColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format("MIN(`%s`) AS %s", c, getMinColumn(c)));
+                                    break;
+                                case CHAR:
+                                case VARCHAR:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "AVG(CAST(CHAR_LENGTH(`%s`) AS DOUBLE)) AS %s",
+                                                    c, getAvgLenColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "MAX(CAST(CHAR_LENGTH(`%s`) AS BIGINT)) AS %s",
+                                                    c, getMaxLenColumn(c)));
+                                    break;
+                                default:
+                                    break;
+                            }
+                            return columnStatSelect.stream();
+                        })
+                .collect(Collectors.joining(", "));
+    }
+
+    private String getRowCountColumn() {

Review Comment:
   Would `getRowCountColumnName` be a more intuitive name?





[GitHub] [flink] godfreyhe commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935132247


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Operation to describe an {@code ANALYZE TABLE} statement. */
+public class AnalyzeTableOperation implements Operation {
+    private final ObjectIdentifier tableIdentifier;
+    private final @Nullable List<CatalogPartitionSpec> partitionSpecs;
+    private final List<String> columns;
+
+    public AnalyzeTableOperation(
+            ObjectIdentifier tableIdentifier,
+            @Nullable List<CatalogPartitionSpec> partitionSpecs,

Review Comment:
   For a non-partitioned table, `partitionSpecs` should be null rather than empty.
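
A minimal sketch of why the null/empty distinction matters, assuming the getter wraps the nullable field with `Optional.ofNullable` (the getter body is not shown in this hunk). `PartitionSpecsSketch` and `targetPartitions` are hypothetical names, and plain strings stand in for `CatalogPartitionSpec`: a null list falls back to a default, while an empty list is kept and selects nothing.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for illustration; not the PR's AnalyzeTableOperation. */
final class PartitionSpecsSketch {

    /** Returns the given specs, or all partitions when no specs were provided (null). */
    static List<String> targetPartitions(List<String> partitionSpecs, List<String> allPartitions) {
        // Optional.ofNullable(null) is empty, so orElse picks the fallback.
        // Optional.ofNullable(emptyList) is present, so the empty list is returned as is.
        return Optional.ofNullable(partitionSpecs).orElse(allPartitions);
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("dt=2022-07-25", "dt=2022-07-26");
        System.out.println(targetPartitions(null, all));
        System.out.println(targetPartitions(Collections.<String>emptyList(), all));
    }
}
```

With a null argument the fallback list is used; with an empty list the result stays empty, so a loop over it would do no work.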





[GitHub] [flink] godfreyhe commented on pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on PR #20363:
URL: https://github.com/apache/flink/pull/20363#issuecomment-1202144823

   @lincoln-lil @lsyldliu Thanks for the detailed review. I have updated the PR.




[GitHub] [flink] flinkbot commented on pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20363:
URL: https://github.com/apache/flink/pull/20363#issuecomment-1195162996

   ## CI report:
   
   * f1729b318978bf07d0fee899f730b5b35e51b586 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] lincoln-lil commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r934082373


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1356,6 +1379,15 @@ public TableResultInternal executeInternal(Operation operation) {
             return (TableResultInternal) compiledPlan.execute();
         } else if (operation instanceof NopOperation) {
             return TableResultImpl.TABLE_RESULT_OK;
+        } else if (operation instanceof AnalyzeTableOperation) {
+            if (isStreamingMode) {
+                throw new TableException("AnalyzeTable is not supported for streaming mode now");

Review Comment:
   'AnalyzeTable' ->  'ANALYZE TABLE'



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1356,6 +1379,15 @@ public TableResultInternal executeInternal(Operation operation) {
             return (TableResultInternal) compiledPlan.execute();
         } else if (operation instanceof NopOperation) {
             return TableResultImpl.TABLE_RESULT_OK;
+        } else if (operation instanceof AnalyzeTableOperation) {

Review Comment:
   nit: it may be better to move this new, meaningful operation before `NopOperation`.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =
+                        generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+                TableResult tableResult = executeSql(statSql);
+                List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+                Preconditions.checkArgument(result.size() == 1);
+                Row row = result.get(0);
+                CatalogTableStatistics tableStat = convertToTableStatistics(row);
+                catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+                if (!columns.isEmpty()) {
+                    CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                    catalog.alterPartitionColumnStatistics(
+                            objectPath, partitionSpec, columnStat, false);
+                }
+            }
+        } else {
+            String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+            TableResult tableResult = executeSql(statSql);
+            List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+            Preconditions.checkArgument(result.size() == 1);
+            Row row = result.get(0);
+            CatalogTableStatistics tableStat = convertToTableStatistics(row);
+            catalog.alterTableStatistics(objectPath, tableStat, false);
+            if (!columns.isEmpty()) {
+                CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            }
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    private String generateAnalyzeSql(
+            ObjectIdentifier tableIdentifier,
+            @Nullable CatalogPartitionSpec partitionSpec,
+            List<Column> columns) {
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        Preconditions.checkArgument(
+                optionalCatalogTable.isPresent(), tableIdentifier + " does not exist");
+
+        String partitionFilter;
+        if (partitionSpec != null) {
+            partitionFilter =
+                    " WHERE "
+                            + partitionSpec.getPartitionSpec().entrySet().stream()
+                                    .map(e -> e.getKey() + "=" + e.getValue())
+                                    .collect(Collectors.joining(" AND "));
+        } else {
+            partitionFilter = "";
+        }
+
+        final String columnStatsSelects;
+        if (columns.isEmpty()) {
+            columnStatsSelects = "";
+        } else {
+            columnStatsSelects = ", " + getColumnStatsSelects(columns);
+        }
+
+        return String.format(
+                "SELECT COUNT(1) AS %s %s FROM %s %s",
+                getRowCountColumn(), columnStatsSelects, tableIdentifier, partitionFilter);
+    }
+
+    private String getColumnStatsSelects(List<Column> columns) {
+        return columns.stream()
+                .flatMap(
+                        f -> {
+                            String c = f.getName();
+                            List<String> columnStatSelect = new ArrayList<>();
+                            String computeNullCount =
+                                    String.format(
+                                            "(COUNT(1) - COUNT(`%s`)) AS %s",
+                                            c, getNullCountColumn(c));
+                            columnStatSelect.add(computeNullCount);
+
+                            String computeNdv =
+                                    String.format(
+                                            "APPROX_COUNT_DISTINCT(`%s`) AS %s",
+                                            c, getNdvColumn(c));
+
+                            switch (f.getDataType().getLogicalType().getTypeRoot()) {
+                                case BOOLEAN:
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS TRUE) AS %s",
+                                                    c, c, getTrueCountColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "COUNT(`%s`) FILTER (WHERE `%s` IS FALSE) AS %s",
+                                                    c, c, getFalseCountColumn(c)));
+                                    break;
+                                case TINYINT:
+                                case SMALLINT:
+                                case INTEGER:
+                                case FLOAT:
+                                case DATE:
+                                case TIME_WITHOUT_TIME_ZONE:
+                                case BIGINT:
+                                case DOUBLE:
+                                case DECIMAL:
+                                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format("MAX(`%s`) AS %s", c, getMaxColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format("MIN(`%s`) AS %s", c, getMinColumn(c)));
+                                    break;
+                                case CHAR:
+                                case VARCHAR:
+                                    columnStatSelect.add(computeNdv);
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "AVG(CAST(CHAR_LENGTH(`%s`) AS DOUBLE)) AS %s",
+                                                    c, getAvgLenColumn(c)));
+                                    columnStatSelect.add(
+                                            String.format(
+                                                    "MAX(CAST(CHAR_LENGTH(`%s`) AS BIGINT)) AS %s",
+                                                    c, getMaxLenColumn(c)));
+                                    break;
+                                default:
+                                    break;
+                            }
+                            return columnStatSelect.stream();
+                        })
+                .collect(Collectors.joining(", "));
+    }
+
+    private String getRowCountColumn() {
+        return "rowCount";
+    }
+
+    private String getNullCountColumn(String column) {
+        return String.format("%s_nullCount", column);
+    }
+
+    private String getNdvColumn(String column) {
+        return String.format("%s_ndv", column);
+    }
+
+    private String getTrueCountColumn(String column) {
+        return String.format("%s_trueCount", column);
+    }
+
+    private String getFalseCountColumn(String column) {
+        return String.format("%s_falseCount", column);
+    }
+
+    private String getMaxColumn(String column) {
+        return String.format("%s_max", column);
+    }
+
+    private String getMinColumn(String column) {
+        return String.format("%s_min", column);
+    }
+
+    private String getAvgLenColumn(String column) {
+        return String.format("%s_avgLen", column);
+    }
+
+    private String getMaxLenColumn(String column) {
+        return String.format("%s_maxLen", column);
+    }
+
+    private CatalogTableStatistics convertToTableStatistics(Row row) {
+        Long rowCount = row.getFieldAs(getRowCountColumn());
+        return new CatalogTableStatistics(rowCount, -1, -1, -1);
+    }
+
+    private CatalogColumnStatistics convertToColumnStatistics(Row row, List<Column> columns) {
+        Preconditions.checkArgument(!columns.isEmpty());
+        Map<String, CatalogColumnStatisticsDataBase> columnStatMap = new HashMap<>();
+        for (Column column : columns) {
+            CatalogColumnStatisticsDataBase columnStat = convertToColumnStatisticsData(row, column);
+            if (columnStat != null) {
+                columnStatMap.put(column.getName(), columnStat);
+            }
+        }
+        return new CatalogColumnStatistics(columnStatMap);
+    }
+
+    private CatalogColumnStatisticsDataBase convertToColumnStatisticsData(Row row, Column column) {
+        String c = column.getName();
+        Long nullCount = row.getFieldAs(getNullCountColumn(c));
+        switch (column.getDataType().getLogicalType().getTypeRoot()) {
+            case BOOLEAN:
+                Long trueCount = row.getFieldAs(getTrueCountColumn(c));
+                Long falseCount = row.getFieldAs(getFalseCountColumn(c));
+                return new CatalogColumnStatisticsDataBoolean(trueCount, falseCount, nullCount);
+            case TINYINT:
+                Byte maxByte = row.getFieldAs(getMaxColumn(c));
+                Byte minByte = row.getFieldAs(getMinColumn(c));
+                Long ndvByte = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minByte != null ? minByte.longValue() : null,
+                        maxByte != null ? maxByte.longValue() : null,
+                        ndvByte,
+                        nullCount);
+            case SMALLINT:
+                Short maxShort = row.getFieldAs(getMaxColumn(c));
+                Short minShort = row.getFieldAs(getMinColumn(c));
+                Long ndvShort = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minShort != null ? minShort.longValue() : null,
+                        maxShort != null ? maxShort.longValue() : null,
+                        ndvShort,
+                        nullCount);
+            case INTEGER:
+                Integer maxInt = row.getFieldAs(getMaxColumn(c));
+                Integer minInt = row.getFieldAs(getMinColumn(c));
+                Long ndvInt = row.getFieldAs(getNdvColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minInt != null ? minInt.longValue() : null,
+                        maxInt != null ? maxInt.longValue() : null,
+                        ndvInt,
+                        nullCount);
+            case BIGINT:
+                Long ndvLong = row.getFieldAs(getNdvColumn(c));
+                Long maxLong = row.getFieldAs(getMaxColumn(c));
+                Long minLong = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(minLong, maxLong, ndvLong, nullCount);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                Long ndvTs = row.getFieldAs(getNdvColumn(c));
+                LocalDateTime maxTs = row.getFieldAs(getMaxColumn(c));
+                LocalDateTime minTs = row.getFieldAs(getMinColumn(c));
+
+                return new CatalogColumnStatisticsDataLong(
+                        minTs != null ? minTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        maxTs != null ? maxTs.toEpochSecond(ZoneOffset.UTC) : null,
+                        ndvTs,
+                        nullCount);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                Long ndvTsLtz = row.getFieldAs(getNdvColumn(c));
+                Instant maxTsLtz = row.getFieldAs(getMaxColumn(c));
+                Instant minTsLtz = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTsLtz != null ? minTsLtz.toEpochMilli() : null,
+                        maxTsLtz != null ? maxTsLtz.toEpochMilli() : null,
+                        ndvTsLtz,
+                        nullCount);
+            case FLOAT:
+                Long ndvFloat = row.getFieldAs(getNdvColumn(c));
+                Float maxFloat = row.getFieldAs(getMaxColumn(c));
+                Float minFloat = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minFloat != null ? minFloat.doubleValue() : null,
+                        maxFloat != null ? maxFloat.doubleValue() : null,
+                        ndvFloat,
+                        nullCount);
+            case DOUBLE:
+                Long ndvDouble = row.getFieldAs(getNdvColumn(c));
+                Double maxDouble = row.getFieldAs(getMaxColumn(c));
+                Double minDouble = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDouble, maxDouble, ndvDouble, nullCount);
+            case DECIMAL:
+                Long ndvDecimal = row.getFieldAs(getNdvColumn(c));
+                BigDecimal maxDecimal = row.getFieldAs(getMaxColumn(c));
+                BigDecimal minDecimal = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDouble(
+                        minDecimal != null ? minDecimal.doubleValue() : null,
+                        maxDecimal != null ? maxDecimal.doubleValue() : null,
+                        ndvDecimal,
+                        nullCount);
+            case DATE:
+                Long ndvDate = row.getFieldAs(getNdvColumn(c));
+                LocalDate maxDate = row.getFieldAs(getMaxColumn(c));
+                LocalDate minDate = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataDate(
+                        minDate != null ? new Date(minDate.toEpochDay()) : null,
+                        maxDate != null ? new Date(maxDate.toEpochDay()) : null,
+                        ndvDate,
+                        nullCount);
+            case TIME_WITHOUT_TIME_ZONE:
+                Long ndvTime = row.getFieldAs(getNdvColumn(c));
+                LocalTime maxTime = row.getFieldAs(getMaxColumn(c));
+                LocalTime minTime = row.getFieldAs(getMinColumn(c));
+                return new CatalogColumnStatisticsDataLong(
+                        minTime != null ? minTime.toNanoOfDay() : null,
+                        maxTime != null ? maxTime.toNanoOfDay() : null,
+                        ndvTime,
+                        nullCount);
+            case CHAR:
+            case VARCHAR:
+                Long ndvString = row.getFieldAs(getNdvColumn(c));
+                Double avgLen = row.getFieldAs(getAvgLenColumn(c));
+                Long maxLen = row.getFieldAs(getMaxLenColumn(c));
+                return new CatalogColumnStatisticsDataString(maxLen, avgLen, ndvString, nullCount);
+            case BINARY:

Review Comment:
   This is not consistent with the types handled in `getColumnStatsSelects`.
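
One way to catch this kind of drift is to compare the sets of type roots the two switch statements handle. The sketch below is an illustration only: `TypeRoot` is a reduced, hypothetical enum rather than Flink's `LogicalTypeRoot`, and both sets are written by hand to mirror the situation in the diff, where the converter has a `BINARY` case that the SELECT generator lacks.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical consistency check; names are illustrative, not from the PR. */
final class StatTypeConsistency {

    /** Reduced stand-in for the real type-root enum. */
    enum TypeRoot { BOOLEAN, INTEGER, VARCHAR, BINARY }

    /** Types for which statistics columns are generated in the SELECT. */
    static final Set<TypeRoot> SELECT_SIDE =
            EnumSet.of(TypeRoot.BOOLEAN, TypeRoot.INTEGER, TypeRoot.VARCHAR);

    /** Types the result-row converter has a case for. */
    static final Set<TypeRoot> CONVERT_SIDE =
            EnumSet.of(TypeRoot.BOOLEAN, TypeRoot.INTEGER, TypeRoot.VARCHAR, TypeRoot.BINARY);

    /** Types the converter handles although no statistics column is ever selected for them. */
    static Set<TypeRoot> handledOnlyByConverter() {
        Set<TypeRoot> diff = EnumSet.copyOf(CONVERT_SIDE);
        diff.removeAll(SELECT_SIDE);
        return diff;
    }

    public static void main(String[] args) {
        System.out.println(handledOnlyByConverter());
    }
}
```

A non-empty difference means the converter would try to read a result column that the generated query never produced.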



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {

Review Comment:
   Do we require a strict order of partition columns? If so, the error message should say so.
   It would also be good to add a test case in which only the order of the partition columns is mismatched.
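
The check in the diff is order-sensitive because it compares two lists. A minimal sketch of both variants follows; `PartitionKeyCheck` and its method names are hypothetical, and the maps stand in for the `LinkedHashMap` returned by `analyzeTable.getPartitions()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the PR. */
final class PartitionKeyCheck {

    /** Order-sensitive: same comparison as in the diff (list equality). */
    static boolean sameKeysSameOrder(Map<String, String> given, List<String> expected) {
        return new ArrayList<>(given.keySet()).equals(expected);
    }

    /** Order-insensitive: the same keys in any order. */
    static boolean sameKeysAnyOrder(Map<String, String> given, List<String> expected) {
        return given.size() == expected.size()
                && given.keySet().equals(new HashSet<>(expected));
    }

    public static void main(String[] args) {
        Map<String, String> given = new LinkedHashMap<>();
        given.put("b", "1"); // keys supplied in the order (b, a)
        given.put("a", "2");
        List<String> tableKeys = Arrays.asList("a", "b");

        System.out.println(sameKeysSameOrder(given, tableKeys)); // false
        System.out.println(sameKeysAnyOrder(given, tableKeys));  // true
    }
}
```

If the strict order is intended, a test with exactly this input would cover the order-only mismatch case.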



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1356,6 +1379,15 @@ public TableResultInternal executeInternal(Operation operation) {
             return (TableResultInternal) compiledPlan.execute();
         } else if (operation instanceof NopOperation) {
             return TableResultImpl.TABLE_RESULT_OK;
+        } else if (operation instanceof AnalyzeTableOperation) {
+            if (isStreamingMode) {
+                throw new TableException("AnalyzeTable is not supported for streaming mode now");
+            }
+            try {
+                return analyzeTable((AnalyzeTableOperation) operation);
+            } catch (Exception e) {
+                throw new TableException("Failed to execute AnalyzeTable command", e);

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {
+                throw new ValidationException(
+                        String.format(
+                                "For partition table, all partition keys should be specified explicitly. "
+                                        + "The given partition keys: [%s] are not match the target partition keys: [%s]",
+                                String.join(",", partitions.keySet()),
+                                String.join(",", table.getPartitionKeys())));
+            }
+
+            try {
+                targetPartitionSpecs = getPartitionSpecs(tableIdentifier, schema, partitions);
+            } catch (Exception e) {
+                throw new ValidationException(e.getMessage(), e);
+            }
+        } else if (!partitions.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table: %s is not a partition table, while partition values is given",

Review Comment:
   ditto





[GitHub] [flink] godfreyhe commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935226448


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =
+                catalogManager.getCatalog(operation.getTableIdentifier().getCatalogName()).get();
+        ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+        if (table.isPartitioned()) {
+            List<CatalogPartitionSpec> targetPartitions =
+                    operation.getPartitionSpecs().orElse(catalog.listPartitions(objectPath));
+            for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+                String statSql =
+                        generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+                TableResult tableResult = executeSql(statSql);
+                List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+                Preconditions.checkArgument(result.size() == 1);
+                Row row = result.get(0);
+                CatalogTableStatistics tableStat = convertToTableStatistics(row);
+                catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+                if (!columns.isEmpty()) {
+                    CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                    catalog.alterPartitionColumnStatistics(
+                            objectPath, partitionSpec, columnStat, false);
+                }
+            }
+        } else {
+            String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+            TableResult tableResult = executeSql(statSql);
+            List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+            Preconditions.checkArgument(result.size() == 1);
+            Row row = result.get(0);
+            CatalogTableStatistics tableStat = convertToTableStatistics(row);
+            catalog.alterTableStatistics(objectPath, tableStat, false);
+            if (!columns.isEmpty()) {
+                CatalogColumnStatistics columnStat = convertToColumnStatistics(row, columns);
+                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            }
+        }
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    private String generateAnalyzeSql(
+            ObjectIdentifier tableIdentifier,
+            @Nullable CatalogPartitionSpec partitionSpec,
+            List<Column> columns) {
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        Preconditions.checkArgument(

Review Comment:
   I think it's necessary, because we should ensure there is only one row in the result.
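
   The single-row check discussed above can be sketched in plain Java, without any Flink dependency. The class and method names (`SingleRowCheck`, `getOnlyRow`) are hypothetical and not part of the PR, and throwing `IllegalStateException` is an assumption made for this sketch; the PR itself uses `Preconditions.checkArgument`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of the PR: drains an iterator and
// insists that it produced exactly one element.
class SingleRowCheck {

    static <T> T getOnlyRow(Iterator<T> rows) {
        List<T> collected = new ArrayList<>();
        rows.forEachRemaining(collected::add);
        // An aggregate query without GROUP BY is expected to return one row;
        // anything else means the generated statement was not what we assumed.
        if (collected.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one row but got " + collected.size());
        }
        return collected.get(0);
    }

    public static void main(String[] args) {
        String row = getOnlyRow(Arrays.asList("stats-row").iterator());
        System.out.println(row);
    }
}
```

   Failing fast here keeps a malformed result from being silently written into the catalog as statistics.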





[GitHub] [flink] godfreyhe commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935213243


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1894,4 +1926,319 @@ public TableImpl createTable(QueryOperation tableOperation) {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private TableResultInternal analyzeTable(AnalyzeTableOperation operation)
+            throws TableNotPartitionedException, TableNotExistException, PartitionNotExistException,
+                    TablePartitionedException {
+        CatalogTable table =
+                catalogManager.getTable(operation.getTableIdentifier()).get().getTable();
+        ResolvedSchema schema =
+                table.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+        List<Column> columns =
+                operation.getColumns().stream()
+                        .map(c -> schema.getColumn(c).get())
+                        .collect(Collectors.toList());
+        Catalog catalog =

Review Comment:
   The catalog has already been validated in SqlToOperationConverter, so we can throw an exception directly here.
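
   A minimal sketch of "throw directly" on an `Optional` lookup, in plain Java. The `CatalogLookup` class, its string-valued registry, and the exception type are all hypothetical stand-ins for illustration; they are not the Flink `CatalogManager` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a catalog registry, not Flink's CatalogManager.
class CatalogLookup {
    private final Map<String, String> catalogs = new HashMap<>();

    void register(String name, String catalog) {
        catalogs.put(name, catalog);
    }

    Optional<String> getCatalog(String name) {
        return Optional.ofNullable(catalogs.get(name));
    }

    // The caller assumes an earlier phase already validated the name, so a
    // missing entry is treated as an internal error and thrown immediately.
    String getCatalogOrThrow(String name) {
        return getCatalog(name)
                .orElseThrow(
                        () -> new IllegalStateException(
                                "Catalog " + name + " does not exist."));
    }

    public static void main(String[] args) {
        CatalogLookup lookup = new CatalogLookup();
        lookup.register("default_catalog", "in-memory");
        System.out.println(lookup.getCatalogOrThrow("default_catalog"));
    }
}
```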





[GitHub] [flink] godfreyhe commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935132719


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Operation to describe an {@code ANALYZE TABLE} statement. */
+public class AnalyzeTableOperation implements Operation {

Review Comment:
   It's not a ModifyOperation, because it only updates the statistics (metadata), not the records.





[GitHub] [flink] godfreyhe commented on a diff in pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935133429


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {

Review Comment:
   Use a Set instead of a List so that the comparison does not depend on the order of the partition names.
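
   The difference between the two comparisons can be sketched in plain Java. The class and method names (`PartitionKeyCheck`, `sameKeysOrdered`, `sameKeysUnordered`) are hypothetical and not part of the PR; only the `new ArrayList<>(partitions.keySet()).equals(...)` pattern comes from the diff above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: compares the partition keys given
// in a statement against the partition keys declared on the table.
class PartitionKeyCheck {

    // List equality is order-sensitive: [b, a] is not equal to [a, b].
    static boolean sameKeysOrdered(Map<String, String> spec, List<String> tableKeys) {
        return new ArrayList<>(spec.keySet()).equals(tableKeys);
    }

    // Set equality only checks that both sides contain the same elements.
    static boolean sameKeysUnordered(Map<String, String> spec, List<String> tableKeys) {
        return spec.keySet().equals(new HashSet<>(tableKeys));
    }

    public static void main(String[] args) {
        // The user writes the partition keys in a different order than the table declares.
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("b", "1");
        spec.put("a", "2");
        List<String> tableKeys = Arrays.asList("a", "b");

        System.out.println(sameKeysOrdered(spec, tableKeys));
        System.out.println(sameKeysUnordered(spec, tableKeys));
    }
}
```

   With the set-based check, a statement that lists every partition key in a different order than the table definition is still accepted, while a missing or unknown key is still rejected.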





[GitHub] [flink] godfreyhe closed pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #20363: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution
URL: https://github.com/apache/flink/pull/20363

