You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/25 14:01:03 UTC
[flink] branch master updated: [FLINK-29059][table-planner] Fix the existing column stats are deleted incorrectly when analyze table for partial columns
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fe392645421 [FLINK-29059][table-planner] Fix the existing column stats are deleted incorrectly when analyze table for partial columns
fe392645421 is described below
commit fe392645421d10923c75cd5438b91d9ed55900d3
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Wed Aug 24 11:51:29 2022 +0800
[FLINK-29059][table-planner] Fix the existing column stats are deleted incorrectly when analyze table for partial columns
This closes #20672
---
.../flink/table/api/internal/AnalyzeTableUtil.java | 32 ++++++--
.../runtime/batch/sql/AnalyzeTableITCase.java | 92 ++++++++++++++++++++++
2 files changed, 118 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
index 37471ed52ae..45d324d4e48 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
@@ -85,10 +85,15 @@ public class AnalyzeTableUtil {
executeSqlAndGenerateStatistics(tableEnv, columns, statSql);
CatalogTableStatistics tableStat = result.f0;
catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
- CatalogColumnStatistics columnStat = result.f1;
- if (columnStat != null) {
+ CatalogColumnStatistics newColumnStat = result.f1;
+ if (newColumnStat != null) {
+ CatalogColumnStatistics oldColumnStat =
+ catalog.getPartitionColumnStatistics(objectPath, partitionSpec);
+ // merge stats
+ CatalogColumnStatistics mergedColumnStatistics =
+ mergeColumnStatistics(oldColumnStat, newColumnStat);
catalog.alterPartitionColumnStatistics(
- objectPath, partitionSpec, columnStat, false);
+ objectPath, partitionSpec, mergedColumnStatistics, false);
}
}
} else {
@@ -97,14 +102,29 @@ public class AnalyzeTableUtil {
executeSqlAndGenerateStatistics(tableEnv, columns, statSql);
CatalogTableStatistics tableStat = result.f0;
catalog.alterTableStatistics(objectPath, tableStat, false);
- CatalogColumnStatistics columnStat = result.f1;
- if (columnStat != null) {
- catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+ CatalogColumnStatistics newColumnStat = result.f1;
+ if (newColumnStat != null) {
+ CatalogColumnStatistics oldColumnStat =
+ catalog.getTableColumnStatistics(objectPath);
+ // merge stats.
+ CatalogColumnStatistics mergedColumnStatistics =
+ mergeColumnStatistics(oldColumnStat, newColumnStat);
+ catalog.alterTableColumnStatistics(objectPath, mergedColumnStatistics, false);
}
}
return TableResultImpl.TABLE_RESULT_OK;
}
+    /**
+     * Merges freshly computed column statistics into the column statistics already stored in the
+     * catalog, so that analyzing only a subset of columns does not discard the statistics of the
+     * remaining columns.
+     *
+     * <p>For a column present in both arguments, the entry from {@code newColumnStatistics} wins.
+     * Columns present only in {@code oldColumnStatistics} are kept, and the properties of {@code
+     * oldColumnStatistics} are carried over. The result is built on {@code
+     * oldColumnStatistics.copy()}; this assumes {@code copy()} returns an independent, mutable
+     * statistics map, so the caller's {@code oldColumnStatistics} is left untouched.
+     *
+     * @param oldColumnStatistics column statistics currently stored in the catalog; may be {@code
+     *     null}, in which case {@code newColumnStatistics} is returned unchanged
+     * @param newColumnStatistics column statistics computed by the current ANALYZE statement
+     * @return the merged column statistics to write back to the catalog
+     */
+    private static CatalogColumnStatistics mergeColumnStatistics(
+            CatalogColumnStatistics oldColumnStatistics,
+            CatalogColumnStatistics newColumnStatistics) {
+        if (oldColumnStatistics == null) {
+            // Defensive: guard against catalog implementations that report "no column stats"
+            // as null instead of an empty statistics object; nothing to merge in that case.
+            return newColumnStatistics;
+        }
+        CatalogColumnStatistics columnStatistics = oldColumnStatistics.copy();
+        columnStatistics
+                .getColumnStatisticsData()
+                .putAll(newColumnStatistics.getColumnStatisticsData());
+        return columnStatistics;
+    }
+
private static Tuple2<CatalogTableStatistics, CatalogColumnStatistics>
executeSqlAndGenerateStatistics(
TableEnvironmentImpl tableEnv, List<Column> columns, String statSql) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
index 2ba4e0b4bf0..d61bb8440bd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
@@ -353,6 +354,36 @@ public class AnalyzeTableITCase extends BatchTestBase {
.isEqualTo(new CatalogColumnStatistics(columnStatisticsData2));
}
+    @Test
+    public void testNonPartitionTableAnalyzePartialColumnsWithSomeColumnsHaveColumnStats()
+            throws TableNotExistException {
+        // Analyzing only some columns must merge the newly computed column stats into the
+        // existing ones rather than overwrite them.
+
+        // Step 1: collect column stats for f, a and d.
+        tEnv.executeSql("analyze table NonPartitionTable compute statistics for columns f, a, d");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable");
+        Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+        CatalogTableStatistics expectedTableStats = new CatalogTableStatistics(5L, -1, -1L, -1L);
+        assertThat(catalog.getTableStatistics(path)).isEqualTo(expectedTableStats);
+
+        Map<String, CatalogColumnStatisticsDataBase> expectedColumnStats = new HashMap<>();
+        expectedColumnStats.put("a", new CatalogColumnStatisticsDataBoolean(2L, 2L, 1L));
+        expectedColumnStats.put("f", new CatalogColumnStatisticsDataDouble(-1.123d, 3.4d, 4L, 1L));
+        expectedColumnStats.put(
+                "d",
+                new CatalogColumnStatisticsDataLong(
+                        (long) Integer.MIN_VALUE, (long) Integer.MAX_VALUE, 4L, 1L));
+        assertThat(catalog.getTableColumnStatistics(path))
+                .isEqualTo(new CatalogColumnStatistics(expectedColumnStats));
+
+        // Step 2: analyze an overlapping column set (d, e). The stats for a and f must
+        // survive, d keeps the same values and e is added.
+        tEnv.executeSql("analyze table NonPartitionTable compute statistics for columns d, e");
+        assertThat(catalog.getTableStatistics(path)).isEqualTo(expectedTableStats);
+        expectedColumnStats.put(
+                "e", new CatalogColumnStatisticsDataLong(Long.MIN_VALUE, Long.MAX_VALUE, 4L, 1L));
+        assertThat(catalog.getTableColumnStatistics(path))
+                .isEqualTo(new CatalogColumnStatistics(expectedColumnStats));
+    }
+
@Test
public void testPartitionTableWithoutPartition() {
assertThatThrownBy(() -> tEnv.executeSql("analyze table PartitionTable compute statistics"))
@@ -537,6 +568,67 @@ public class AnalyzeTableITCase extends BatchTestBase {
assertPartitionStatistics(path, "e=3,a=5", -1L);
}
+    @Test
+    public void testPartitionTableAnalyzePartialColumnsWithSomeColumnsHaveColumnStats()
+            throws Exception {
+        // Analyzing only some columns of a partition must merge the newly computed column stats
+        // into that partition's existing column stats rather than overwrite them.
+
+        // Step 1: collect column stats for a, b and c of partition (e=2, a=5).
+        tEnv.executeSql(
+                "analyze table PartitionTable partition(e=2, a=5) "
+                        + "compute statistics for columns a, b, c");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+        Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+        // Partition-level analysis leaves the table-level statistics at -1 (not computed).
+        CatalogTableStatistics unsetTableStats = new CatalogTableStatistics(-1L, -1, -1L, -1L);
+        assertThat(catalog.getTableStatistics(path)).isEqualTo(unsetTableStats);
+
+        Map<String, CatalogColumnStatisticsDataBase> expectedColumnStats = new HashMap<>();
+        expectedColumnStats.put("a", new CatalogColumnStatisticsDataLong(5L, 5L, 1L, 0L));
+        expectedColumnStats.put("b", new CatalogColumnStatisticsDataLong(14L, 15L, 2L, 0L));
+        expectedColumnStats.put("c", new CatalogColumnStatisticsDataLong(13L, 14L, 2L, 0L));
+        assertPartitionStatistics(
+                path, "e=2,a=5", 2L, new CatalogColumnStatistics(expectedColumnStats));
+
+        // Step 2: analyze an overlapping column set (c, d) of the same partition. The stats for
+        // a and b must survive, c keeps the same values and d is added.
+        tEnv.executeSql(
+                "analyze table PartitionTable partition(e=2, a=5) "
+                        + "compute statistics for columns c, d");
+        assertThat(catalog.getTableStatistics(path)).isEqualTo(unsetTableStats);
+        expectedColumnStats.put("d", new CatalogColumnStatisticsDataString(3L, 3.0, 2L, 0L));
+        assertPartitionStatistics(
+                path, "e=2,a=5", 2L, new CatalogColumnStatistics(expectedColumnStats));
+    }
+
+    @Test
+    public void testPartitionTableAnalyzePartialPartitionWithSomePartitionHaveColumnStats()
+            throws Exception {
+        // Column stats of different partitions are isolated: analyzing one partition must not
+        // change the column stats already stored for another partition.
+
+        // Step 1: collect column stats for a, b and c of partition (e=2, a=5).
+        tEnv.executeSql(
+                "analyze table PartitionTable partition(e=2, a=5) "
+                        + "compute statistics for columns a, b, c");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+        Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+        // Partition-level analysis leaves the table-level statistics at -1 (not computed).
+        CatalogTableStatistics unsetTableStats = new CatalogTableStatistics(-1L, -1, -1L, -1L);
+        assertThat(catalog.getTableStatistics(path)).isEqualTo(unsetTableStats);
+
+        Map<String, CatalogColumnStatisticsDataBase> firstPartitionStats = new HashMap<>();
+        firstPartitionStats.put("a", new CatalogColumnStatisticsDataLong(5L, 5L, 1L, 0L));
+        firstPartitionStats.put("b", new CatalogColumnStatisticsDataLong(14L, 15L, 2L, 0L));
+        firstPartitionStats.put("c", new CatalogColumnStatisticsDataLong(13L, 14L, 2L, 0L));
+        assertPartitionStatistics(
+                path, "e=2,a=5", 2L, new CatalogColumnStatistics(firstPartitionStats));
+
+        // Step 2: collect column stats for a and d of a different partition (e=2, a=4).
+        tEnv.executeSql(
+                "analyze table PartitionTable partition(e=2, a=4) "
+                        + "compute statistics for columns a, d");
+        assertThat(catalog.getTableStatistics(path)).isEqualTo(unsetTableStats);
+
+        // The previously analyzed partition is unchanged.
+        assertPartitionStatistics(
+                path, "e=2,a=5", 2L, new CatalogColumnStatistics(firstPartitionStats));
+
+        // The newly analyzed partition holds only its own stats.
+        Map<String, CatalogColumnStatisticsDataBase> secondPartitionStats = new HashMap<>();
+        secondPartitionStats.put("a", new CatalogColumnStatisticsDataLong(4L, 4L, 1L, 0L));
+        secondPartitionStats.put("d", new CatalogColumnStatisticsDataString(3L, 3.0, 2L, 0L));
+        assertPartitionStatistics(
+                path, "e=2,a=4", 2L, new CatalogColumnStatistics(secondPartitionStats));
+    }
+
private void assertPartitionStatistics(ObjectPath path, String partitionSpec, long rowCount)
throws Exception {
CatalogPartitionSpec spec = createCatalogPartitionSpec(partitionSpec);