Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/28 04:26:18 UTC

[GitHub] [flink] godfreyhe commented on a diff in pull request #20084: [FLINK-27988][table-planner] Let HiveTableSource extend from SupportStatisticsReport

godfreyhe commented on code in PR #20084:
URL: https://github.com/apache/flink/pull/20084#discussion_r908013925


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java:
##########
@@ -132,10 +132,10 @@ private RowType tableRowType() {
         return RowType.of(types, fieldNames);
     }
 
-    private BulkFormat<RowData, ? super HiveSourceSplit> createBulkFormatForSplit(
+    public BulkFormat<RowData, ? super HiveSourceSplit> createBulkFormatForSplit(

Review Comment:
   Why does the access level need to be changed?
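   A hedged sketch of one way to make the intent explicit if the method really must be
   callable from outside its package (HiveInputFormat lives in the read subpackage, while
   the new statistics path lives in the parent package): widen the access, but mark the
   method as internal API. Using @Internal from flink-annotations here is an assumption,
   not something the PR proposes, and the parameter list is left elided as in the quoted diff.

       import org.apache.flink.annotation.Internal;

       // Sketch only: @Internal documents that the wider visibility does not turn this
       // method into stable public API. Parameters are elided as in the quoted diff.
       @Internal
       public BulkFormat<RowData, ? super HiveSourceSplit> createBulkFormatForSplit(
               /* parameters as in the original method */) {
           // ... existing implementation unchanged ...
       }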



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java:
##########
@@ -315,7 +315,7 @@ private RowType getProducedRowType() {
         return (RowType) producedSchema.toRowDataType().bridgedTo(RowData.class).getLogicalType();
     }
 
-    private BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
+    public BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {

Review Comment:
   ditto
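   A minimal sketch of an alternative for this one, assuming HiveTableSource and
   HiveSourceBuilder sit in the same package (org.apache.flink.connectors.hive):
   package-private visibility would already be enough for the new caller, so the
   method need not become public at all.

       // Sketch: dropping the modifier keeps the method out of the public API while
       // still letting HiveTableSource (same package) build the default bulk format.
       BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
           // ... existing implementation unchanged ...
       }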



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java:
##########
@@ -302,7 +302,8 @@ public TypeInformation<RowData> getProducedType() {
         }
     }
 
-    private class HiveReader implements BulkFormat.Reader<RowData> {
+    /** HiveReader. */
+    public class HiveReader implements BulkFormat.Reader<RowData> {

Review Comment:
   Useless change?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +279,93 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(HiveOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    return ((FileBasedStatisticsReportableInputFormat) defaultBulkFormat)
+                            .reportStatistics(
+                                    inputSplits.stream()
+                                            .map(FileSourceSplit::path)
+                                            .collect(Collectors.toList()),
+                                    catalogTable.getSchema().toRowDataType());
+                } else {
+                    return getMapRedInputFormatStatistics(
+                            inputSplits, catalogTable.getSchema().toRowDataType());
+                }
+            } else {
+                return TableStats.UNKNOWN;
+            }
+
+        } catch (Exception e) {
+            return TableStats.UNKNOWN;
+        }
+    }
+
+    private TableStats getMapRedInputFormatStatistics(
+            List<HiveSourceSplit> inputSplits, DataType producedDataType) {
+        if (inputSplits.isEmpty()) {

Review Comment:
   This branch is already checked by the caller, so the check here is redundant.
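   If the non-empty precondition is guaranteed by the caller, one hedged alternative is to
   assert it instead of silently returning UNKNOWN a second time; Preconditions is Flink's
   own utility class, and this snippet is a sketch, not part of the PR.

       import org.apache.flink.util.Preconditions;

       // Sketch: reportStatistics only calls this method when splits exist, so turn the
       // defensive branch into an explicit precondition.
       Preconditions.checkArgument(
               !inputSplits.isEmpty(), "inputSplits is expected to be non-empty here");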



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -130,6 +131,16 @@ public class HiveOptions {
                                             "This is a synonym for the deprecated 'streaming-source.consume-order' option.")
                                     .build());
 
+    public static final ConfigOption<FileSystemConnectorOptions.FileStatisticsType>

Review Comment:
   Use the definition in FileSystemConnectorOptions directly instead of redeclaring it here.
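   A minimal sketch of what the comment suggests, assuming FileSystemConnectorOptions
   already declares an equivalent option (the constant name SOURCE_REPORT_STATISTICS is
   an assumption here, not taken from the PR): read that option directly instead of
   redeclaring it in HiveOptions.

       import org.apache.flink.connector.file.table.FileSystemConnectorOptions;

       // Sketch: reuse the existing filesystem connector option rather than duplicating
       // the declaration; the Hive source then follows the same key and default value.
       if (flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
               != FileSystemConnectorOptions.FileStatisticsType.ALL) {
           return TableStats.UNKNOWN;
       }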



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +279,93 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(HiveOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    return ((FileBasedStatisticsReportableInputFormat) defaultBulkFormat)
+                            .reportStatistics(
+                                    inputSplits.stream()
+                                            .map(FileSourceSplit::path)
+                                            .collect(Collectors.toList()),
+                                    catalogTable.getSchema().toRowDataType());
+                } else {
+                    return getMapRedInputFormatStatistics(
+                            inputSplits, catalogTable.getSchema().toRowDataType());
+                }
+            } else {
+                return TableStats.UNKNOWN;
+            }
+
+        } catch (Exception e) {
+            return TableStats.UNKNOWN;
+        }
+    }
+
+    private TableStats getMapRedInputFormatStatistics(
+            List<HiveSourceSplit> inputSplits, DataType producedDataType) {
+        if (inputSplits.isEmpty()) {
+            return TableStats.UNKNOWN;
+        }
+        // TODO now we assume that one hive external table has only one storage file format
+        HiveTablePartition hiveTablePartition = inputSplits.get(0).getHiveTablePartition();
+        StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
+        List<Path> files =
+                inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+        InputFormat mapredInputFormat;
+        try {
+            mapredInputFormat =
+                    (InputFormat)
+                            Class.forName(
+                                            sd.getInputFormat(),

Review Comment:
   Do we have a better solution than reflection? cc @luoyuxia, could you provide some suggestions?
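   One hedged option, still reflective but leaning on Hadoop's own helper instead of a raw
   Class.forName call: resolve the class through the JobConf's classloader and let
   ReflectionUtils create and configure the instance (it invokes setConf/configure for
   Configurable and JobConfigurable formats). This is a sketch of an alternative, not the
   PR's code.

       import org.apache.hadoop.mapred.InputFormat;
       import org.apache.hadoop.util.ReflectionUtils;

       // Sketch: getClassByName uses the classloader configured on the JobConf, and
       // ReflectionUtils.newInstance takes care of configuring the new instance.
       Class<?> inputFormatClass = jobConf.getClassByName(sd.getInputFormat());
       InputFormat<?, ?> mapredInputFormat =
               (InputFormat<?, ?>) ReflectionUtils.newInstance(inputFormatClass, jobConf);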



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java:
##########
@@ -327,6 +328,10 @@ private HiveReader(HiveSourceSplit split) throws IOException {
             serializer = new RowDataSerializer(producedRowType);
         }
 
+        public HiveMapredSplitReader getHiveMapredSplitReader() {

Review Comment:
   Useless change?



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +279,93 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(HiveOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);

Review Comment:
   We should consider how to handle this case after limit pushdown and filter pushdown.
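   A hedged sketch of one way to account for pushed-down operators when reporting
   statistics (the helper name and the filterPushedDown flag are illustrative, not from
   the PR; limit matches the field passed to setLimit above): cap the row count by a
   pushed-down limit, and fall back to UNKNOWN when filters were pushed down, since their
   selectivity cannot be estimated from file metadata alone.

       import org.apache.flink.table.plan.stats.TableStats;

       // Illustrative helper, not part of the PR: adjust file-level statistics for
       // operators that were already pushed into this source.
       private TableStats adjustForPushDowns(TableStats fileStats, boolean filterPushedDown) {
           if (filterPushedDown) {
               // Selectivity of pushed-down predicates is unknown here.
               return TableStats.UNKNOWN;
           }
           if (limit != null && fileStats.getRowCount() > limit) {
               // A pushed-down limit caps the number of rows this source can emit.
               return new TableStats(limit);
           }
           return fileStats;
       }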


