You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 12:55:44 UTC

[GitHub] [flink] godfreyhe commented on a diff in pull request #20008: [FLINK-27990][table-planner] Parquet format supports reporting statistics

godfreyhe commented on code in PR #20008:
URL: https://github.com/apache/flink/pull/20008#discussion_r920040118


##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java:
##########
@@ -142,5 +182,246 @@ public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
         public ChangelogMode getChangelogMode() {
             return ChangelogMode.insertOnly();
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                Configuration hadoopConfig = getParquetConfiguration(formatOptions);
+                Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
+                RowType producedRowType = (RowType) producedDataType.getLogicalType();
+                long rowCount = 0;
+                for (Path file : files) {
+                    rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
+                }
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(columnStatisticsMap, producedRowType);
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                Map<String, Statistics<?>> columnStatisticsMap, RowType producedRowType) {
+            Map<String, ColumnStats> columnStatMap = new HashMap<>();
+            for (String column : producedRowType.getFieldNames()) {
+                Statistics<?> statistics = columnStatisticsMap.get(column);
+                if (statistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                producedRowType.getTypeAt(producedRowType.getFieldIndex(column)),
+                                statistics);
+                columnStatMap.put(column, columnStats);
+            }
+            return columnStatMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                LogicalType logicalType, Statistics<?> statistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNullCount(statistics.getNumNulls());
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                case BINARY:
+                case VARBINARY:
+                case ROW:
+                case ARRAY:
+                case MAP:
+                    break;

Review Comment:
   These unsupported types can be moved to the `default` branch, since only `nullCount` is reported for them.



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java:
##########
@@ -142,5 +182,246 @@ public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
         public ChangelogMode getChangelogMode() {
             return ChangelogMode.insertOnly();
         }
+
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                Configuration hadoopConfig = getParquetConfiguration(formatOptions);
+                Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
+                RowType producedRowType = (RowType) producedDataType.getLogicalType();
+                long rowCount = 0;
+                for (Path file : files) {
+                    rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
+                }
+                Map<String, ColumnStats> columnStatsMap =
+                        convertToColumnStats(columnStatisticsMap, producedRowType);
+                return new TableStats(rowCount, columnStatsMap);
+            } catch (Exception e) {
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        private Map<String, ColumnStats> convertToColumnStats(
+                Map<String, Statistics<?>> columnStatisticsMap, RowType producedRowType) {
+            Map<String, ColumnStats> columnStatMap = new HashMap<>();
+            for (String column : producedRowType.getFieldNames()) {
+                Statistics<?> statistics = columnStatisticsMap.get(column);
+                if (statistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(
+                                producedRowType.getTypeAt(producedRowType.getFieldIndex(column)),
+                                statistics);
+                columnStatMap.put(column, columnStats);
+            }
+            return columnStatMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                LogicalType logicalType, Statistics<?> statistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNullCount(statistics.getNumNulls());
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                case BINARY:
+                case VARBINARY:
+                case ROW:
+                case ARRAY:
+                case MAP:
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    if (statistics instanceof IntStatistics) {
+                        builder.setMin(((IntStatistics) statistics).getMin())
+                                .setMax(((IntStatistics) statistics).getMax());
+                        break;
+                    } else if (statistics instanceof LongStatistics) {
+                        builder.setMin(((LongStatistics) statistics).getMin())
+                                .setMax(((LongStatistics) statistics).getMax());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DOUBLE:
+                    if (statistics instanceof DoubleStatistics) {
+                        builder.setMin(((DoubleStatistics) statistics).getMin())
+                                .setMax(((DoubleStatistics) statistics).getMax());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case FLOAT:
+                    if (statistics instanceof FloatStatistics) {
+                        builder.setMin(((FloatStatistics) statistics).getMin())
+                                .setMax(((FloatStatistics) statistics).getMax());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DATE:
+                    if (statistics instanceof IntStatistics) {
+                        Date min =
+                                Date.valueOf(
+                                        DateTimeUtils.formatDate(
+                                                ((IntStatistics) statistics).getMin()));
+                        Date max =
+                                Date.valueOf(
+                                        DateTimeUtils.formatDate(
+                                                ((IntStatistics) statistics).getMax()));
+                        builder.setMin(min).setMax(max);
+                        break;
+                    } else {
+                        return null;
+                    }
+                case TIME_WITHOUT_TIME_ZONE:
+                    if (statistics instanceof IntStatistics) {
+                        Time min =
+                                Time.valueOf(
+                                        DateTimeUtils.toLocalTime(
+                                                ((IntStatistics) statistics).getMin()));
+                        Time max =
+                                Time.valueOf(
+                                        DateTimeUtils.toLocalTime(
+                                                ((IntStatistics) statistics).getMax()));
+                        builder.setMin(min).setMax(max);
+                        break;
+                    } else {
+                        return null;
+                    }
+                case CHAR:
+                case VARCHAR:
+                    if (statistics instanceof BinaryStatistics) {
+                        Binary min = ((BinaryStatistics) statistics).genericGetMin();
+                        Binary max = ((BinaryStatistics) statistics).genericGetMax();
+                        if (min != null) {
+                            builder.setMin(min.toStringUsingUTF8());
+                        } else {
+                            builder.setMin(null);
+                        }
+                        if (max != null) {
+                            builder.setMax(max.toStringUsingUTF8());
+                        } else {
+                            builder.setMax(null);
+                        }
+                        break;
+                    } else {
+                        return null;
+                    }
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                case TIMESTAMP_WITH_TIME_ZONE:
+                    if (statistics instanceof LongStatistics) {
+                        builder.setMin(new Timestamp(((LongStatistics) statistics).getMin()))
+                                .setMax(new Timestamp(((LongStatistics) statistics).getMax()));
+                        break;
+                    } else if (statistics instanceof BinaryStatistics) {
+                        Binary min = ((BinaryStatistics) statistics).genericGetMin();
+                        Binary max = ((BinaryStatistics) statistics).genericGetMax();
+                        if (min != null) {
+                            builder.setMin(binaryToTimestamp(min, formatOptions.get(UTC_TIMEZONE)));
+                        } else {
+                            builder.setMin(null);
+                        }
+                        if (max != null) {
+                            builder.setMax(binaryToTimestamp(max, formatOptions.get(UTC_TIMEZONE)));
+                        } else {
+                            builder.setMax(null);
+                        }
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DECIMAL:
+                    if (statistics instanceof IntStatistics) {
+                        builder.setMin(BigDecimal.valueOf(((IntStatistics) statistics).getMin()))
+                                .setMax(BigDecimal.valueOf(((IntStatistics) statistics).getMax()));
+                        break;
+                    } else if (statistics instanceof LongStatistics) {
+                        builder.setMin(BigDecimal.valueOf(((LongStatistics) statistics).getMin()))
+                                .setMax(BigDecimal.valueOf(((LongStatistics) statistics).getMax()));
+                        break;
+                    } else if (statistics instanceof BinaryStatistics) {
+                        Binary min = ((BinaryStatistics) statistics).genericGetMin();
+                        Binary max = ((BinaryStatistics) statistics).genericGetMax();
+                        if (min != null) {
+                            builder.setMin(
+                                    binaryToDecimal(min, ((DecimalType) logicalType).getScale()));
+                        } else {
+                            builder.setMin(null);
+                        }
+                        if (max != null) {
+                            builder.setMax(
+                                    binaryToDecimal(max, ((DecimalType) logicalType).getScale()));
+                        } else {
+                            builder.setMax(null);
+                        }
+                        break;
+                    } else {
+                        return null;
+                    }

Review Comment:
   Only one `break` is needed: put it after the `if`/`else if` chain instead of repeating it in every branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org