Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 06:41:29 UTC

[GitHub] [flink] godfreyhe commented on a diff in pull request #20248: [FLINK-28075][table-planner] get statistics for partitioned table eve…

godfreyhe commented on code in PR #20248:
URL: https://github.com/apache/flink/pull/20248#discussion_r919698302


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -116,32 +121,42 @@ private TableStats recomputeStatistics(
             return reportStatEnabled
                     ? ((SupportsStatisticReport) tableSource).reportStatistics()
                     : null;
-        } else {
+        } else if (partitionPushDownSpec != null) {
             // ignore filter push down if all pushdown predicates are also in outer Filter operator
             // otherwise the result will be estimated twice.
-            if (partitionPushDownSpec != null) {
-                // partition push down
-                // try to get the statistics for the remaining partitions
-                TableStats newTableStat = getPartitionsTableStats(table, partitionPushDownSpec);
-                // call reportStatistics method if reportStatEnabled is true and the partition
-                // statistics is unknown
-                if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
-                    return ((SupportsStatisticReport) tableSource).reportStatistics();
-                } else {
-                    return newTableStat;
-                }
+            // partition push down
+            // try to get the statistics for the remaining partitions
+            TableStats newTableStat = getPartitionsTableStats(table, partitionPushDownSpec);
+            // call reportStatistics method if reportStatEnabled is true and the partition
+            // statistics is unknown
+            if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
+                return ((SupportsStatisticReport) tableSource).reportStatistics();
             } else {
-                // call reportStatistics method if reportStatEnabled is true and the original
-                // catalog statistics is unknown
-                if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
-                    return ((SupportsStatisticReport) tableSource).reportStatistics();
-                } else {
-                    return origTableStats;
-                }
+                return newTableStat;
+            }
+        } else {
+            if (isPartitionTable(table) && isUnknownTableStats(origTableStats)) {
+                // if table is partition table, try to recompute stats by catalog.
+                origTableStats = getPartitionsTableStats(table, null);
+            }
+            // call reportStatistics method if reportStatEnabled is true and the newTableStats is
+            // unknown.
+            if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
+                return ((SupportsStatisticReport) tableSource).reportStatistics();
+            } else {
+                return origTableStats;
             }
         }
     }
 
+    private boolean isPartitionTable(TableSourceTable table) {
+        return table.contextResolvedTable()
+                        .<ResolvedCatalogTable>getResolvedTable()
+                        .getPartitionKeys()

Review Comment:
   use `isPartitioned` instead of checking the partition keys
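   
   A minimal sketch of what the suggested change could look like (not taken from the
   PR; the helper name is kept as-is, and `CatalogTable#isPartitioned()` already
   encapsulates the partition-key check):
   
   ```java
   import org.apache.flink.table.catalog.ResolvedCatalogTable;
   import org.apache.flink.table.planner.plan.schema.TableSourceTable;
   
   // Same check as above, but delegated to isPartitioned() instead of
   // inspecting the partition keys directly.
   private boolean isPartitionTable(TableSourceTable table) {
       return table.contextResolvedTable()
               .<ResolvedCatalogTable>getResolvedTable()
               .isPartitioned();
   }
   ```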



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -153,7 +168,21 @@ private TableStats getPartitionsTableStats(
             ObjectIdentifier identifier = table.contextResolvedTable().getIdentifier();
             ObjectPath tablePath = identifier.toObjectPath();
             Catalog catalog = table.contextResolvedTable().getCatalog().get();
-            for (Map<String, String> partition : partitionPushDownSpec.getPartitions()) {
+            List<Map<String, String>> partitionList = new ArrayList<>();
+            if (partitionPushDownSpec == null) {
+                try {
+                    List<CatalogPartitionSpec> catalogPartitionSpecs =
+                            catalog.listPartitions(tablePath);
+                    for (CatalogPartitionSpec partitionSpec : catalogPartitionSpecs) {
+                        partitionList.add(partitionSpec.getPartitionSpec());
+                    }
+                } catch (TableNotExistException | TableNotPartitionedException e) {
+                    throw new RuntimeException(e);

Review Comment:
   throw `TableException` here instead of `RuntimeException`
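   
   A minimal sketch, reusing the variables from the hunk above (`catalog`, `tablePath`,
   `partitionList`, `identifier`); the error message is illustrative only:
   
   ```java
   import org.apache.flink.table.api.TableException;
   import org.apache.flink.table.catalog.CatalogPartitionSpec;
   import org.apache.flink.table.catalog.exceptions.TableNotExistException;
   import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
   
   try {
       // no partition predicates were pushed down, so list all partitions
       for (CatalogPartitionSpec partitionSpec : catalog.listPartitions(tablePath)) {
           partitionList.add(partitionSpec.getPartitionSpec());
       }
   } catch (TableNotExistException | TableNotPartitionedException e) {
       // wrap the checked catalog exceptions in Flink's TableException
       // rather than a bare RuntimeException
       throw new TableException(
               String.format("Failed to list partitions of table %s.", identifier), e);
   }
   ```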



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkRecomputeStatisticsProgram.java:
##########
@@ -153,7 +168,21 @@ private TableStats getPartitionsTableStats(
             ObjectIdentifier identifier = table.contextResolvedTable().getIdentifier();
             ObjectPath tablePath = identifier.toObjectPath();
             Catalog catalog = table.contextResolvedTable().getCatalog().get();
-            for (Map<String, String> partition : partitionPushDownSpec.getPartitions()) {
+            List<Map<String, String>> partitionList = new ArrayList<>();
+            if (partitionPushDownSpec == null) {

Review Comment:
   mark the `partitionPushDownSpec` argument as `@Nullable`
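   
   A signature-only sketch; the parameter types are inferred from the hunk and may not
   match the actual declaration exactly, and the body is a placeholder:
   
   ```java
   import javax.annotation.Nullable;
   
   import org.apache.flink.table.plan.stats.TableStats;
   import org.apache.flink.table.planner.plan.abilities.source.PartitionPushDownSpec;
   import org.apache.flink.table.planner.plan.schema.TableSourceTable;
   
   // A null spec explicitly means "no partition push down happened;
   // list all partitions from the catalog".
   private TableStats getPartitionsTableStats(
           TableSourceTable table, @Nullable PartitionPushDownSpec partitionPushDownSpec) {
       // body unchanged from the hunk above; placeholder return so the sketch compiles
       return TableStats.UNKNOWN;
   }
   ```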



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemStatisticsReportTest.java:
##########
@@ -208,8 +208,7 @@ public void testNoPartitionPushDownAndCatalogStatisticsExist()
                         false);
 
         FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from PartTable");
-        // TODO get partition statistics from catalog

Review Comment:
   please remove the following `TODO`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org