You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/07 06:34:12 UTC

[GitHub] [flink] swuferhong commented on a diff in pull request #20009: [FLINK-27991][table-planner] ORC format supports reporting statistics

swuferhong commented on code in PR #20009:
URL: https://github.com/apache/flink/pull/20009#discussion_r915507045


##########
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java:
##########
@@ -167,5 +188,179 @@ public ChangelogMode getChangelogMode() {
         public void applyFilters(List<ResolvedExpression> filters) {
             // Only stores the given filter expressions; nothing is evaluated here.
             this.filters = filters;
         }
+
+        /**
+         * Reports table statistics aggregated from the footers of the given ORC files.
+         *
+         * <p>The row count is the sum of the row counts of all files. Per-column ORC statistics
+         * are merged across files and then converted to Flink {@link ColumnStats}. This is best
+         * effort: any failure while reading or converting yields {@link TableStats#UNKNOWN}.
+         */
+        @Override
+        public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
+            try {
+                final Properties orcProperties = getOrcProperties(formatOptions);
+                final Configuration conf = new Configuration();
+                orcProperties.forEach((key, value) -> conf.set(key.toString(), value.toString()));
+
+                final RowType rowType = (RowType) producedDataType.getLogicalType();
+                final Map<String, ColumnStatistics> mergedStatistics = new HashMap<>();
+                long totalRows = 0L;
+                for (Path file : files) {
+                    totalRows += updateStatistics(conf, file, mergedStatistics, rowType);
+                }
+                return new TableStats(
+                        totalRows, convertToColumnStats(totalRows, mergedStatistics, rowType));
+            } catch (Exception ignored) {
+                // Best effort: any failure degrades to "unknown" statistics instead of
+                // propagating to the caller.
+                return TableStats.UNKNOWN;
+            }
+        }
+
+        /**
+         * Merges the column statistics stored in one ORC file into {@code columnStatisticsMap}.
+         *
+         * @param hadoopConf Hadoop configuration used to open the file
+         * @param file the ORC file to read
+         * @param columnStatisticsMap accumulator keyed by column name; updated in place
+         * @param producedRowType only its fields that also exist in the file's schema are merged
+         * @return the number of rows recorded in the file
+         * @throws IOException if the file cannot be opened or read
+         */
+        private long updateStatistics(
+                Configuration hadoopConf,
+                Path file,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType producedRowType)
+                throws IOException {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+            // Close the reader once the statistics have been read, so that any underlying file
+            // handle is released instead of leaking once per file.
+            try (Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)))) {
+                ColumnStatistics[] statistics = reader.getStatistics();
+                TypeDescription schema = reader.getSchema();
+                List<String> fieldNames = schema.getFieldNames();
+                List<TypeDescription> columnTypes = schema.getChildren();
+                for (String column : producedRowType.getFieldNames()) {
+                    int fieldIdx = fieldNames.indexOf(column);
+                    // Columns that do not exist in this file are skipped.
+                    if (fieldIdx >= 0) {
+                        // Statistics are looked up by the column id of the field's type in the
+                        // file schema.
+                        int colId = columnTypes.get(fieldIdx).getId();
+                        updateStatistics(statistics[colId], column, columnStatisticsMap);
+                    }
+                }
+                return reader.getNumberOfRows();
+            }
+        }
+
+        /**
+         * Folds one file's statistics for {@code column} into the accumulated map.
+         *
+         * <p>The first statistics seen for a column are stored as-is. Later ones are merged into
+         * that stored instance, which is only done when it is a {@link ColumnStatisticsImpl}.
+         * Otherwise the new statistics are ignored.
+         */
+        private void updateStatistics(
+                ColumnStatistics statistic,
+                String column,
+                Map<String, ColumnStatistics> columnStatisticsMap) {
+            ColumnStatistics accumulated = columnStatisticsMap.putIfAbsent(column, statistic);
+            if (accumulated instanceof ColumnStatisticsImpl) {
+                // merge() mutates the already-stored instance in place.
+                ((ColumnStatisticsImpl) accumulated).merge((ColumnStatisticsImpl) statistic);
+            }
+        }
+
+        /**
+         * Converts the merged ORC statistics of each field of {@code logicalType} into Flink
+         * {@link ColumnStats}.
+         *
+         * @param totalRowCount total number of rows across all files; used to derive null counts
+         * @param columnStatisticsMap merged ORC statistics keyed by column name
+         * @param logicalType the produced row type; fields without statistics are omitted
+         * @return statistics keyed by column name; never contains {@code null} values
+         */
+        private Map<String, ColumnStats> convertToColumnStats(
+                long totalRowCount,
+                Map<String, ColumnStatistics> columnStatisticsMap,
+                RowType logicalType) {
+            Map<String, ColumnStats> columnStatsMap = new HashMap<>();
+            // Iterate the fields directly so that name and type come together, instead of doing a
+            // linear getFieldIndex lookup for every column.
+            for (RowType.RowField field : logicalType.getFields()) {
+                String column = field.getName();
+                ColumnStatistics columnStatistics = columnStatisticsMap.get(column);
+                if (columnStatistics == null) {
+                    continue;
+                }
+                ColumnStats columnStats =
+                        convertToColumnStats(totalRowCount, field.getType(), columnStatistics);
+                // The per-column conversion can return null, e.g. when the ORC statistics class
+                // does not match the Flink type. Leave such columns out rather than storing a
+                // null value that consumers of the map would have to guard against.
+                if (columnStats != null) {
+                    columnStatsMap.put(column, columnStats);
+                }
+            }
+            return columnStatsMap;
+        }
+
+        private ColumnStats convertToColumnStats(
+                long totalRowCount, LogicalType logicalType, ColumnStatistics columnStatistics) {
+            ColumnStats.Builder builder =
+                    new ColumnStats.Builder().setNdv(null).setAvgLen(null).setMaxLen(null);
+            if (!columnStatistics.hasNull()) {
+                builder.setNullCount(0L);
+            } else {
+                builder.setNullCount(totalRowCount - columnStatistics.getNumberOfValues());
+            }
+
+            switch (logicalType.getTypeRoot()) {
+                case BOOLEAN:
+                    builder.setMax(null).setMin(null);
+                    break;
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                case BIGINT:
+                    if (columnStatistics instanceof IntegerColumnStatistics) {
+                        builder.setMax(((IntegerColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((IntegerColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case FLOAT:
+                case DOUBLE:
+                    if (columnStatistics instanceof DoubleColumnStatistics) {
+                        builder.setMax(((DoubleColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((DoubleColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case CHAR:
+                case VARCHAR:
+                case VARBINARY:
+                    if (columnStatistics instanceof StringColumnStatistics) {
+                        builder.setMax(((StringColumnStatistics) columnStatistics).getMaximum())
+                                .setMin(((StringColumnStatistics) columnStatistics).getMinimum());
+                        break;
+                    } else {
+                        return null;
+                    }
+                case DATE:
+                    if (columnStatistics instanceof DateColumnStatistics) {
+                        builder.setMax(((DateColumnStatistics) columnStatistics).getMaximum())

Review Comment:
   > The `Date` returned by `((DateColumnStatistics) columnStatistics).getMaximum()` is not a `java.sql.Date`.
   
   done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org