Posted to commits@flink.apache.org by yu...@apache.org on 2023/07/18 01:15:28 UTC
[flink] branch master updated: [FLINK-32365][hive] Allow Hive source to scan table to get statistics in parallel (#22805)
This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0f2ee60c277 [FLINK-32365][hive] Allow Hive source to scan table to get statistics in parallel (#22805)
0f2ee60c277 is described below
commit 0f2ee60c2774c7b03c20ae0d495c51c35df48789
Author: baiwuchang <41...@users.noreply.github.com>
AuthorDate: Tue Jul 18 09:15:19 2023 +0800
[FLINK-32365][hive] Allow Hive source to scan table to get statistics in parallel (#22805)
---
.../docs/connectors/table/hive/hive_read_write.md | 4 +
.../docs/connectors/table/hive/hive_read_write.md | 5 +
.../apache/flink/connectors/hive/HiveOptions.java | 7 +
.../flink/connectors/hive/HiveTableSource.java | 13 +-
.../orc/util/OrcFormatStatisticsReportUtil.java | 133 ++++++++++++++-----
.../utils/ParquetFormatStatisticsReportUtil.java | 144 +++++++++++++++------
6 files changed, 236 insertions(+), 70 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index c54efe4c37f..bdd3ac4f06d 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -190,6 +190,10 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig`
- 目前上述参数仅适用于 ORC 格式的 Hive 表。
{{< /hint >}}
+### 读取表统计信息
+
+当 Hive metastore 中没有表的统计信息时,Flink 会尝试扫描表来获取统计信息从而生成合适的执行计划。此过程可能会比较耗时,你可以使用 `table.exec.hive.read-statistics.thread-num` 去配置使用多少个线程去扫描表,默认值是当前系统可用处理器数,配置的值应该大于 0。
+
### 加载分区切片
Flink 使用多个线程并发将 Hive 分区切分成多个 split 进行读取。你可以使用 `table.exec.hive.load-partition-splits.thread-num` 去配置线程数。默认值是3,你配置的值应该大于0。
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index c5bc91edb11..98742a92be7 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -206,6 +206,11 @@ Users can do some performance tuning by tuning the split's size with the follow
- Currently, these configurations for tuning split size only works for the Hive table stored as ORC format.
{{< /hint >}}
+### Read Table Statistics
+
+When table statistics are not available from the Hive metastore, Flink will try to scan the table to collect the statistics so that it can generate a better execution plan. Collecting the statistics may take some time. To speed it up, you can use `table.exec.hive.read-statistics.thread-num` to configure how many threads are used to scan the table.
+The default value is the number of available processors in the current system, and the configured value should be bigger than 0.
+
### Load Partition Splits
Multiple threads are used to split Hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0.
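As a usage illustration (not part of this patch), the new option can also be set programmatically on the session configuration before the Hive source is planned; the thread count below is a hypothetical value:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadStatisticsThreadNumExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Scan Hive files for statistics with 8 threads instead of the default
        // (the number of available processors). The value 8 is illustrative.
        tEnv.getConfig()
                .getConfiguration()
                .setInteger("table.exec.hive.read-statistics.thread-num", 8);
    }
}

The same key can also be set in SQL Client with SET 'table.exec.hive.read-statistics.thread-num' = '8';.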
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index 2cba3dae4fc..01e1493681d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -134,6 +134,13 @@ public class HiveOptions {
+ " custom: use policy class to create a commit policy."
+ " Support to configure multiple policies: 'metastore,success-file'.");
+ public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM =
+ key("table.exec.hive.read-statistics.thread-num")
+ .intType()
+ .defaultValue(Runtime.getRuntime().availableProcessors())
+ .withDescription(
+ "The thread number to read input format statistics. It should be bigger than 0.");
+
public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE =
key("compaction.small-files.avg-size")
.memoryType()
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index e5acc1d8ca5..a1dac332c2d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -85,6 +85,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER;
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_ENABLE;
+import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM;
import static org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions;
/** A TableSource implementation to read data from Hive tables. */
@@ -373,13 +374,21 @@ public class HiveTableSource
.toLowerCase();
List<Path> files =
inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+ int statisticsThreadNum = flinkConf.get(TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM);
+ Preconditions.checkArgument(
+ statisticsThreadNum >= 1,
+ TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM.key() + " cannot be less than 1");
// Now we only support Parquet, Orc formats.
if (serializationLib.contains("parquet")) {
return ParquetFormatStatisticsReportUtil.getTableStatistics(
- files, producedDataType, jobConf, hiveVersion.startsWith("3"));
+ files,
+ producedDataType,
+ jobConf,
+ hiveVersion.startsWith("3"),
+ statisticsThreadNum);
} else if (serializationLib.contains("orc")) {
return OrcFormatStatisticsReportUtil.getTableStatistics(
- files, producedDataType, jobConf);
+ files, producedDataType, jobConf, statisticsThreadNum);
} else {
// Now, only support Orc and Parquet Formats.
LOG.info(
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java
index a4abfd63ad2..e183f9bf394 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/util/OrcFormatStatisticsReportUtil.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.ColumnStatistics;
@@ -43,9 +44,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Date;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
/** Utils for Orc format statistics report. */
public class OrcFormatStatisticsReportUtil {
@@ -54,15 +60,41 @@ public class OrcFormatStatisticsReportUtil {
public static TableStats getTableStatistics(
List<Path> files, DataType producedDataType, Configuration hadoopConfig) {
+ return getTableStatistics(
+ files, producedDataType, hadoopConfig, Runtime.getRuntime().availableProcessors());
+ }
+
+ public static TableStats getTableStatistics(
+ List<Path> files,
+ DataType producedDataType,
+ Configuration hadoopConfig,
+ int statisticsThreadNum) {
+ ExecutorService executorService = null;
try {
long rowCount = 0;
Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
RowType producedRowType = (RowType) producedDataType.getLogicalType();
+ executorService =
+ Executors.newFixedThreadPool(
+ statisticsThreadNum,
+ new ExecutorThreadFactory("orc-get-table-statistic-worker"));
+ List<Future<FileOrcStatistics>> fileRowCountFutures = new ArrayList<>();
for (Path file : files) {
- rowCount +=
- updateStatistics(hadoopConfig, file, columnStatisticsMap, producedRowType);
+ fileRowCountFutures.add(
+ executorService.submit(new OrcFileRowCountCalculator(hadoopConfig, file)));
+ }
+ for (Future<FileOrcStatistics> fileCountFuture : fileRowCountFutures) {
+ FileOrcStatistics fileOrcStatistics = fileCountFuture.get();
+ rowCount += fileOrcStatistics.getRowCount();
+ for (String column : producedRowType.getFieldNames()) {
+ int fieldIdx = fileOrcStatistics.getFieldNames().indexOf(column);
+ if (fieldIdx >= 0) {
+ int colId = fileOrcStatistics.getColumnTypes().get(fieldIdx).getId();
+ ColumnStatistics statistic = fileOrcStatistics.getStatistics()[colId];
+ updateStatistics(statistic, column, columnStatisticsMap);
+ }
+ }
}
-
Map<String, ColumnStats> columnStatsMap =
convertToColumnStats(rowCount, columnStatisticsMap, producedRowType);
@@ -70,35 +102,11 @@ public class OrcFormatStatisticsReportUtil {
} catch (Exception e) {
LOG.warn("Reporting statistics failed for Orc format: {}", e.getMessage());
return TableStats.UNKNOWN;
- }
- }
-
- private static long updateStatistics(
- Configuration hadoopConf,
- Path file,
- Map<String, ColumnStatistics> columnStatisticsMap,
- RowType producedRowType)
- throws IOException {
- org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
- Reader reader =
- OrcFile.createReader(
- path,
- OrcFile.readerOptions(hadoopConf)
- .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
- ColumnStatistics[] statistics = reader.getStatistics();
- TypeDescription schema = reader.getSchema();
- List<String> fieldNames = schema.getFieldNames();
- List<TypeDescription> columnTypes = schema.getChildren();
- for (String column : producedRowType.getFieldNames()) {
- int fieldIdx = fieldNames.indexOf(column);
- if (fieldIdx >= 0) {
- int colId = columnTypes.get(fieldIdx).getId();
- ColumnStatistics statistic = statistics[colId];
- updateStatistics(statistic, column, columnStatisticsMap);
+ } finally {
+ if (executorService != null) {
+ executorService.shutdownNow();
}
}
-
- return reader.getNumberOfRows();
}
private static void updateStatistics(
@@ -217,4 +225,69 @@ public class OrcFormatStatisticsReportUtil {
}
return builder.build();
}
+
+ private static class FileOrcStatistics {
+ private final Long rowCount;
+
+ private final List<String> fieldNames;
+
+ private final ColumnStatistics[] statistics;
+
+ private final List<TypeDescription> columnTypes;
+
+ public FileOrcStatistics(
+ Long rowCount,
+ List<String> fieldNames,
+ ColumnStatistics[] statistics,
+ List<TypeDescription> columnTypes) {
+ this.rowCount = rowCount;
+ this.fieldNames = fieldNames;
+ this.statistics = statistics;
+ this.columnTypes = columnTypes;
+ }
+
+ public Long getRowCount() {
+ return rowCount;
+ }
+
+ public List<String> getFieldNames() {
+ return fieldNames;
+ }
+
+ public ColumnStatistics[] getStatistics() {
+ return statistics;
+ }
+
+ public List<TypeDescription> getColumnTypes() {
+ return columnTypes;
+ }
+ }
+
+ private static class OrcFileRowCountCalculator implements Callable<FileOrcStatistics> {
+
+ private final Configuration hadoopConf;
+ private final Path file;
+
+ public OrcFileRowCountCalculator(Configuration hadoopConf, Path file) {
+ this.hadoopConf = hadoopConf;
+ this.file = file;
+ }
+
+ @Override
+ public FileOrcStatistics call() throws IOException {
+ org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(file.toUri());
+ Reader reader =
+ OrcFile.createReader(
+ path,
+ OrcFile.readerOptions(hadoopConf)
+ .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(hadoopConf)));
+ ColumnStatistics[] statistics = reader.getStatistics();
+ TypeDescription schema = reader.getSchema();
+ List<String> fieldNames = schema.getFieldNames();
+ List<TypeDescription> columnTypes = schema.getChildren();
+
+ return new FileOrcStatistics(
+ reader.getNumberOfRows(), fieldNames, statistics, columnTypes);
+ }
+ }
}
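For readers skimming the diff, the change above follows a scatter/gather pattern: each file's footer and column statistics are read on a worker thread, while all merging into the shared statistics map stays on the calling thread. A simplified, self-contained sketch of that pattern (illustrative names, not Flink API) is:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelFileStatsSketch {

    static long totalRowCount(List<String> files, int threadNum) {
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (String file : files) {
                // Each task touches only its own file; no shared mutable state on workers.
                futures.add(pool.submit(() -> readRowCount(file)));
            }
            long rowCount = 0;
            for (Future<Long> future : futures) {
                rowCount += future.get(); // merge results on the caller thread
            }
            return rowCount;
        } catch (Exception e) {
            return -1L; // the real utility falls back to TableStats.UNKNOWN instead
        } finally {
            pool.shutdownNow();
        }
    }

    private static long readRowCount(String file) {
        // Placeholder for OrcFile.createReader(...).getNumberOfRows().
        return 0L;
    }
}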
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java
index 7e0aad6ce28..a40784db312 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetFormatStatisticsReportUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.Preconditions;
@@ -46,7 +47,6 @@ import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -54,9 +54,14 @@ import java.nio.ByteOrder;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
/** Utils for Parquet format statistics report. */
@@ -70,12 +75,49 @@ public class ParquetFormatStatisticsReportUtil {
DataType producedDataType,
Configuration hadoopConfig,
boolean isUtcTimestamp) {
+ return getTableStatistics(
+ files,
+ producedDataType,
+ hadoopConfig,
+ isUtcTimestamp,
+ Runtime.getRuntime().availableProcessors());
+ }
+
+ public static TableStats getTableStatistics(
+ List<Path> files,
+ DataType producedDataType,
+ Configuration hadoopConfig,
+ boolean isUtcTimestamp,
+ int statisticsThreadNum) {
+ ExecutorService executorService = null;
try {
Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
RowType producedRowType = (RowType) producedDataType.getLogicalType();
+ executorService =
+ Executors.newFixedThreadPool(
+ statisticsThreadNum,
+ new ExecutorThreadFactory("parquet-get-table-statistic-worker"));
long rowCount = 0;
+ List<Future<FileParquetStatistics>> fileRowCountFutures = new ArrayList<>();
for (Path file : files) {
- rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
+ fileRowCountFutures.add(
+ executorService.submit(
+ new ParquetFileRowCountCalculator(
+ hadoopConfig, file, columnStatisticsMap)));
+ }
+ for (Future<FileParquetStatistics> fileCountFuture : fileRowCountFutures) {
+ FileParquetStatistics fileStatistics = fileCountFuture.get();
+ List<String> columns = fileStatistics.getColumns();
+ List<BlockMetaData> blocks = fileStatistics.blocks;
+ for (BlockMetaData block : blocks) {
+ rowCount += block.getRowCount();
+ for (int i = 0; i < columns.size(); ++i) {
+ updateStatistics(
+ block.getColumns().get(i).getStatistics(),
+ columns.get(i),
+ columnStatisticsMap);
+ }
+ }
}
Map<String, ColumnStats> columnStatsMap =
convertToColumnStats(columnStatisticsMap, producedRowType, isUtcTimestamp);
@@ -83,6 +125,22 @@ public class ParquetFormatStatisticsReportUtil {
} catch (Exception e) {
LOG.warn("Reporting statistics failed for Parquet format", e);
return TableStats.UNKNOWN;
+ } finally {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ }
+ }
+
+ private static void updateStatistics(
+ Statistics<?> statistics,
+ String column,
+ Map<String, Statistics<?>> columnStatisticsMap) {
+ Statistics<?> previousStatistics = columnStatisticsMap.get(column);
+ if (previousStatistics == null) {
+ columnStatisticsMap.put(column, statistics);
+ } else {
+ previousStatistics.mergeStatistics(statistics);
}
}
@@ -252,42 +310,6 @@ public class ParquetFormatStatisticsReportUtil {
return builder.build();
}
- private static long updateStatistics(
- Configuration hadoopConfig, Path file, Map<String, Statistics<?>> columnStatisticsMap)
- throws IOException {
- org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
- ParquetMetadata metadata = ParquetFileReader.readFooter(hadoopConfig, hadoopPath);
- MessageType schema = metadata.getFileMetaData().getSchema();
- List<String> columns =
- schema.asGroupType().getFields().stream()
- .map(Type::getName)
- .collect(Collectors.toList());
- List<BlockMetaData> blocks = metadata.getBlocks();
- long rowCount = 0;
- for (BlockMetaData block : blocks) {
- rowCount += block.getRowCount();
- for (int i = 0; i < columns.size(); ++i) {
- updateStatistics(
- block.getColumns().get(i).getStatistics(),
- columns.get(i),
- columnStatisticsMap);
- }
- }
- return rowCount;
- }
-
- private static void updateStatistics(
- Statistics<?> statistics,
- String column,
- Map<String, Statistics<?>> columnStatisticsMap) {
- Statistics<?> previousStatistics = columnStatisticsMap.get(column);
- if (previousStatistics == null) {
- columnStatisticsMap.put(column, statistics);
- } else {
- previousStatistics.mergeStatistics(statistics);
- }
- }
-
private static BigDecimal binaryToDecimal(Binary decimal, int scale) {
BigInteger bigInteger = new BigInteger(decimal.getBytesUnsafe());
return new BigDecimal(bigInteger, scale);
@@ -304,4 +326,50 @@ public class ParquetFormatStatisticsReportUtil {
TimestampColumnReader.int96ToTimestamp(utcTimestamp, timeOfDayNanos, julianDay);
return timestampData.toTimestamp();
}
+
+ private static class FileParquetStatistics {
+
+ private final List<String> columns;
+
+ private final List<BlockMetaData> blocks;
+
+ public FileParquetStatistics(List<String> columns, List<BlockMetaData> blocks) {
+ this.columns = columns;
+ this.blocks = blocks;
+ }
+
+ public List<String> getColumns() {
+ return columns;
+ }
+
+ public List<BlockMetaData> getBlocks() {
+ return blocks;
+ }
+ }
+
+ private static class ParquetFileRowCountCalculator implements Callable<FileParquetStatistics> {
+ private final Configuration hadoopConfig;
+ private final Path file;
+
+ public ParquetFileRowCountCalculator(
+ Configuration hadoopConfig,
+ Path file,
+ Map<String, Statistics<?>> columnStatisticsMap) {
+ this.hadoopConfig = hadoopConfig;
+ this.file = file;
+ }
+
+ @Override
+ public FileParquetStatistics call() throws Exception {
+ org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
+ ParquetMetadata metadata = ParquetFileReader.readFooter(hadoopConfig, hadoopPath);
+ MessageType schema = metadata.getFileMetaData().getSchema();
+ List<String> columns =
+ schema.asGroupType().getFields().stream()
+ .map(Type::getName)
+ .collect(Collectors.toList());
+ List<BlockMetaData> blocks = metadata.getBlocks();
+ return new FileParquetStatistics(columns, blocks);
+ }
+ }
}
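The Parquet path ends up with the same division of labor: each ParquetFileRowCountCalculator reads only a file footer on a worker thread, and the column statistics are merged afterwards on the caller thread, so the shared columnStatisticsMap needs no synchronization. A minimal sketch of the per-file footer read (assuming the parquet-hadoop dependency is on the classpath; names are illustrative) is:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

final class ParquetFooterRowCountSketch {

    static long rowCount(Configuration conf, Path file) throws Exception {
        // Only the footer is read; no data pages are scanned.
        ParquetMetadata metadata = ParquetFileReader.readFooter(conf, file);
        long rows = 0;
        for (BlockMetaData block : metadata.getBlocks()) {
            rows += block.getRowCount();
        }
        return rows;
    }
}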