You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/25 14:01:03 UTC

[flink] branch master updated: [FLINK-29059][table-planner] Fix the existing column stats are deleted incorrectly when analyze table for partial columns

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fe392645421 [FLINK-29059][table-planner] Fix the existing column stats are deleted incorrectly when analyze table for partial columns
fe392645421 is described below

commit fe392645421d10923c75cd5438b91d9ed55900d3
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Wed Aug 24 11:51:29 2022 +0800

    [FLINK-29059][table-planner] Fix the existing column stats are deleted incorrectly when analyze table for partial columns
    
    This closes #20672
---
 .../flink/table/api/internal/AnalyzeTableUtil.java | 32 ++++++--
 .../runtime/batch/sql/AnalyzeTableITCase.java      | 92 ++++++++++++++++++++++
 2 files changed, 118 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
index 37471ed52ae..45d324d4e48 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/AnalyzeTableUtil.java
@@ -85,10 +85,15 @@ public class AnalyzeTableUtil {
                         executeSqlAndGenerateStatistics(tableEnv, columns, statSql);
                 CatalogTableStatistics tableStat = result.f0;
                 catalog.alterPartitionStatistics(objectPath, partitionSpec, tableStat, false);
-                CatalogColumnStatistics columnStat = result.f1;
-                if (columnStat != null) {
+                CatalogColumnStatistics newColumnStat = result.f1;
+                if (newColumnStat != null) {
+                    CatalogColumnStatistics oldColumnStat =
+                            catalog.getPartitionColumnStatistics(objectPath, partitionSpec);
+                    // merge stats
+                    CatalogColumnStatistics mergedColumnStatistics =
+                            mergeColumnStatistics(oldColumnStat, newColumnStat);
                     catalog.alterPartitionColumnStatistics(
-                            objectPath, partitionSpec, columnStat, false);
+                            objectPath, partitionSpec, mergedColumnStatistics, false);
                 }
             }
         } else {
@@ -97,14 +102,29 @@ public class AnalyzeTableUtil {
                     executeSqlAndGenerateStatistics(tableEnv, columns, statSql);
             CatalogTableStatistics tableStat = result.f0;
             catalog.alterTableStatistics(objectPath, tableStat, false);
-            CatalogColumnStatistics columnStat = result.f1;
-            if (columnStat != null) {
-                catalog.alterTableColumnStatistics(objectPath, columnStat, false);
+            CatalogColumnStatistics newColumnStat = result.f1;
+            if (newColumnStat != null) {
+                CatalogColumnStatistics oldColumnStat =
+                        catalog.getTableColumnStatistics(objectPath);
+                // merge stats.
+                CatalogColumnStatistics mergedColumnStatistics =
+                        mergeColumnStatistics(oldColumnStat, newColumnStat);
+                catalog.alterTableColumnStatistics(objectPath, mergedColumnStatistics, false);
             }
         }
         return TableResultImpl.TABLE_RESULT_OK;
     }
 
+    /** Overlays the new per-column stats onto a copy of the old ones (new entries win). */
+    private static CatalogColumnStatistics mergeColumnStatistics(
+            CatalogColumnStatistics oldColumnStatistics,
+            CatalogColumnStatistics newColumnStatistics) {
+        // Copy first so the stats object obtained from the catalog is left untouched.
+        CatalogColumnStatistics merged = oldColumnStatistics.copy();
+        merged.getColumnStatisticsData().putAll(newColumnStatistics.getColumnStatisticsData());
+        return merged;
+    }
+
     private static Tuple2<CatalogTableStatistics, CatalogColumnStatistics>
             executeSqlAndGenerateStatistics(
                     TableEnvironmentImpl tableEnv, List<Column> columns, String statSql) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
index 2ba4e0b4bf0..d61bb8440bd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AnalyzeTableITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogPartitionImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
@@ -353,6 +354,36 @@ public class AnalyzeTableITCase extends BatchTestBase {
                 .isEqualTo(new CatalogColumnStatistics(columnStatisticsData2));
     }
 
+    @Test
+    public void testNonPartitionTableAnalyzePartialColumnsWithSomeColumnsHaveColumnStats()
+            throws TableNotExistException {
+        // When some columns already have column stats, analyzing another subset of columns must
+        // merge the new stats into the existing ones rather than overwrite them.
+        // First, compute column stats for columns f, a and d only.
+        tEnv.executeSql("analyze table NonPartitionTable compute statistics for columns f, a, d");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "NonPartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData = new HashMap<>();
+        columnStatisticsData.put("a", new CatalogColumnStatisticsDataBoolean(2L, 2L, 1L));
+        columnStatisticsData.put("f", new CatalogColumnStatisticsDataDouble(-1.123d, 3.4d, 4L, 1L));
+        columnStatisticsData.put(
+                "d",
+                new CatalogColumnStatisticsDataLong(
+                        (long) Integer.MIN_VALUE, (long) Integer.MAX_VALUE, 4L, 1L));
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path))
+                .isEqualTo(new CatalogColumnStatistics(columnStatisticsData));
+
+        // Analyze an overlapping column set: d is recomputed, e is added, a and f must be kept.
+        tEnv.executeSql("analyze table NonPartitionTable compute statistics for columns d, e");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(5L, -1, -1L, -1L));
+        columnStatisticsData.put(
+                "e", new CatalogColumnStatisticsDataLong(Long.MIN_VALUE, Long.MAX_VALUE, 4L, 1L));
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableColumnStatistics(path))
+                .isEqualTo(new CatalogColumnStatistics(columnStatisticsData));
+    }
+
     @Test
     public void testPartitionTableWithoutPartition() {
         assertThatThrownBy(() -> tEnv.executeSql("analyze table PartitionTable compute statistics"))
@@ -537,6 +568,67 @@ public class AnalyzeTableITCase extends BatchTestBase {
         assertPartitionStatistics(path, "e=3,a=5", -1L);
     }
 
+    @Test
+    public void testPartitionTableAnalyzePartialColumnsWithSomeColumnsHaveColumnStats()
+            throws Exception {
+        // If a partition already has stats for some columns, analyzing another column subset
+        // must merge the new stats into the existing ones instead of overwriting them.
+        // First, compute stats for columns a, b, c of partition (e=2, a=5).
+        tEnv.executeSql(
+                "analyze table PartitionTable partition(e=2, a=5) compute statistics for columns a, b, c");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData = new HashMap<>();
+        columnStatisticsData.put("a", new CatalogColumnStatisticsDataLong(5L, 5L, 1L, 0L));
+        columnStatisticsData.put("b", new CatalogColumnStatisticsDataLong(14L, 15L, 2L, 0L));
+        columnStatisticsData.put("c", new CatalogColumnStatisticsDataLong(13L, 14L, 2L, 0L));
+        assertPartitionStatistics(
+                path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData));
+
+        tEnv.executeSql(
+                "analyze table PartitionTable partition(e=2, a=5) compute statistics for columns c, d");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+        columnStatisticsData.put("d", new CatalogColumnStatisticsDataString(3L, 3.0, 2L, 0L));
+        assertPartitionStatistics(
+                path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData));
+    }
+
+    @Test
+    public void testPartitionTableAnalyzePartialPartitionWithSomePartitionHaveColumnStats()
+            throws Exception {
+        // Column stats of different partitions are isolated: analyzing one partition must not
+        // affect the column stats already stored for another partition.
+        // First, compute column stats for partition (e=2, a=5).
+        tEnv.executeSql(
+                "analyze table PartitionTable partition(e=2, a=5) compute statistics for columns a, b, c");
+        ObjectPath path = new ObjectPath(tEnv.getCurrentDatabase(), "PartitionTable");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData1 = new HashMap<>();
+        columnStatisticsData1.put("a", new CatalogColumnStatisticsDataLong(5L, 5L, 1L, 0L));
+        columnStatisticsData1.put("b", new CatalogColumnStatisticsDataLong(14L, 15L, 2L, 0L));
+        columnStatisticsData1.put("c", new CatalogColumnStatisticsDataLong(13L, 14L, 2L, 0L));
+        assertPartitionStatistics(
+                path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData1));
+
+        // Then compute column stats for a different partition (e=2, a=4).
+        tEnv.executeSql(
+                "analyze table PartitionTable partition(e=2, a=4) compute statistics for columns a, d");
+        assertThat(tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTableStatistics(path))
+                .isEqualTo(new CatalogTableStatistics(-1L, -1, -1L, -1L));
+        // The previously analyzed partition keeps its column stats unchanged.
+        assertPartitionStatistics(
+                path, "e=2,a=5", 2L, new CatalogColumnStatistics(columnStatisticsData1));
+        Map<String, CatalogColumnStatisticsDataBase> columnStatisticsData2 = new HashMap<>();
+        columnStatisticsData2.put("a", new CatalogColumnStatisticsDataLong(4L, 4L, 1L, 0L));
+        columnStatisticsData2.put("d", new CatalogColumnStatisticsDataString(3L, 3.0, 2L, 0L));
+        // The newly analyzed partition has stats only for the analyzed columns a and d.
+        assertPartitionStatistics(
+                path, "e=2,a=4", 2L, new CatalogColumnStatistics(columnStatisticsData2));
+    }
+
     private void assertPartitionStatistics(ObjectPath path, String partitionSpec, long rowCount)
             throws Exception {
         CatalogPartitionSpec spec = createCatalogPartitionSpec(partitionSpec);