Posted to commits@flink.apache.org by go...@apache.org on 2022/08/04 02:32:24 UTC
[flink] branch master updated: [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8236644816a [FLINK-28492][table-planner] Support "ANALYZE TABLE" execution
8236644816a is described below
commit 8236644816a8070499ff4327b7ee71cbb4660afa
Author: godfreyhe <go...@163.com>
AuthorDate: Mon Jul 11 19:53:39 2022 +0800
[FLINK-28492][table-planner] Support "ANALYZE TABLE" execution
This closes #20363
---
.../flink/table/api/internal/AnalyzeTableUtil.java | 383 ++++++++++++++
.../table/api/internal/TableEnvironmentImpl.java | 10 +
.../operations/ddl/AnalyzeTableOperation.java | 84 +++
.../catalog/stats/CatalogColumnStatistics.java | 29 ++
.../stats/CatalogColumnStatisticsDataBinary.java | 29 ++
.../stats/CatalogColumnStatisticsDataBoolean.java | 29 ++
.../stats/CatalogColumnStatisticsDataDate.java | 32 ++
.../stats/CatalogColumnStatisticsDataDouble.java | 42 ++
.../stats/CatalogColumnStatisticsDataLong.java | 32 ++
.../stats/CatalogColumnStatisticsDataString.java | 32 ++
.../catalog/stats/CatalogTableStatistics.java | 38 ++
.../org/apache/flink/table/catalog/stats/Date.java | 24 +
.../operations/SqlToOperationConverter.java | 214 ++++++++
.../table/planner/factories/TestValuesCatalog.java | 3 +
.../runtime/batch/sql/AnalyzeTableITCase.java | 577 +++++++++++++++++++++
.../runtime/stream/sql/AnalyzeTableITCase.java | 63 +++
.../runtime/batch/sql/TableSourceITCase.scala | 2 +-
17 files changed, 1622 insertions(+), 1 deletion(-)
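For orientation, a minimal sketch of how the new statement is meant to be used, assuming a batch TableEnvironment and a hypothetical table MyTable partitioned by ds (the AnalyzeTableITCase diff below is the authoritative reference):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // table-level statistics only (row count)
    tEnv.executeSql("ANALYZE TABLE MyTable COMPUTE STATISTICS");
    // additionally collect column-level statistics for selected physical columns
    tEnv.executeSql("ANALYZE TABLE MyTable COMPUTE STATISTICS FOR COLUMNS a, b");
    // or for all physical columns (computed and metadata columns are skipped)
    tEnv.executeSql("ANALYZE TABLE MyTable COMPUTE STATISTICS FOR ALL COLUMNS");
    // for a partitioned table, every partition key must be named
    tEnv.executeSql("ANALYZE TABLE MyTable PARTITION(ds='2022-08-04') COMPUTE STATISTICS");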
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
new file mode 100644
index 00000000000..37471ed52ae
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Utility for executing ANALYZE TABLE statements. */
+@Internal
+public class AnalyzeTableUtil {
+
+ private AnalyzeTableUtil() {}
+
+ public static TableResultInternal analyzeTable(
+ TableEnvironmentImpl tableEnv, AnalyzeTableOperation operation)
+ throws TableNotExistException, PartitionNotExistException, TablePartitionedException {
+ List<Column> columns = operation.getColumns();
+ // the table identifier has been validated beforehand
+ Catalog catalog =
+ tableEnv.getCatalogManager()
+ .getCatalog(operation.getTableIdentifier().getCatalogName())
+ .orElseThrow(() -> new TableException("This should not happen."));
+ ObjectPath objectPath = operation.getTableIdentifier().toObjectPath();
+
+ if (operation.getPartitionSpecs().isPresent()) {
+ List<CatalogPartitionSpec> targetPartitions = operation.getPartitionSpecs().get();
+ for (CatalogPartitionSpec partitionSpec : targetPartitions) {
+ String statSql =
+ generateAnalyzeSql(operation.getTableIdentifier(), partitionSpec, columns);
+ Tuple2<CatalogTableStatistics, CatalogColumnStatistics> result =
+ executeSqlAndGenerateStatistics(tableEnv, columns, statSql);
+ CatalogTableStatistics tableStat = result.f0;
+ catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
+ CatalogColumnStatistics columnStat = result.f1;
+ if (columnStat != null) {
+ catalog.alterPartitionColumnStatistics(
+ objectPath, partitionSpec, columnStat, false);
+ }
+ }
+ } else {
+ String statSql = generateAnalyzeSql(operation.getTableIdentifier(), null, columns);
+ Tuple2<CatalogTableStatistics, CatalogColumnStatistics> result =
+ executeSqlAndGenerateStatistics(tableEnv, columns, statSql);
+ CatalogTableStatistics tableStat = result.f0;
+ catalog.alterTableStatistics(objectPath, tableStat, false);
+ CatalogColumnStatistics columnStat = result.f1;
+ if (columnStat != null) {
+ catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+ }
+ }
+ return TableResultImpl.TABLE_RESULT_OK;
+ }
+
+ private static Tuple2<CatalogTableStatistics, CatalogColumnStatistics>
+ executeSqlAndGenerateStatistics(
+ TableEnvironmentImpl tableEnv, List<Column> columns, String statSql) {
+ TableResult tableResult = tableEnv.executeSql(statSql);
+ List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
+ Preconditions.checkArgument(result.size() == 1);
+ Row row = result.get(0);
+ CatalogTableStatistics tableStat = convertToTableStatistics(row);
+ CatalogColumnStatistics columnStat = null;
+ if (!columns.isEmpty()) {
+ columnStat = convertToColumnStatistics(row, columns);
+ }
+ return new Tuple2<>(tableStat, columnStat);
+ }
+
+ private static String generateAnalyzeSql(
+ ObjectIdentifier tableIdentifier,
+ @Nullable CatalogPartitionSpec partitionSpec,
+ List<Column> columns) {
+ String partitionFilter;
+ if (partitionSpec != null) {
+ partitionFilter =
+ " WHERE "
+ + partitionSpec.getPartitionSpec().entrySet().stream()
+ .map(e -> e.getKey() + "=" + e.getValue())
+ .collect(Collectors.joining(" AND "));
+ } else {
+ partitionFilter = "";
+ }
+
+ final String columnStatsSelects;
+ if (columns.isEmpty()) {
+ columnStatsSelects = "";
+ } else {
+ columnStatsSelects = ", " + getColumnStatsSelects(columns);
+ }
+
+ return String.format(
+ "SELECT COUNT(1) AS %s %s FROM %s %s",
+ getRowCountColumn(), columnStatsSelects, tableIdentifier, partitionFilter);
+ }
+
+ private static String getColumnStatsSelects(List<Column> columns) {
+ return columns.stream()
+ .flatMap(
+ f -> {
+ String c = f.getName();
+ List<String> columnStatSelect = new ArrayList<>();
+ String computeNullCount =
+ String.format(
+ "(COUNT(1) - COUNT(`%s`)) AS %s",
+ c, getNullCountColumn(c));
+ columnStatSelect.add(computeNullCount);
+
+ String computeNdv =
+ String.format(
+ "APPROX_COUNT_DISTINCT(`%s`) AS %s",
+ c, getNdvColumn(c));
+
+ switch (f.getDataType().getLogicalType().getTypeRoot()) {
+ case BOOLEAN:
+ columnStatSelect.add(
+ String.format(
+ "COUNT(`%s`) FILTER (WHERE `%s` IS TRUE) AS %s",
+ c, c, getTrueCountColumn(c)));
+ columnStatSelect.add(
+ String.format(
+ "COUNT(`%s`) FILTER (WHERE `%s` IS FALSE) AS %s",
+ c, c, getFalseCountColumn(c)));
+ break;
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case FLOAT:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case BIGINT:
+ case DOUBLE:
+ case DECIMAL:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ columnStatSelect.add(computeNdv);
+ columnStatSelect.add(
+ String.format("MAX(`%s`) AS %s", c, getMaxColumn(c)));
+ columnStatSelect.add(
+ String.format("MIN(`%s`) AS %s", c, getMinColumn(c)));
+ break;
+ case CHAR:
+ case VARCHAR:
+ columnStatSelect.add(computeNdv);
+ columnStatSelect.add(
+ String.format(
+ "AVG(CAST(CHAR_LENGTH(`%s`) AS DOUBLE)) AS %s",
+ c, getAvgLenColumn(c)));
+ columnStatSelect.add(
+ String.format(
+ "MAX(CAST(CHAR_LENGTH(`%s`) AS BIGINT)) AS %s",
+ c, getMaxLenColumn(c)));
+ break;
+ default:
+ break;
+ }
+ return columnStatSelect.stream();
+ })
+ .collect(Collectors.joining(", "));
+ }
+
+ private static CatalogTableStatistics convertToTableStatistics(Row row) {
+ Long rowCount = row.getFieldAs(getRowCountColumn());
+ return new CatalogTableStatistics(rowCount, -1, -1, -1);
+ }
+
+ private static CatalogColumnStatistics convertToColumnStatistics(
+ Row row, List<Column> columns) {
+ Preconditions.checkArgument(!columns.isEmpty());
+ Map<String, CatalogColumnStatisticsDataBase> columnStatMap = new HashMap<>();
+ for (Column column : columns) {
+ CatalogColumnStatisticsDataBase columnStat = convertToColumnStatisticsData(row, column);
+ if (columnStat != null) {
+ columnStatMap.put(column.getName(), columnStat);
+ }
+ }
+ return new CatalogColumnStatistics(columnStatMap);
+ }
+
+ private static CatalogColumnStatisticsDataBase convertToColumnStatisticsData(
+ Row row, Column column) {
+ String c = column.getName();
+ Long nullCount = row.getFieldAs(getNullCountColumn(c));
+ switch (column.getDataType().getLogicalType().getTypeRoot()) {
+ case BOOLEAN:
+ Long trueCount = row.getFieldAs(getTrueCountColumn(c));
+ Long falseCount = row.getFieldAs(getFalseCountColumn(c));
+ return new CatalogColumnStatisticsDataBoolean(trueCount, falseCount, nullCount);
+ case TINYINT:
+ Byte maxByte = row.getFieldAs(getMaxColumn(c));
+ Byte minByte = row.getFieldAs(getMinColumn(c));
+ Long ndvByte = row.getFieldAs(getNdvColumn(c));
+ return new CatalogColumnStatisticsDataLong(
+ minByte != null ? minByte.longValue() : null,
+ maxByte != null ? maxByte.longValue() : null,
+ ndvByte,
+ nullCount);
+ case SMALLINT:
+ Short maxShort = row.getFieldAs(getMaxColumn(c));
+ Short minShort = row.getFieldAs(getMinColumn(c));
+ Long ndvShort = row.getFieldAs(getNdvColumn(c));
+ return new CatalogColumnStatisticsDataLong(
+ minShort != null ? minShort.longValue() : null,
+ maxShort != null ? maxShort.longValue() : null,
+ ndvShort,
+ nullCount);
+ case INTEGER:
+ Integer maxInt = row.getFieldAs(getMaxColumn(c));
+ Integer minInt = row.getFieldAs(getMinColumn(c));
+ Long ndvInt = row.getFieldAs(getNdvColumn(c));
+ return new CatalogColumnStatisticsDataLong(
+ minInt != null ? minInt.longValue() : null,
+ maxInt != null ? maxInt.longValue() : null,
+ ndvInt,
+ nullCount);
+ case BIGINT:
+ Long ndvLong = row.getFieldAs(getNdvColumn(c));
+ Long maxLong = row.getFieldAs(getMaxColumn(c));
+ Long minLong = row.getFieldAs(getMinColumn(c));
+ return new CatalogColumnStatisticsDataLong(minLong, maxLong, ndvLong, nullCount);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ Long ndvTs = row.getFieldAs(getNdvColumn(c));
+ LocalDateTime maxTs = row.getFieldAs(getMaxColumn(c));
+ LocalDateTime minTs = row.getFieldAs(getMinColumn(c));
+ return new CatalogColumnStatisticsDataLong(
+ minTs != null ? minTs.toEpochSecond(ZoneOffset.UTC) : null,
+ maxTs != null ? maxTs.toEpochSecond(ZoneOffset.UTC) : null,
+ ndvTs,
+ nullCount);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ Long ndvTsLtz = row.getFieldAs(getNdvColumn(c));
+ Instant maxTsLtz = row.getFieldAs(getMaxColumn(c));
+ Instant minTsLtz = row.getFieldAs(getMinColumn(c));
+ return new CatalogColumnStatisticsDataLong(
+ minTsLtz != null ? minTsLtz.toEpochMilli() : null,
+ maxTsLtz != null ? maxTsLtz.toEpochMilli() : null,
+ ndvTsLtz,
+ nullCount);
+ case FLOAT:
+ Long ndvFloat = row.getFieldAs(getNdvColumn(c));
+ Float maxFloat = row.getFieldAs(getMaxColumn(c));
+ Float minFloat = row.getFieldAs(getMinColumn(c));
+ return new CatalogColumnStatisticsDataDouble(
+ minFloat != null ? minFloat.doubleValue() : null,
+ maxFloat != null ? maxFloat.doubleValue() : null,
+ ndvFloat,
+ nullCount);
+ case DOUBLE:
+ Long ndvDouble = row.getFieldAs(getNdvColumn(c));
+ Double maxDouble = row.getFieldAs(getMaxColumn(c));
+ Double minDouble = row.getFieldAs(getMinColumn(c));
+ return new CatalogColumnStatisticsDataDouble(
+ minDouble, maxDouble, ndvDouble, nullCount);
+ case DECIMAL:
+ Long ndvDecimal = row.getFieldAs(getNdvColumn(c));
+ BigDecimal maxDecimal = row.getFieldAs(getMaxColumn(c));
+ BigDecimal minDecimal = row.getFieldAs(getMinColumn(c));
+ return new CatalogColumnStatisticsDataDouble(
+ minDecimal != null ? minDecimal.doubleValue() : null,
+ maxDecimal != null ? maxDecimal.doubleValue() : null,
+ ndvDecimal,
+ nullCount);
+ case DATE:
+ Long ndvDate = row.getFieldAs(getNdvColumn(c));
+ LocalDate maxDate = row.getFieldAs(getMaxColumn(c));
+ LocalDate minDate = row.getFieldAs(getMinColumn(c));
+ return new CatalogColumnStatisticsDataDate(
+ minDate != null ? new Date(minDate.toEpochDay()) : null,
+ maxDate != null ? new Date(maxDate.toEpochDay()) : null,
+ ndvDate,
+ nullCount);
+ case TIME_WITHOUT_TIME_ZONE:
+ Long ndvTime = row.getFieldAs(getNdvColumn(c));
+ LocalTime maxTime = row.getFieldAs(getMaxColumn(c));
+ LocalTime minTime = row.getFieldAs(getMinColumn(c));
+ return new CatalogColumnStatisticsDataLong(
+ minTime != null ? minTime.toNanoOfDay() : null,
+ maxTime != null ? maxTime.toNanoOfDay() : null,
+ ndvTime,
+ nullCount);
+ case CHAR:
+ case VARCHAR:
+ Long ndvString = row.getFieldAs(getNdvColumn(c));
+ Double avgLen = row.getFieldAs(getAvgLenColumn(c));
+ Long maxLen = row.getFieldAs(getMaxLenColumn(c));
+ return new CatalogColumnStatisticsDataString(maxLen, avgLen, ndvString, nullCount);
+ case BINARY:
+ case VARBINARY:
+ return new CatalogColumnStatisticsDataBinary(null, null, nullCount);
+ default:
+ return null;
+ }
+ }
+
+ private static String getRowCountColumn() {
+ return "rowCount";
+ }
+
+ private static String getNullCountColumn(String column) {
+ return String.format("%s_nullCount", column);
+ }
+
+ private static String getNdvColumn(String column) {
+ return String.format("%s_ndv", column);
+ }
+
+ private static String getTrueCountColumn(String column) {
+ return String.format("%s_trueCount", column);
+ }
+
+ private static String getFalseCountColumn(String column) {
+ return String.format("%s_falseCount", column);
+ }
+
+ private static String getMaxColumn(String column) {
+ return String.format("%s_max", column);
+ }
+
+ private static String getMinColumn(String column) {
+ return String.format("%s_min", column);
+ }
+
+ private static String getAvgLenColumn(String column) {
+ return String.format("%s_avgLen", column);
+ }
+
+ private static String getMaxLenColumn(String column) {
+ return String.format("%s_maxLen", column);
+ }
+}
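To make generateAnalyzeSql concrete: for a hypothetical statement ANALYZE TABLE `cat`.`db`.`PartitionTable` PARTITION(e=1, a=1) COMPUTE STATISTICS FOR COLUMNS d, where d is a VARCHAR column, the utility above emits a single aggregation query roughly of this shape (aliases per the get*Column helpers):

    SELECT COUNT(1) AS rowCount,
           (COUNT(1) - COUNT(`d`)) AS d_nullCount,
           APPROX_COUNT_DISTINCT(`d`) AS d_ndv,
           AVG(CAST(CHAR_LENGTH(`d`) AS DOUBLE)) AS d_avgLen,
           MAX(CAST(CHAR_LENGTH(`d`) AS BIGINT)) AS d_maxLen
    FROM `cat`.`db`.`PartitionTable`
    WHERE e=1 AND a=1

The single result row is then converted into a CatalogTableStatistics plus one CatalogColumnStatisticsData* entry per analyzed column and written back through the Catalog API.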
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index d4e10108d38..df95c584654 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -132,6 +132,7 @@ import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
import org.apache.flink.table.operations.ddl.AlterViewOperation;
import org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterViewRenameOperation;
+import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
import org.apache.flink.table.operations.ddl.CompilePlanOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
@@ -1364,6 +1365,15 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
true,
compileAndExecutePlanOperation.getOperation());
return (TableResultInternal) compiledPlan.execute();
+ } else if (operation instanceof AnalyzeTableOperation) {
+ if (isStreamingMode) {
+ throw new TableException("ANALYZE TABLE is not supported for streaming mode now");
+ }
+ try {
+ return AnalyzeTableUtil.analyzeTable(this, (AnalyzeTableOperation) operation);
+ } catch (Exception e) {
+ throw new TableException("Failed to execute ANALYZE TABLE command", e);
+ }
} else if (operation instanceof NopOperation) {
return TableResultImpl.TABLE_RESULT_OK;
} else {
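The guard above makes the statement batch-only; a minimal sketch of the failure path, assuming a streaming TableEnvironment and a hypothetical table MyTable:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment streamEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // throws TableException: "ANALYZE TABLE is not supported in streaming mode yet"
    streamEnv.executeSql("ANALYZE TABLE MyTable COMPUTE STATISTICS");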
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java
new file mode 100644
index 00000000000..614820ce49a
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AnalyzeTableOperation.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Operation to describe an {@code ANALYZE TABLE} statement. */
+public class AnalyzeTableOperation implements Operation {
+ private final ObjectIdentifier tableIdentifier;
+ private final @Nullable List<CatalogPartitionSpec> partitionSpecs;
+ private final List<Column> columns;
+
+ public AnalyzeTableOperation(
+ ObjectIdentifier tableIdentifier,
+ @Nullable List<CatalogPartitionSpec> partitionSpecs,
+ List<Column> columns) {
+ this.tableIdentifier = tableIdentifier;
+ this.partitionSpecs = partitionSpecs;
+ this.columns = Objects.requireNonNull(columns, "columns is null");
+ }
+
+ public ObjectIdentifier getTableIdentifier() {
+ return tableIdentifier;
+ }
+
+ /**
+ * Returns {@code Optional.empty()} if the table is not a partitioned table, otherwise returns
+ * the given partition specs.
+ */
+ public Optional<List<CatalogPartitionSpec>> getPartitionSpecs() {
+ return Optional.ofNullable(partitionSpecs);
+ }
+
+ public List<Column> getColumns() {
+ return columns;
+ }
+
+ @Override
+ public String asSummaryString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ANALYZE TABLE ").append(tableIdentifier.toString());
+ if (partitionSpecs != null) {
+ sb.append(" PARTITION(")
+ .append(
+ partitionSpecs.stream()
+ .map(p -> p.getPartitionSpec().toString())
+ .collect(Collectors.joining(",")))
+ .append(")");
+ }
+ sb.append(" COMPUTE STATISTICS");
+ if (!columns.isEmpty()) {
+ sb.append(" FOR COLUMNS ")
+ .append(columns.stream().map(Column::getName).collect(Collectors.joining(",")));
+ }
+
+ return sb.toString();
+ }
+}
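A small sketch of the summary form, using hypothetical identifiers:

    import java.util.Collections;

    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;

    AnalyzeTableOperation op =
            new AnalyzeTableOperation(
                    ObjectIdentifier.of("cat", "db", "PartitionTable"),
                    Collections.singletonList(
                            new CatalogPartitionSpec(Collections.singletonMap("e", "1"))),
                    Collections.emptyList());
    // op.asSummaryString() returns:
    // ANALYZE TABLE `cat`.`db`.`PartitionTable` PARTITION({e=1}) COMPUTE STATISTICS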
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
index f8b2b215e6b..e5ff3f3df8a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatistics.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -73,4 +74,32 @@ public class CatalogColumnStatistics {
}
return new CatalogColumnStatistics(copy, new HashMap<>(this.properties));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CatalogColumnStatistics that = (CatalogColumnStatistics) o;
+ return columnStatisticsData.equals(that.columnStatisticsData)
+ && properties.equals(that.properties);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columnStatisticsData, properties);
+ }
+
+ @Override
+ public String toString() {
+ return "CatalogColumnStatistics{"
+ + "columnStatisticsData="
+ + columnStatisticsData
+ + ", properties="
+ + properties
+ + '}';
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
index 4c52e800d43..eb6042541a1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBinary.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/** Column statistics value of binary type. */
public class CatalogColumnStatisticsDataBinary extends CatalogColumnStatisticsDataBase {
@@ -54,4 +55,32 @@ public class CatalogColumnStatisticsDataBinary extends CatalogColumnStatisticsDa
return new CatalogColumnStatisticsDataBinary(
maxLength, avgLength, getNullCount(), new HashMap<>(getProperties()));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CatalogColumnStatisticsDataBinary that = (CatalogColumnStatisticsDataBinary) o;
+ return Objects.equals(maxLength, that.maxLength)
+ && Objects.equals(avgLength, that.avgLength);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(maxLength, avgLength);
+ }
+
+ @Override
+ public String toString() {
+ return "CatalogColumnStatisticsDataBinary{"
+ + "maxLength="
+ + maxLength
+ + ", avgLength="
+ + avgLength
+ + '}';
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
index 98b3f5388a3..8e5d7ebf445 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataBoolean.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/** Column statistics value of boolean type. */
public class CatalogColumnStatisticsDataBoolean extends CatalogColumnStatisticsDataBase {
@@ -54,4 +55,32 @@ public class CatalogColumnStatisticsDataBoolean extends CatalogColumnStatisticsD
return new CatalogColumnStatisticsDataBoolean(
trueCount, falseCount, getNullCount(), new HashMap<>(getProperties()));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CatalogColumnStatisticsDataBoolean that = (CatalogColumnStatisticsDataBoolean) o;
+ return Objects.equals(trueCount, that.trueCount)
+ && Objects.equals(falseCount, that.falseCount);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(trueCount, falseCount);
+ }
+
+ @Override
+ public String toString() {
+ return "CatalogColumnStatisticsDataBoolean{"
+ + "trueCount="
+ + trueCount
+ + ", falseCount="
+ + falseCount
+ + '}';
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
index 0b6e80727e1..2bc72ead7d8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDate.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/** Column statistics value of date type. */
public class CatalogColumnStatisticsDataDate extends CatalogColumnStatisticsDataBase {
@@ -63,4 +64,35 @@ public class CatalogColumnStatisticsDataDate extends CatalogColumnStatisticsData
return new CatalogColumnStatisticsDataDate(
min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CatalogColumnStatisticsDataDate that = (CatalogColumnStatisticsDataDate) o;
+ return Objects.equals(min, that.min)
+ && Objects.equals(max, that.max)
+ && Objects.equals(ndv, that.ndv);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(min, max, ndv);
+ }
+
+ @Override
+ public String toString() {
+ return "CatalogColumnStatisticsDataDate{"
+ + "min="
+ + min
+ + ", max="
+ + max
+ + ", ndv="
+ + ndv
+ + '}';
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
index f026dfed331..025954e9ceb 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataDouble.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/** Column statistics value of double type. */
public class CatalogColumnStatisticsDataDouble extends CatalogColumnStatisticsDataBase {
@@ -63,4 +64,45 @@ public class CatalogColumnStatisticsDataDouble extends CatalogColumnStatisticsDa
return new CatalogColumnStatisticsDataDouble(
min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CatalogColumnStatisticsDataDouble that = (CatalogColumnStatisticsDataDouble) o;
+ return doubleCompare(min, that.min)
+ && doubleCompare(max, that.max)
+ && Objects.equals(ndv, that.ndv);
+ }
+
+ private boolean doubleCompare(Double d1, Double d2) {
+ if (d1 == null && d2 == null) {
+ return true;
+ } else if (d1 == null || d2 == null) {
+ return false;
+ } else {
+ return Math.abs(d1 - d2) < 1e-6;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(min, max, ndv);
+ }
+
+ @Override
+ public String toString() {
+ return "CatalogColumnStatisticsDataDouble{"
+ + "min="
+ + min
+ + ", max="
+ + max
+ + ", ndv="
+ + ndv
+ + '}';
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
index 490893ecc5c..b1422f84ed4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataLong.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/** Column statistics value of long type. */
public class CatalogColumnStatisticsDataLong extends CatalogColumnStatisticsDataBase {
@@ -63,4 +64,35 @@ public class CatalogColumnStatisticsDataLong extends CatalogColumnStatisticsData
return new CatalogColumnStatisticsDataLong(
min, max, ndv, getNullCount(), new HashMap<>(getProperties()));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CatalogColumnStatisticsDataLong that = (CatalogColumnStatisticsDataLong) o;
+ return Objects.equals(min, that.min)
+ && Objects.equals(max, that.max)
+ && Objects.equals(ndv, that.ndv);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(min, max, ndv);
+ }
+
+ @Override
+ public String toString() {
+ return "CatalogColumnStatisticsDataLong{"
+ + "min="
+ + min
+ + ", max="
+ + max
+ + ", ndv="
+ + ndv
+ + '}';
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
index 539c0338135..44af143b728 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogColumnStatisticsDataString.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.stats;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/** Column statistics value of string type. */
public class CatalogColumnStatisticsDataString extends CatalogColumnStatisticsDataBase {
@@ -68,4 +69,35 @@ public class CatalogColumnStatisticsDataString extends CatalogColumnStatisticsDa
return new CatalogColumnStatisticsDataString(
maxLength, avgLength, ndv, getNullCount(), new HashMap<>(getProperties()));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CatalogColumnStatisticsDataString that = (CatalogColumnStatisticsDataString) o;
+ return Objects.equals(maxLength, that.maxLength)
+ && Objects.equals(avgLength, that.avgLength)
+ && Objects.equals(ndv, that.ndv);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(maxLength, avgLength, ndv);
+ }
+
+ @Override
+ public String toString() {
+ return "CatalogColumnStatisticsDataString{"
+ + "maxLength="
+ + maxLength
+ + ", avgLength="
+ + avgLength
+ + ", ndv="
+ + ndv
+ + '}';
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
index 7c100b8b01a..ab65c1f0f6e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/CatalogTableStatistics.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
/** Statistics for a non-partitioned table or a partition of a partitioned table. */
@PublicEvolving
@@ -93,4 +94,41 @@ public class CatalogTableStatistics {
this.rawDataSize,
new HashMap<>(this.properties));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CatalogTableStatistics that = (CatalogTableStatistics) o;
+ return rowCount == that.rowCount
+ && fileCount == that.fileCount
+ && totalSize == that.totalSize
+ && rawDataSize == that.rawDataSize
+ && properties.equals(that.properties);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rowCount, fileCount, totalSize, rawDataSize, properties);
+ }
+
+ @Override
+ public String toString() {
+ return "CatalogTableStatistics{"
+ + "rowCount="
+ + rowCount
+ + ", fileCount="
+ + fileCount
+ + ", totalSize="
+ + totalSize
+ + ", rawDataSize="
+ + rawDataSize
+ + ", properties="
+ + properties
+ + '}';
+ }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
index fbd250ae86d..1030ac497e7 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/stats/Date.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.catalog.stats;
+import java.util.Objects;
+
/** Class representing a date value in statistics. */
public class Date {
private final long daysSinceEpoch;
@@ -33,4 +35,26 @@ public class Date {
public Date copy() {
return new Date(daysSinceEpoch);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Date date = (Date) o;
+ return daysSinceEpoch == date.daysSinceEpoch;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(daysSinceEpoch);
+ }
+
+ @Override
+ public String toString() {
+ return "Date{" + "daysSinceEpoch=" + daysSinceEpoch + '}';
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 25e2e53d03e..895534124ed 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -34,6 +34,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterView;
import org.apache.flink.sql.parser.ddl.SqlAlterViewAs;
import org.apache.flink.sql.parser.ddl.SqlAlterViewProperties;
import org.apache.flink.sql.parser.ddl.SqlAlterViewRename;
+import org.apache.flink.sql.parser.ddl.SqlAnalyzeTable;
import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
import org.apache.flink.sql.parser.ddl.SqlCompilePlan;
import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
@@ -81,6 +82,7 @@ import org.apache.flink.sql.parser.dql.SqlShowPartitions;
import org.apache.flink.sql.parser.dql.SqlShowTables;
import org.apache.flink.sql.parser.dql.SqlShowViews;
import org.apache.flink.sql.parser.dql.SqlUnloadModule;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
@@ -98,6 +100,7 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ManagedTableListener;
@@ -106,7 +109,15 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
@@ -153,6 +164,7 @@ import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
import org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterViewRenameOperation;
+import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
import org.apache.flink.table.operations.ddl.CompilePlanOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
@@ -172,7 +184,9 @@ import org.apache.flink.table.planner.utils.Expander;
import org.apache.flink.table.planner.utils.OperationConverterUtils;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.calcite.rel.RelRoot;
@@ -187,7 +201,12 @@ import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
import org.apache.calcite.sql.parser.SqlParser;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -347,6 +366,8 @@ public class SqlToOperationConverter {
converter.convertCompileAndExecutePlan((SqlCompileAndExecutePlan) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
+ } else if (validated instanceof SqlAnalyzeTable) {
+ return Optional.of(converter.convertAnalyzeTable((SqlAnalyzeTable) validated));
} else {
return Optional.empty();
}
@@ -1247,6 +1268,199 @@ public class SqlToOperationConverter {
compileAndExecutePlan.getOperandList().get(0)));
}
+ private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+ UnresolvedIdentifier unresolvedIdentifier =
+ UnresolvedIdentifier.of(analyzeTable.fullTableName());
+ ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+ Optional<ContextResolvedTable> optionalCatalogTable =
+ catalogManager.getTable(tableIdentifier);
+ if (!optionalCatalogTable.isPresent() || optionalCatalogTable.get().isTemporary()) {
+ throw new ValidationException(
+ String.format(
+ "Table %s doesn't exist or is a temporary table.", tableIdentifier));
+ }
+ CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+ if (baseTable instanceof CatalogView) {
+ throw new ValidationException("ANALYZE TABLE for a view is not allowed.");
+ }
+ CatalogTable table = (CatalogTable) baseTable;
+ ResolvedSchema schema =
+ baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+ LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+ List<CatalogPartitionSpec> targetPartitionSpecs = null;
+ if (table.isPartitioned()) {
+ if (!partitions.keySet().equals(new HashSet<>(table.getPartitionKeys()))) {
+ throw new ValidationException(
+ String.format(
+ "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+ + "The given partition keys: [%s] are not match the target partition keys: [%s].",
+ String.join(",", partitions.keySet()),
+ String.join(",", table.getPartitionKeys())));
+ }
+
+ try {
+ targetPartitionSpecs = getPartitionSpecs(tableIdentifier, schema, partitions);
+ } catch (Exception e) {
+ throw new ValidationException(e.getMessage(), e);
+ }
+ } else if (!partitions.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Invalid ANALYZE TABLE statement. Table: %s is not a partition table, while partition values are given.",
+ tableIdentifier));
+ }
+
+ String[] columns = analyzeTable.getColumnNames();
+ List<Column> targetColumns;
+ if (analyzeTable.isAllColumns()) {
+ Preconditions.checkArgument(columns.length == 0);
+ // computed columns and metadata columns are ignored
+ targetColumns =
+ schema.getColumns().stream()
+ .filter(Column::isPhysical)
+ .collect(Collectors.toList());
+ } else if (columns.length > 0) {
+ targetColumns =
+ Arrays.stream(columns)
+ .map(
+ c -> {
+ Optional<Column> colOpt = schema.getColumn(c);
+ if (!colOpt.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "Column: %s does not exist in the table: %s.",
+ c, tableIdentifier));
+ }
+ Column col = colOpt.get();
+ if (col instanceof Column.ComputedColumn) {
+ throw new ValidationException(
+ String.format(
+ "Column: %s is a computed column, ANALYZE TABLE does not support computed column.",
+ c));
+ } else if (col instanceof Column.MetadataColumn) {
+ throw new ValidationException(
+ String.format(
+ "Column: %s is a metadata column, ANALYZE TABLE does not support metadata column.",
+ c));
+ } else if (col instanceof Column.PhysicalColumn) {
+ return col;
+ } else {
+ throw new ValidationException(
+ "Unknown column class: "
+ + col.getClass().getSimpleName());
+ }
+ })
+ .collect(Collectors.toList());
+ } else {
+ targetColumns = Collections.emptyList();
+ }
+
+ return new AnalyzeTableOperation(tableIdentifier, targetPartitionSpecs, targetColumns);
+ }
+
+ private List<CatalogPartitionSpec> getPartitionSpecs(
+ ObjectIdentifier tableIdentifier,
+ ResolvedSchema schema,
+ LinkedHashMap<String, String> partitions)
+ throws TableNotPartitionedException, TableNotExistException {
+ List<Expression> filters = new ArrayList<>();
+ for (Map.Entry<String, String> entry : partitions.entrySet()) {
+ if (entry.getValue() != null) {
+ CallExpression call =
+ CallExpression.temporary(
+ FunctionIdentifier.of("="),
+ BuiltInFunctionDefinitions.EQUALS,
+ Arrays.asList(
+ getPartitionKeyExpr(schema, entry.getKey()),
+ getPartitionValueExpr(
+ schema, entry.getKey(), entry.getValue())),
+ DataTypes.BOOLEAN());
+ filters.add(call);
+ }
+ }
+ if (filters.isEmpty()) {
+ return catalogManager
+ .getCatalog(tableIdentifier.getCatalogName())
+ .get()
+ .listPartitions(tableIdentifier.toObjectPath());
+ } else {
+ return catalogManager
+ .getCatalog(tableIdentifier.getCatalogName())
+ .get()
+ .listPartitionsByFilter(tableIdentifier.toObjectPath(), filters);
+ }
+ }
+
+ private FieldReferenceExpression getPartitionKeyExpr(
+ ResolvedSchema schema, String partitionKey) {
+ int fieldIndex = schema.getColumnNames().indexOf(partitionKey);
+ if (fieldIndex < 0) {
+ throw new ValidationException(
+ String.format(
+ "Partition: %s does not exist in the schema: %s",
+ partitionKey, schema.getColumnNames()));
+ }
+ return new FieldReferenceExpression(
+ partitionKey, schema.getColumnDataTypes().get(fieldIndex), 0, fieldIndex);
+ }
+
+ private ValueLiteralExpression getPartitionValueExpr(
+ ResolvedSchema schema, String partitionKey, String partitionValue) {
+ int fieldIndex = schema.getColumnNames().indexOf(partitionKey);
+ if (fieldIndex < 0) {
+ throw new ValidationException(
+ String.format(
+ "Partition: %s does not exist in the schema: %s",
+ partitionKey, schema.getColumnNames()));
+ }
+ DataType dataType = schema.getColumnDataTypes().get(fieldIndex);
+ if (partitionValue == null) {
+ return new ValueLiteralExpression(null, dataType.nullable());
+ }
+ Object value;
+ switch (dataType.getLogicalType().getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ value = partitionValue;
+ break;
+ case TINYINT:
+ value = Byte.valueOf(partitionValue);
+ break;
+ case SMALLINT:
+ value = Short.valueOf(partitionValue);
+ break;
+ case INTEGER:
+ value = Integer.valueOf(partitionValue);
+ break;
+ case BIGINT:
+ value = Long.valueOf(partitionValue);
+ break;
+ case FLOAT:
+ value = Float.valueOf(partitionValue);
+ break;
+ case DOUBLE:
+ value = Double.valueOf(partitionValue);
+ break;
+ case DECIMAL:
+ value = new BigDecimal(partitionValue);
+ break;
+ case DATE:
+ value = Date.valueOf(partitionValue);
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+ value = Time.valueOf(partitionValue);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ value = Timestamp.valueOf(partitionValue);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported partition value type: " + dataType.getLogicalType());
+ }
+ return new ValueLiteralExpression(value, dataType.notNull());
+ }
+
private void validateTableConstraint(SqlTableConstraint constraint) {
if (constraint.isUnique()) {
throw new UnsupportedOperationException("UNIQUE constraint is not supported yet");
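Note that getPartitionSpecs allows a partition key to appear without a value, in which case that key is not filtered on. A sketch against the PartitionTable defined in the ITCase below (partition keys e and a; tEnv as in the earlier sketch):

    // a value may be omitted for a partition key: this resolves via
    // listPartitionsByFilter to every partition with e=2, regardless of a
    tEnv.executeSql("ANALYZE TABLE PartitionTable PARTITION(e=2, a) COMPUTE STATISTICS");
    // omitting a partition key entirely is rejected:
    // "Invalid ANALYZE TABLE statement. For a partitioned table, all partition
    // keys must be specified explicitly. ..."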
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
index 69ee1955488..badd3ecb004 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.planner.utils.FilterUtils;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DoubleType;
@@ -123,6 +124,8 @@ public class TestValuesCatalog extends GenericInMemoryCatalog {
return Double.valueOf(value);
} else if (type instanceof IntType) {
return Integer.valueOf(value);
+ } else if (type instanceof BigIntType) {
+ return Long.valueOf(value);
} else if (type instanceof VarCharType) {
return value;
} else {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
new file mode 100644
index 00000000000..2ba4e0b4bf0
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDouble;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@code ANALYZE TABLE}. */
+public class AnalyzeTableITCase extends BatchTestBase {
+
+ private TableEnvironment tEnv;
+
+ @BeforeEach
+ @Override
+ public void before() throws Exception {
+ super.before();
+ tEnv = tEnv();
+ Catalog catalog = new TestValuesCatalog("cat", "db", true);
+ tEnv.registerCatalog("cat", catalog);
+ tEnv.useCatalog("cat");
+ tEnv.useDatabase("db");
+ String dataId1 = TestValuesTableFactory.registerData(TestData.fullDataTypesData());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE NonPartitionTable (\n"
+ + " `a` BOOLEAN,\n"
+ + " `b` TINYINT,\n"
+ + " `c` SMALLINT,\n"
+ + " `d` INT,\n"
+ + " `e` BIGINT,\n"
+ + " `f` FLOAT,\n"
+ + " `g` DOUBLE,\n"
+ + " `h` DECIMAL(5, 2),\n"
+ + " `x` DECIMAL(30, 10),\n"
+ + " `i` VARCHAR(5),\n"
+ + " `j` CHAR(5),\n"
+ + " `k` DATE,\n"
+ + " `l` TIME(0),\n"
+ + " `m` TIMESTAMP(9),\n"
+ + " `n` TIMESTAMP(9) WITH LOCAL TIME ZONE,\n"
+ + " `o` ARRAY<BIGINT>,\n"
+ + " `p` ROW<f1 BIGINT, f2 STRING, f3 DOUBLE>,\n"
+ + " `q` MAP<STRING, INT>\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'bounded' = 'true'\n"
+ + ")",
+ dataId1));
+
+ String dataId2 = TestValuesTableFactory.registerData(TestData.data5());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE PartitionTable (\n"
+ + " `a` INT,\n"
+ + " `b` BIGINT,\n"
+ + " `c` INT,\n"
+ + " `d` VARCHAR,\n"
+ + " `e` BIGINT\n"
+ + ") partitioned by (e, a)\n"
+ + " WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'partition-list' = 'e:1,a:1;e:1,a:2;e:1,a:4;e:1,a:5;e:2,a:2;e:2,a:3;e:2,a:4;e:2,a:5;e:3,a:3;e:3,a:5;',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")",
+ dataId2));
+ createPartition(catalog, "db", "PartitionTable", "e=1,a=1");
+ createPartition(catalog, "db", "PartitionTable", "e=1,a=2");
+ createPartition(catalog, "db", "PartitionTable", "e=1,a=4");
+ createPartition(catalog, "db", "PartitionTable", "e=1,a=5");
+ createPartition(catalog, "db", "PartitionTable", "e=2,a=2");
+ createPartition(catalog, "db", "PartitionTable", "e=2,a=3");
+ createPartition(catalog, "db", "PartitionTable", "e=2,a=4");
+ createPartition(catalog, "db", "PartitionTable", "e=2,a=5");
+ createPartition(catalog, "db", "PartitionTable", "e=3,a=3");
+ createPartition(catalog, "db", "PartitionTable", "e=3,a=5");
+
+ String dataId3 = TestValuesTableFactory.registerData(TestData.smallData5());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE NonPartitionTable2 (\n"
+ + " `a` INT,\n"
+ + " `b` BIGINT,\n"
+ + " `c` INT,\n"
+ + " `d` VARCHAR METADATA VIRTUAL,\n"
+ + " `e` BIGINT METADATA,"
+ + " `f` as a + 1\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'readable-metadata'='d:varchar,e:bigint',\n"
+ + " 'bounded' = 'true'\n"
+ + ")",
+ dataId3));
+
+ String dataId4 = TestValuesTableFactory.registerData(TestData.smallData5());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE PartitionTable2 (\n"
+ + " `a` INT,\n"
+ + " `b` BIGINT,\n"
+ + " `c` INT,\n"
+ + " `d` VARCHAR METADATA VIRTUAL,\n"
+ + " `e` BIGINT METADATA,"
+ + " `f` as a + 1\n"
+ + ") partitioned by (a)\n"
+ + " WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'partition-list' = 'a:1;a:2;',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'readable-metadata'='d:varchar,e:bigint',\n"
+ + " 'bounded' = 'true'\n"
+ + ")",
+ dataId4));
+ createPartition(catalog, "db", "PartitionTable2", "a=1");
+ createPartition(catalog, "db", "PartitionTable2", "a=2");
+ }
+
+ private void createPartition(Catalog catalog, String db, String table, String partitionSpecs)
+ throws Exception {
+ catalog.createPartition(
+ new ObjectPath(db, table),
+ createCatalogPartitionSpec(partitionSpecs),
+ new CatalogPartitionImpl(new HashMap<>(), ""),
+ false);
+ }
+
+ private CatalogPartitionSpec createCatalogPartitionSpec(String partitionSpecs) {
+ Map<String, String> partitionSpec = new HashMap<>();
+ for (String partition : partitionSpecs.split(",")) {
+ String[] items = partition.split("=");
+ Preconditions.checkArgument(
+ items.length == 2, "Partition key and value should be joined with '='");
+ partitionSpec.put(items[0], items[1]);
+ }
+ return new CatalogPartitionSpec(partitionSpec);
+ }
+
+ @Test
+ public void testAnalyzeTableNotExisted() {
+ assertThatThrownBy(
+ () -> tEnv.executeSql("analyze table not_exist_table compute statistics"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Table `cat`.`db`.`not_exist_table` doesn't exist");
+ }
+
+ @Test
+ public void testNonPartitionTableWithoutColumns() throws Exception {
+ tEnv.executeSql("analyze table NonPartitionTable compute statistics");
+ ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable");
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path))
+ .isEqualTo(new CatalogColumnStatistics(new HashMap<>()));
+ }
+
+ @Test
+ public void testNonPartitionTableWithColumnsNotExisted() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table NonPartitionTable compute statistics for columns not_existed_column"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Column: not_existed_column does not exist in the table: `cat`.`db`.`NonPartitionTable`");
+ }
+
+ @Test
+ public void testNonPartitionTableWithComputedColumn() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table NonPartitionTable2 compute statistics for columns f"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Column: f is a computed column, ANALYZE TABLE does not support computed column");
+ }
+
+ @Test
+ public void testNonPartitionTableWithVirtualMetadataColumn() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table NonPartitionTable2 compute statistics for columns d"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Column: d is a metadata column, ANALYZE TABLE does not support metadata column");
+ }
+
+ @Test
+ public void testNonPartitionTableWithMetadataColumn() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table NonPartitionTable2 compute statistics for columns e"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Column: e is a metadata column, ANALYZE TABLE does not support metadata column");
+ }
+
+ @Test
+ public void testNonPartitionTableWithPartition() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table NonPartitionTable PARTITION(a) compute statistics"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Invalid ANALYZE TABLE statement. Table: `cat`.`db`.`NonPartitionTable` is not a partition table, while partition values are given");
+ }
+
+ @Test
+ public void testNonPartitionTableWithPartialColumns() throws Exception {
+ tEnv.executeSql("analyze table NonPartitionTable compute statistics for columns f, a, d");
+ ObjectPath path1 = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable");
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path1))
+ .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData1 = new HashMap<>();
+ columnStatisticsData1.put("a", new CatalogColumnStatisticsDataBoolean(2L, 2L, 1L));
+ columnStatisticsData1.put(
+ "f", new CatalogColumnStatisticsDataDouble(-1.123d, 3.4d, 4L, 1L));
+ columnStatisticsData1.put(
+ "d",
+ new CatalogColumnStatisticsDataLong(
+ (long) Integer.MIN_VALUE, (long) Integer.MAX_VALUE, 4L, 1L));
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path1))
+ .isEqualTo(new CatalogColumnStatistics(columnStatisticsData1));
+
+ tEnv.executeSql("analyze table NonPartitionTable2 compute statistics for columns a, b, c");
+ ObjectPath path2 = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable2");
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData2 = new HashMap<>();
+ columnStatisticsData2.put("a", new CatalogColumnStatisticsDataLong(1L, 2L, 2L, 0L));
+ columnStatisticsData2.put("b", new CatalogColumnStatisticsDataLong(1L, 3L, 3L, 0L));
+ columnStatisticsData2.put("c", new CatalogColumnStatisticsDataLong(0L, 2L, 3L, 0L));
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path2))
+ .isEqualTo(new CatalogColumnStatistics(columnStatisticsData2));
+ }
+
+ @Test
+ public void testNonPartitionTableWithAllColumns() throws Exception {
+ tEnv.executeSql("analyze table NonPartitionTable compute statistics for all columns");
+ ObjectPath path1 = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable");
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path1))
+ .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData1 = new HashMap<>();
+ // boolean
+ columnStatisticsData1.put("a", new CatalogColumnStatisticsDataBoolean(2L, 2L, 1L));
+ // byte
+ columnStatisticsData1.put(
+ "b",
+ new CatalogColumnStatisticsDataLong(
+ (long) Byte.MIN_VALUE, (long) Byte.MAX_VALUE, 4L, 1L));
+ // short
+ columnStatisticsData1.put(
+ "c",
+ new CatalogColumnStatisticsDataLong(
+ (long) Short.MIN_VALUE, (long) Short.MAX_VALUE, 4L, 1L));
+ // int
+ columnStatisticsData1.put(
+ "d",
+ new CatalogColumnStatisticsDataLong(
+ (long) Integer.MIN_VALUE, (long) Integer.MAX_VALUE, 4L, 1L));
+ // long
+ columnStatisticsData1.put(
+ "e", new CatalogColumnStatisticsDataLong(Long.MIN_VALUE, Long.MAX_VALUE, 4L, 1L));
+ // float
+ columnStatisticsData1.put(
+ "f", new CatalogColumnStatisticsDataDouble(-1.123d, 3.4d, 4L, 1L));
+ // double
+ columnStatisticsData1.put(
+ "g", new CatalogColumnStatisticsDataDouble(-1.123d, 3.4d, 4L, 1L));
+ // DECIMAL(5, 2)
+ columnStatisticsData1.put("h", new CatalogColumnStatisticsDataDouble(5.1d, 8.12d, 4L, 1L));
+ // DECIMAL(30, 10)
+ columnStatisticsData1.put(
+ "x",
+ new CatalogColumnStatisticsDataDouble(
+ 1234567891012345.1d, 812345678910123451.0123456789d, 4L, 1L));
+ // varchar
+ columnStatisticsData1.put("i", new CatalogColumnStatisticsDataString(4L, 2.5d, 4L, 1L));
+ // char
+ columnStatisticsData1.put("j", new CatalogColumnStatisticsDataString(4L, 2.5d, 4L, 1L));
+ // date
+ columnStatisticsData1.put(
+ "k", new CatalogColumnStatisticsDataDate(new Date(-365), new Date(18383), 4L, 1L));
+ // time
+ columnStatisticsData1.put(
+ "l", new CatalogColumnStatisticsDataLong(123000000L, 84203000000000L, 4L, 1L));
+ // timestamp
+ columnStatisticsData1.put(
+ "m", new CatalogColumnStatisticsDataLong(-31536000L, 1588375403L, 4L, 1L));
+ // timestamp with local timezone
+ columnStatisticsData1.put(
+ "n", new CatalogColumnStatisticsDataLong(-31535999877L, 1588375403000L, 4L, 1L));
+
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path1))
+ .isEqualTo(new CatalogColumnStatistics(columnStatisticsData1));
+
+ tEnv.executeSql("analyze table NonPartitionTable2 compute statistics for all columns");
+ ObjectPath path2 = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable2");
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData2 = new HashMap<>();
+ columnStatisticsData2.put("a", new CatalogColumnStatisticsDataLong(1L, 2L, 2L, 0L));
+ columnStatisticsData2.put("b", new CatalogColumnStatisticsDataLong(1L, 3L, 3L, 0L));
+ columnStatisticsData2.put("c", new CatalogColumnStatisticsDataLong(0L, 2L, 3L, 0L));
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path2))
+ .isEqualTo(new CatalogColumnStatistics(columnStatisticsData2));
+ }
+
+ @Test
+ public void testPartitionTableWithoutPartition() {
+ assertThatThrownBy(() -> tEnv.executeSql("analyze table PartitionTable compute statistics"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+ + "The given partition keys: [] are not match the target partition keys: [e,a]");
+ }
+
+ @Test
+ public void testPartitionTableWithPartitionKeyNotExisted() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table PartitionTable PARTITION(d) compute statistics"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+ + "The given partition keys: [d] are not match the target partition keys: [e,a]");
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table PartitionTable PARTITION(e=1) compute statistics"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+ + "The given partition keys: [e] are not match the target partition keys: [e,a]");
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table PartitionTable PARTITION(e=1,d) compute statistics"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Invalid ANALYZE TABLE statement. For partition table, all partition keys should be specified explicitly. "
+ + "The given partition keys: [e,d] are not match the target partition keys: [e,a]");
+ }
+
+ @Test
+ public void testPartitionTableWithPartitionValueNotExisted() throws Exception {
+ tEnv.executeSql("analyze table PartitionTable partition(e=10,a) compute statistics");
+ ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
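+ // e=10 matches no existing partition, so nothing is analyzed and all statistics
+ // remain unknown.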
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path))
+ .isEqualTo(new CatalogColumnStatistics(new HashMap<>()));
+ }
+
+ @Test
+ public void testPartitionTableWithColumnsNotExisted() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table PartitionTable partition(e, a) compute statistics for columns not_existed_column"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Column: not_existed_column does not exist in the table: `cat`.`db`.`PartitionTable`");
+ }
+
+ @Test
+ public void testPartitionTableWithVirtualMetadataColumn() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table PartitionTable2 PARTITION(a) compute statistics for columns d"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Column: d is a metadata column, ANALYZE TABLE does not support metadata column");
+ }
+
+ @Test
+ public void testPartitionTableWithMetadataColumn() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table PartitionTable2 PARTITION(a) compute statistics for columns e"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Column: e is a metadata column, ANALYZE TABLE does not support metadata column");
+ }
+
+ @Test
+ public void testPartitionTableWithComputedColumn() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "analyze table PartitionTable2 PARTITION(a) compute statistics for columns f"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Column: f is a computed column, ANALYZE TABLE does not support computed column");
+ }
+
+ @Test
+ public void testPartitionTableWithoutColumns() throws Exception {
+ // Partition keys may be listed in any order (the table declares them as (e, a))
+ tEnv.executeSql("analyze table PartitionTable partition(a, e) compute statistics");
+ ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+ assertPartitionStatistics(path, "e=1,a=1", 1L);
+ assertPartitionStatistics(path, "e=1,a=2", 1L);
+ assertPartitionStatistics(path, "e=1,a=4", 2L);
+ assertPartitionStatistics(path, "e=1,a=5", 1L);
+ assertPartitionStatistics(path, "e=2,a=2", 1L);
+ assertPartitionStatistics(path, "e=2,a=3", 2L);
+ assertPartitionStatistics(path, "e=2,a=4", 2L);
+ assertPartitionStatistics(path, "e=2,a=5", 2L);
+ assertPartitionStatistics(path, "e=3,a=3", 1L);
+ assertPartitionStatistics(path, "e=3,a=5", 2L);
+
+ tEnv.executeSql(
+ "analyze table PartitionTable2 partition(a) compute statistics for all columns");
+ ObjectPath path2 = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable2");
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData2 = new HashMap<>();
+ columnStatisticsData2.put("a", new CatalogColumnStatisticsDataLong(1L, 1L, 1L, 0L));
+ columnStatisticsData2.put("b", new CatalogColumnStatisticsDataLong(1L, 1L, 1L, 0L));
+ columnStatisticsData2.put("c", new CatalogColumnStatisticsDataLong(0L, 0L, 1L, 0L));
+ assertPartitionStatistics(
+ path2, "a=1", 1L, new CatalogColumnStatistics(columnStatisticsData2));
+
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData3 = new HashMap<>();
+ columnStatisticsData3.put("a", new CatalogColumnStatisticsDataLong(2L, 2L, 1L, 0L));
+ columnStatisticsData3.put("b", new CatalogColumnStatisticsDataLong(2L, 3L, 2L, 0L));
+ columnStatisticsData3.put("c", new CatalogColumnStatisticsDataLong(1L, 2L, 2L, 0L));
+ assertPartitionStatistics(
+ path2, "a=2", 2L, new CatalogColumnStatistics(columnStatisticsData3));
+ }
+
+ @Test
+ public void testPartitionTableWithFullPartitionPath() throws Exception {
+ tEnv.executeSql(
+ "analyze table PartitionTable partition(e=2, a=5) compute statistics for all columns");
+ ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
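+ // Only partition (e=2, a=5) was analyzed; all other partitions keep unknown (-1) row counts.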
+ assertPartitionStatistics(path, "e=1,a=1", -1L);
+ assertPartitionStatistics(path, "e=1,a=2", -1L);
+ assertPartitionStatistics(path, "e=1,a=4", -1L);
+ assertPartitionStatistics(path, "e=1,a=5", -1L);
+ assertPartitionStatistics(path, "e=2,a=2", -1L);
+ assertPartitionStatistics(path, "e=2,a=3", -1L);
+ assertPartitionStatistics(path, "e=2,a=4", -1L);
+ assertPartitionStatistics(path, "e=3,a=3", -1L);
+ assertPartitionStatistics(path, "e=3,a=5", -1L);
+
+ Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData = new HashMap<>();
+ columnStatisticsData.put("a", new CatalogColumnStatisticsDataLong(5L, 5L, 1L, 0L));
+ columnStatisticsData.put("b", new CatalogColumnStatisticsDataLong(14L, 15L, 2L, 0L));
+ columnStatisticsData.put("c", new CatalogColumnStatisticsDataLong(13L, 14L, 2L, 0L));
+ columnStatisticsData.put("d", new CatalogColumnStatisticsDataString(3L, 3.0, 2L, 0L));
+ columnStatisticsData.put("e", new CatalogColumnStatisticsDataLong(2L, 2L, 1L, 0L));
+ assertPartitionStatistics(
+ path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData));
+ }
+
+ @Test
+ public void testPartitionTableWithPartialPartitionPath() throws Exception {
+ tEnv.executeSql("analyze table PartitionTable partition(e=2, a) compute statistics");
+ ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+ .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
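+ // partition(e=2, a) expands to every existing partition with e=2; partitions with other
+ // values of e keep unknown (-1) statistics.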
+ assertPartitionStatistics(path, "e=1,a=1", -1L);
+ assertPartitionStatistics(path, "e=1,a=2", -1L);
+ assertPartitionStatistics(path, "e=1,a=4", -1L);
+ assertPartitionStatistics(path, "e=1,a=5", -1L);
+ assertPartitionStatistics(path, "e=2,a=2", 1L);
+ assertPartitionStatistics(path, "e=2,a=3", 2L);
+ assertPartitionStatistics(path, "e=2,a=4", 2L);
+ assertPartitionStatistics(path, "e=2,a=5", 2L);
+ assertPartitionStatistics(path, "e=3,a=3", -1L);
+ assertPartitionStatistics(path, "e=3,a=5", -1L);
+ }
+
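+ /** Asserts the partition's row count and that no column statistics were recorded. */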
+ private void assertPartitionStatistics(ObjectPath path, String partitionSpec, long rowCount)
+ throws Exception {
+ CatalogPartitionSpec spec = createCatalogPartitionSpec(partitionSpec);
+ assertThat(
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .get()
+ .getPartitionStatistics(path, spec))
+ .isEqualTo(new CatalogTableStatistics(rowCount, -1, -1L, -1L));
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path))
+ .isEqualTo(new CatalogColumnStatistics(new HashMap<>()));
+ assertThat(
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .get()
+ .getPartitionColumnStatistics(path, spec))
+ .isEqualTo(new CatalogColumnStatistics(new HashMap<>()));
+ }
+
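+ /** Asserts both the partition's row count and its per-column statistics. */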
+ private void assertPartitionStatistics(
+ ObjectPath path,
+ String partitionSpec,
+ long rowCount,
+ CatalogColumnStatistics columnStats)
+ throws Exception {
+ CatalogPartitionSpec spec = createCatalogPartitionSpec(partitionSpec);
+ assertThat(
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .get()
+ .getPartitionStatistics(path, spec))
+ .isEqualTo(new CatalogTableStatistics(rowCount, -1, -1L, -1L));
+ assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path))
+ .isEqualTo(new CatalogColumnStatistics(new HashMap<>()));
+ assertThat(
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .get()
+ .getPartitionColumnStatistics(path, spec))
+ .isEqualTo(columnStats);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AnalyzeTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AnalyzeTableITCase.java
new file mode 100644
index 00000000000..647acd0ac0c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AnalyzeTableITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@code ANALYZE TABLE} in streaming mode. */
+public class AnalyzeTableITCase extends StreamingTestBase {
+
+ private TableEnvironment tEnv;
+
+ @BeforeEach
+ @Override
+ public void before() throws Exception {
+ super.before();
+ tEnv = tEnv();
+ String dataId1 = TestValuesTableFactory.registerData(TestData.smallData3());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE MyTable (\n"
+ + " `a` INT,\n"
+ + " `b` BIGINT,\n"
+ + " `c` VARCHAR\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'bounded' = 'true'\n"
+ + ")",
+ dataId1));
+ }
+
+ @Test
+ public void testAnalyzeTable() {
+ assertThatThrownBy(() -> tEnv.executeSql("analyze table MyTable compute statistics"))
+ .isInstanceOf(TableException.class)
+ .hasMessageContaining("ANALYZE TABLE is not supported for streaming mode now");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
index d96d9e2623d..e5877c913a3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.runtime.batch.sql
-import org.apache.flink.table.api.config.TableConfigOptions
+import org.apache.flink.table.catalog.ObjectPath
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, TestData}