You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yu...@apache.org on 2023/07/18 01:15:28 UTC

[flink] branch master updated: [FLINK-32365][hive] Allow Hive source to scan table to get statistics in parallel (#22805)

This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f2ee60c277 [FLINK-32365][hive] Allow Hive source to scan table to get statistics in parallel (#22805)
0f2ee60c277 is described below

commit 0f2ee60c2774c7b03c20ae0d495c51c35df48789
Author: baiwuchang <41...@users.noreply.github.com>
AuthorDate: Tue Jul 18 09:15:19 2023 +0800

    [FLINK-32365][hive] Allow Hive source to scan table to get statistics in parallel (#22805)
---
 .../docs/connectors/table/hive/hive_read_write.md  |   4 +
 .../docs/connectors/table/hive/hive_read_write.md  |   5 +
 .../apache/flink/connectors/hive/HiveOptions.java  |   7 +
 .../flink/connectors/hive/HiveTableSource.java     |  13 +-
 .../orc/util/OrcFormatStatisticsReportUtil.java    | 133 ++++++++++++++-----
 .../utils/ParquetFormatStatisticsReportUtil.java   | 144 +++++++++++++++------
 6 files changed, 236 insertions(+), 70 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index c54efe4c37f..bdd3ac4f06d 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -190,6 +190,10 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig` 
 - 目前上述参数仅适用于 ORC 格式的 Hive 表。
 {{< /hint >}}
 
+### 读取表统计信息
+
+当 Hive metastore 中没有表的统计信息时,Flink 会尝试扫描表来获取统计信息,从而生成合适的执行计划。此过程可能会比较耗时,你可以使用 `table.exec.hive.read-statistics.thread-num` 去配置使用多少个线程去扫描表,默认值是当前系统可用处理器数,配置的值应该大于0。
+
 ### 加载分区切片
 
 Flink 使用多个线程并发将 Hive 分区切分成多个 split 进行读取。你可以使用 `table.exec.hive.load-partition-splits.thread-num` 去配置线程数。默认值是3,你配置的值应该大于0。
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index c5bc91edb11..98742a92be7 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -206,6 +206,11 @@ Users can do some performance tuning by tuning the split's size with the follow
 - Currently, these configurations for tuning split size only works for the Hive table stored as ORC format.
 {{< /hint >}}
 
+### Read Table Statistics
+
+When table statistics are not available from the Hive metastore, Flink will try to scan the table to collect them so that it can generate a better execution plan. This scan may take some time. To speed it up, you can use `table.exec.hive.read-statistics.thread-num` to configure how many threads are used to scan the table.
+The default value is the number of available processors in the current system, and the configured value should be greater than 0.
+
 ### Load Partition Splits
 
 Multi-thread is used to split hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0.
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index 2cba3dae4fc..01e1493681d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -134,6 +134,13 @@ public class HiveOptions {
                                     + " custom: use policy class to create a commit policy."
                                     + " Support to configure multiple policies: 'metastore,success-file'.");
 
+    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM =
+            key("table.exec.hive.read-statistics.thread-num")
+                    .intType()
+                    .defaultValue(Runtime.getRuntime().availableProcessors())
+                    .withDescription(
+                            "The thread number to read input format statistics. It should be bigger than 0.");
+
     public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE =
             key("compaction.small-files.avg-size")
                     .memoryType()
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index e5acc1d8ca5..a1dac332c2d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -85,6 +85,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER;
 import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
 import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_ENABLE;
+import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM;
 import static org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions;
 
 /** A TableSource implementation to read data from Hive tables. */
@@ -373,13 +374,21 @@ public class HiveTableSource
                         .toLowerCase();
         List<Path> files =
                 inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+        int statisticsThreadNum = flinkConf.get(TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM);
+        Preconditions.checkArgument(
+                statisticsThreadNum >= 1,
+                TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM.key() + " cannot be less than 1");
         // Now we only support Parquet, Orc formats.
         if (serializationLib.contains("parquet")) {
             return ParquetFormatStatisticsReportUtil.getTableStatistics(
-                    files, producedDataType, jobConf, hiveVersion.startsWith("3"));
+                    files,
+                    producedDataType,
+                    jobConf,
+                    hiveVersion.startsWith("3"),
+                    statisticsThreadNum);
         } else if (serializationLib.contains("orc")) {
             return OrcFormatStatisticsReportUtil.getTableStatistics(
-                    files, producedDataType, jobConf);
+                    files, producedDataType, jobConf, statisticsThreadNum);
         } else {
             // Now, only support Orc and Parquet Formats.
             LOG.info(
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java
index a4abfd63ad2..e183f9bf394 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.orc.ColumnStatistics;
@@ -43,9 +44,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Date;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /** Utils for Orc format statistics report. */
 public class OrcFormatStatisticsReportUtil {
@@ -54,15 +60,41 @@ public class OrcFormatStatisticsReportUtil {
 
     public static TableStats getTableStatistics(
             List<Path> files, DataType producedDataType, Configuration hadoopConfig) {
+        return getTableStatistics(
+                files, producedDataType, hadoopConfig, Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            int statisticsThreadNum) {
+        ExecutorService executorService = null;
         try {
             long rowCount = 0;
             Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+            executorService =
+                    Executors.newFixedThreadPool(
+                            statisticsThreadNum,
+                            new ExecutorThreadFactory("orc-get-table-statistic-worker"));
+            List<Future<FileOrcStatistics>> fileRowCountFutures = new ArrayList<>();
             for (Path file : files) {
-                rowCount +=
-                        updateStatistics(hadoopConfig, file, columnStatisticsMap, producedRowType);
+                fileRowCountFutures.add(
+                        executorService.submit(new OrcFileRowCountCalculator(hadoopConfig, file)));
+            }
+            for (Future<FileOrcStatistics> fileCountFuture : fileRowCountFutures) {
+                FileOrcStatistics fileOrcStatistics = fileCountFuture.get();
+                rowCount += fileOrcStatistics.getRowCount();
+                for (String column : producedRowType.getFieldNames()) {
+                    int fieldIdx = fileOrcStatistics.getFieldNames().indexOf(column);
+                    if (fieldIdx >= 0) {
+                        int colId = fileOrcStatistics.getColumnTypes().get(fieldIdx).getId();
+                        ColumnStatistics statistic = fileOrcStatistics.getStatistics()[colId];
+                        updateStatistics(statistic, column, columnStatisticsMap);
+                    }
+                }
             }
-
             Map<String, ColumnStats> columnStatsMap =
                     convertToColumnStats(rowCount, columnStatisticsMap, producedRowType);
 
@@ -70,35 +102,11 @@ public class OrcFormatStatisticsReportUtil {
         } catch (Exception e) {
             LOG.warn("Reporting statistics failed for Orc format: {}", e.getMessage());
             return TableStats.UNKNOWN;
-        }
-    }
-
-    private static long updateStatistics(
-            Configuration hadoopConf,
-            Path file,
-            Map<String, ColumnStatistics> columnStatisticsMap,
-            RowType producedRowType)
-            throws IOException {
-        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
-        Reader reader =
-                OrcFile.createReader(
-                        path,
-                        OrcFile.readerOptions(hadoopConf)
-                                .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
-        ColumnStatistics[] statistics = reader.getStatistics();
-        TypeDescription schema = reader.getSchema();
-        List<String> fieldNames = schema.getFieldNames();
-        List<TypeDescription> columnTypes = schema.getChildren();
-        for (String column : producedRowType.getFieldNames()) {
-            int fieldIdx = fieldNames.indexOf(column);
-            if (fieldIdx >= 0) {
-                int colId = columnTypes.get(fieldIdx).getId();
-                ColumnStatistics statistic = statistics[colId];
-                updateStatistics(statistic, column, columnStatisticsMap);
+        } finally {
+            if (executorService != null) {
+                executorService.shutdownNow();
             }
         }
-
-        return reader.getNumberOfRows();
     }
 
     private static void updateStatistics(
@@ -217,4 +225,69 @@ public class OrcFormatStatisticsReportUtil {
         }
         return builder.build();
     }
+
+    private static class FileOrcStatistics {
+        private final Long rowCount;
+
+        private final List<String> fieldNames;
+
+        private final ColumnStatistics[] statistics;
+
+        private final List<TypeDescription> columnTypes;
+
+        public FileOrcStatistics(
+                Long rowCount,
+                List<String> fieldNames,
+                ColumnStatistics[] statistics,
+                List<TypeDescription> columnTypes) {
+            this.rowCount = rowCount;
+            this.fieldNames = fieldNames;
+            this.statistics = statistics;
+            this.columnTypes = columnTypes;
+        }
+
+        public Long getRowCount() {
+            return rowCount;
+        }
+
+        public List<String> getFieldNames() {
+            return fieldNames;
+        }
+
+        public ColumnStatistics[] getStatistics() {
+            return statistics;
+        }
+
+        public List<TypeDescription> getColumnTypes() {
+            return columnTypes;
+        }
+    }
+
+    private static class OrcFileRowCountCalculator implements Callable<FileOrcStatistics> {
+
+        private final Configuration hadoopConf;
+        private final Path file;
+
+        public OrcFileRowCountCalculator(Configuration hadoopConf, Path file) {
+            this.hadoopConf = hadoopConf;
+            this.file = file;
+        }
+
+        @Override
+        public FileOrcStatistics call() throws IOException {
+            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+            Reader reader =
+                    OrcFile.createReader(
+                            path,
+                            OrcFile.readerOptions(hadoopConf)
+                                    .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+            ColumnStatistics[] statistics = reader.getStatistics();
+            TypeDescription schema = reader.getSchema();
+            List<String> fieldNames = schema.getFieldNames();
+            List<TypeDescription> columnTypes = schema.getChildren();
+
+            return new FileOrcStatistics(
+                    reader.getNumberOfRows(), fieldNames, statistics, columnTypes);
+        }
+    }
 }
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java
index 7e0aad6ce28..a40784db312 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.Preconditions;
@@ -46,7 +47,6 @@ import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -54,9 +54,14 @@ import java.nio.ByteOrder;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 /** Utils for Parquet format statistics report. */
@@ -70,12 +75,49 @@ public class ParquetFormatStatisticsReportUtil {
             DataType producedDataType,
             Configuration hadoopConfig,
             boolean isUtcTimestamp) {
+        return getTableStatistics(
+                files,
+                producedDataType,
+                hadoopConfig,
+                isUtcTimestamp,
+                Runtime.getRuntime().availableProcessors());
+    }
+
+    public static TableStats getTableStatistics(
+            List<Path> files,
+            DataType producedDataType,
+            Configuration hadoopConfig,
+            boolean isUtcTimestamp,
+            int statisticsThreadNum) {
+        ExecutorService executorService = null;
         try {
             Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
             RowType producedRowType = (RowType) producedDataType.getLogicalType();
+            executorService =
+                    Executors.newFixedThreadPool(
+                            statisticsThreadNum,
+                            new ExecutorThreadFactory("parquet-get-table-statistic-worker"));
             long rowCount = 0;
+            List<Future<FileParquetStatistics>> fileRowCountFutures = new ArrayList<>();
             for (Path file : files) {
-                rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
+                fileRowCountFutures.add(
+                        executorService.submit(
+                                new ParquetFileRowCountCalculator(
+                                        hadoopConfig, file, columnStatisticsMap)));
+            }
+            for (Future<FileParquetStatistics> fileCountFuture : fileRowCountFutures) {
+                FileParquetStatistics fileStatistics = fileCountFuture.get();
+                List<String> columns = fileStatistics.getColumns();
+                List<BlockMetaData> blocks = fileStatistics.blocks;
+                for (BlockMetaData block : blocks) {
+                    rowCount += block.getRowCount();
+                    for (int i = 0; i < columns.size(); ++i) {
+                        updateStatistics(
+                                block.getColumns().get(i).getStatistics(),
+                                columns.get(i),
+                                columnStatisticsMap);
+                    }
+                }
             }
             Map<String, ColumnStats> columnStatsMap =
                     convertToColumnStats(columnStatisticsMap, producedRowType, isUtcTimestamp);
@@ -83,6 +125,22 @@ public class ParquetFormatStatisticsReportUtil {
         } catch (Exception e) {
             LOG.warn("Reporting statistics failed for Parquet format", e);
             return TableStats.UNKNOWN;
+        } finally {
+            if (executorService != null) {
+                executorService.shutdownNow();
+            }
+        }
+    }
+
+    private static void updateStatistics(
+            Statistics<?> statistics,
+            String column,
+            Map<String, Statistics<?>> columnStatisticsMap) {
+        Statistics<?> previousStatistics = columnStatisticsMap.get(column);
+        if (previousStatistics == null) {
+            columnStatisticsMap.put(column, statistics);
+        } else {
+            previousStatistics.mergeStatistics(statistics);
         }
     }
 
@@ -252,42 +310,6 @@ public class ParquetFormatStatisticsReportUtil {
         return builder.build();
     }
 
-    private static long updateStatistics(
-            Configuration hadoopConfig, Path file, Map<String, Statistics<?>> columnStatisticsMap)
-            throws IOException {
-        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
-        ParquetMetadata metadata = ParquetFileReader.readFooter(hadoopConfig, hadoopPath);
-        MessageType schema = metadata.getFileMetaData().getSchema();
-        List<String> columns =
-                schema.asGroupType().getFields().stream()
-                        .map(Type::getName)
-                        .collect(Collectors.toList());
-        List<BlockMetaData> blocks = metadata.getBlocks();
-        long rowCount = 0;
-        for (BlockMetaData block : blocks) {
-            rowCount += block.getRowCount();
-            for (int i = 0; i < columns.size(); ++i) {
-                updateStatistics(
-                        block.getColumns().get(i).getStatistics(),
-                        columns.get(i),
-                        columnStatisticsMap);
-            }
-        }
-        return rowCount;
-    }
-
-    private static void updateStatistics(
-            Statistics<?> statistics,
-            String column,
-            Map<String, Statistics<?>> columnStatisticsMap) {
-        Statistics<?> previousStatistics = columnStatisticsMap.get(column);
-        if (previousStatistics == null) {
-            columnStatisticsMap.put(column, statistics);
-        } else {
-            previousStatistics.mergeStatistics(statistics);
-        }
-    }
-
     private static BigDecimal binaryToDecimal(Binary decimal, int scale) {
         BigInteger bigInteger = new BigInteger(decimal.getBytesUnsafe());
         return new BigDecimal(bigInteger, scale);
@@ -304,4 +326,50 @@ public class ParquetFormatStatisticsReportUtil {
                 TimestampColumnReader.int96ToTimestamp(utcTimestamp, timeOfDayNanos, julianDay);
         return timestampData.toTimestamp();
     }
+
+    private static class FileParquetStatistics {
+
+        private final List<String> columns;
+
+        private final List<BlockMetaData> blocks;
+
+        public FileParquetStatistics(List<String> columns, List<BlockMetaData> blocks) {
+            this.columns = columns;
+            this.blocks = blocks;
+        }
+
+        public List<String> getColumns() {
+            return columns;
+        }
+
+        public List<BlockMetaData> getBlocks() {
+            return blocks;
+        }
+    }
+
+    private static class ParquetFileRowCountCalculator implements Callable<FileParquetStatistics> {
+        private final Configuration hadoopConfig;
+        private final Path file;
+
+        public ParquetFileRowCountCalculator(
+                Configuration hadoopConfig,
+                Path file,
+                Map<String, Statistics<?>> columnStatisticsMap) {
+            this.hadoopConfig = hadoopConfig;
+            this.file = file;
+        }
+
+        @Override
+        public FileParquetStatistics call() throws Exception {
+            org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
+            ParquetMetadata metadata = ParquetFileReader.readFooter(hadoopConfig, hadoopPath);
+            MessageType schema = metadata.getFileMetaData().getSchema();
+            List<String> columns =
+                    schema.asGroupType().getFields().stream()
+                            .map(Type::getName)
+                            .collect(Collectors.toList());
+            List<BlockMetaData> blocks = metadata.getBlocks();
+            return new FileParquetStatistics(columns, blocks);
+        }
+    }
 }