Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/23 04:09:51 UTC

[GitHub] [flink] luoyuxia commented on a diff in pull request #20084: [FLINK-27988][table-planner] Let HiveTableSource extend from SupportStatisticsReport

luoyuxia commented on code in PR #20084:
URL: https://github.com/apache/flink/pull/20084#discussion_r928066455


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java:
##########
@@ -315,7 +315,7 @@ private RowType getProducedRowType() {
         return (RowType) producedSchema.toRowDataType().bridgedTo(RowData.class).getLogicalType();
     }
 
-    private BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
+    public BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {

Review Comment:
   Can be `protected`.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +273,99 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                TableStats tableStats;
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    // If HiveInputFormat's variable useMapRedReader is false, Hive using Flink's
+                    // InputFormat to read data.
+                    tableStats =
+                            ((FileBasedStatisticsReportableInputFormat) defaultBulkFormat)
+                                    .reportStatistics(
+                                            inputSplits.stream()
+                                                    .map(FileSourceSplit::path)
+                                                    .collect(Collectors.toList()),
+                                            catalogTable.getSchema().toRowDataType());
+                } else {
+                    // If HiveInputFormat's variable useMapRedReader is true, Hive using MapRed

Review Comment:
   ```suggestion
                    // If HiveInputFormat's variable useMapRedReader is true, Flink will use Hadoop's mapred RecordReader to read data.
   ```



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +273,99 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                TableStats tableStats;
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    // If HiveInputFormat's variable useMapRedReader is false, Hive using Flink's
+                    // InputFormat to read data.
+                    tableStats =
+                            ((FileBasedStatisticsReportableInputFormat) defaultBulkFormat)
+                                    .reportStatistics(
+                                            inputSplits.stream()
+                                                    .map(FileSourceSplit::path)
+                                                    .collect(Collectors.toList()),
+                                            catalogTable.getSchema().toRowDataType());
+                } else {
+                    // If HiveInputFormat's variable useMapRedReader is true, Hive using MapRed
+                    // InputFormat to read data.
+                    tableStats =
+                            getMapRedInputFormatStatistics(
+                                    inputSplits, catalogTable.getSchema().toRowDataType());
+                }
+                if (limit == null) {
+                    // If no limit push down, return recompute table stats.
+                    return tableStats;
+                } else {
+                    // If table have limit push down, return new table stats without table column
+                    // stats.
+                    long newRowCount = Math.min(limit, tableStats.getRowCount());
+                    return new TableStats(newRowCount);
+                }
+            } else {
+                return TableStats.UNKNOWN;
+            }
+
+        } catch (Exception e) {
+            return TableStats.UNKNOWN;
+        }
+    }
+
+    private TableStats getMapRedInputFormatStatistics(
+            List<HiveSourceSplit> inputSplits, DataType producedDataType) {
+        // TODO now we assume that one hive external table has only one storage file format
+        String serializationLib =
+                inputSplits
+                        .get(0)
+                        .getHiveTablePartition()
+                        .getStorageDescriptor()
+                        .getSerdeInfo()
+                        .getSerializationLib()
+                        .toLowerCase();
+        List<Path> files =
+                inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+        // Now we only support Csv, Parquet, Orc formats.
+        if (serializationLib.contains("parquet")) {
+            return ParquetFormatStatisticsReportUtil.getTableStatistics(
+                    files, producedDataType, jobConf, hiveVersion.startsWith("3"));

Review Comment:
   Why check `hiveVersion.startsWith("3")`?



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceStatisticsReportTest.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.StatisticsReportTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for statistics functionality in {@link HiveTableSource}. */
+public class HiveTableSourceStatisticsReportTest extends StatisticsReportTestBase {
+
+    private static HiveCatalog hiveCatalog;
+    private static final String catalogName = "hive";
+    private static final String dbName = "db1";
+    private static final String sourceTable = "sourceTable";
+
+    @BeforeEach
+    public void setup(@TempDir File file) throws Exception {
+        super.setup(file);
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.registerCatalog(catalogName, hiveCatalog);
+        tEnv.useCatalog(catalogName);
+        tEnv.executeSql("create database " + dbName);
+
+        tEnv.executeSql(
+                "create table "
+                        + catalogName
+                        + "."
+                        + dbName
+                        + "."
+                        + sourceTable
+                        + "("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ")");
+
+        DataType dataType =
+                tEnv.from(catalogName + "." + dbName + "." + sourceTable)
+                        .getResolvedSchema()
+                        .toPhysicalRowDataType();
+        tEnv.fromValues(dataType, getData())
+                .executeInsert(catalogName + "." + dbName + "." + sourceTable)
+                .await();
+    }
+
+    @AfterEach
+    public void after() {
+        super.after();
+        if (null != hiveCatalog) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Override
+    protected String[] properties() {
+        return new String[0];
+    }
+
+    @Test
+    public void testMapRedCsvFormatHiveTableSourceStatisticsReport() {
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from " + catalogName + "." + dbName + "." + sourceTable);
+        assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN);
+    }
+
+    @Test
+    public void testFlinkOrcFormatHiveTableSourceStatisticsReport() throws Exception {
+        tEnv.executeSql(
+                "create table hive.db1.orcTable "
+                        + " ("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ") stored as orc");
+        tEnv.executeSql(
+                        "insert into hive.db1.orcTable select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable)
+                .await();
+
+        // Hive to read Orc file
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from hive.db1.orcTable where f_smallint > 100");
+        assertHiveTableOrcFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+    }
+
+    @Test
+    public void testFlinkParquetFormatHiveTableSourceStatisticsReport() throws Exception {
+        tEnv.executeSql(
+                "create table hive.db1.parquetTable"
+                        + " ("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ") stored as parquet");
+        tEnv.executeSql(
+                        "insert into hive.db1.parquetTable select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable)
+                .await();
+
+        // Hive to read Parquet file
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from hive.db1.parquetTable where f_smallint > 100");
+        assertHiveTableParquetFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+    }
+
+    @Test
+    public void testMapRedOrcFormatHiveTableSourceStatisticsReport() throws Exception {
+        // Use mapRed parquet format.
+        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
+        tEnv.executeSql(
+                "create table hive.db1.orcTable"
+                        + " ("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ") stored as orc");
+        tEnv.executeSql(
+                        "insert into hive.db1.orcTable select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable)
+                .await();
+
+        // Hive to read Orc file
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from hive.db1.orcTable where f_smallint > 100");
+        assertHiveTableOrcFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+    }
+
+    @Test
+    public void testMapRedParquetFormatHiveTableSourceStatisticsReport() throws Exception {
+        // Use mapRed parquet format.
+        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
+        tEnv.executeSql(
+                "create table hive.db1.parquetTable"
+                        + " ("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ") stored as parquet");
+        tEnv.executeSql(
+                        "insert into hive.db1.parquetTable select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable)
+                .await();
+
+        // Hive to read Parquet file.
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from hive.db1.parquetTable where f_smallint > 100");
+        assertHiveTableParquetFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+    }
+
+    @Test
+    public void testHiveTableSourceWithLimitPushDown() {
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable
+                                + " limit 1");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(1));
+    }
+
+    @Override
+    protected Map<String, String> ddlTypesMap() {
+        // hive table ddl now don't support type: TIMESTAMP(3), TIMESTAMP(9), TIMESTAMP WITHOUT TIME
+        // ZONE, TIMESTAMP WITH LOCAL TIME ZONE AND ROW. So we remove these types.
+        Map<String, String> ddlTypesMap = super.ddlTypesMap();
+        String timestampTypeName = ddlTypesMap.remove("timestamp(3)");
+        ddlTypesMap.remove("timestamp(9)");
+        ddlTypesMap.remove("timestamp without time zone");
+        ddlTypesMap.remove("timestamp with local time zone");
+        String binaryTypeName = ddlTypesMap.remove("binary(1)");
+        ddlTypesMap.remove("varbinary(1)");
+        ddlTypesMap.remove("time");
+        ddlTypesMap.remove("row<col1 string, col2 int>");
+        ddlTypesMap.put("timestamp", timestampTypeName);
+        ddlTypesMap.put("binary", binaryTypeName);
+
+        return ddlTypesMap;
+    }
+
+    @Override
+    protected Map<String, List<Object>> getDataMap() {
+        // hive table ddl now don't support type: TIMESTAMP(3), TIMESTAMP(9), TIMESTAMP WITHOUT TIME
+        // ZONE, TIMESTAMP WITH LOCAL TIME ZONE AND ROW. So we remove these types related data.
+        Map<String, List<Object>> dataMap = super.getDataMap();
+        List<Object> timestampDate = dataMap.remove("timestamp(3)");
+        dataMap.remove("timestamp(9)");
+        dataMap.remove("timestamp without time zone");
+        dataMap.remove("timestamp with local time zone");
+        List<Object> binaryDate = dataMap.remove("binary(1)");
+        dataMap.remove("varbinary(1)");
+        dataMap.remove("time");
+        dataMap.remove("row<col1 string, col2 int>");
+        dataMap.put("timestamp", timestampDate);
+        dataMap.put("binary", binaryDate);
+
+        return dataMap;
+    }
+
+    private static void assertHiveTableOrcFormatTableStatsEquals(

Review Comment:
   Is it possible to extract the common code between `assertHiveTableOrcFormatTableStatsEquals`
   and `assertHiveTableParquetFormatTableStatsEquals`?
   Also, could it be moved to `StatisticsReportTestBase`?
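
   A possible shape for such a shared helper (a rough sketch; the helper name is made up, and it assumes `TableStats` exposes its column stats as a `Map<String, ColumnStats>`):
   ```java
   // Hypothetical helper in StatisticsReportTestBase; the Orc- and Parquet-specific asserts
   // could both delegate to it with their own expected ColumnStats maps.
   protected static void assertTableStatsEquals(
           TableStats actualStats,
           long expectedRowCount,
           Map<String, ColumnStats> expectedColumnStats) {
       assertThat(actualStats.getRowCount()).isEqualTo(expectedRowCount);
       assertThat(actualStats.getColumnStats()).isEqualTo(expectedColumnStats);
   }
   ```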



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +273,99 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                TableStats tableStats;
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    // If HiveInputFormat's variable useMapRedReader is false, Hive using Flink's

Review Comment:
   ```suggestion
                       // If HiveInputFormat's variable useMapRedReader is false, it will use Flink's
   ```



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +273,99 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                TableStats tableStats;
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    // If HiveInputFormat's variable useMapRedReader is false, Hive using Flink's
+                    // InputFormat to read data.
+                    tableStats =
+                            ((FileBasedStatisticsReportableInputFormat) defaultBulkFormat)
+                                    .reportStatistics(
+                                            inputSplits.stream()
+                                                    .map(FileSourceSplit::path)
+                                                    .collect(Collectors.toList()),
+                                            catalogTable.getSchema().toRowDataType());
+                } else {
+                    // If HiveInputFormat's variable useMapRedReader is true, Hive using MapRed
+                    // InputFormat to read data.
+                    tableStats =
+                            getMapRedInputFormatStatistics(
+                                    inputSplits, catalogTable.getSchema().toRowDataType());
+                }
+                if (limit == null) {
+                    // If no limit push down, return recompute table stats.
+                    return tableStats;
+                } else {
+                    // If table have limit push down, return new table stats without table column
+                    // stats.
+                    long newRowCount = Math.min(limit, tableStats.getRowCount());
+                    return new TableStats(newRowCount);
+                }
+            } else {
+                return TableStats.UNKNOWN;
+            }
+
+        } catch (Exception e) {
+            return TableStats.UNKNOWN;

Review Comment:
   I think it would be better to log the exception here.
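
   For illustration, a minimal sketch of what that could look like (assumes an slf4j `Logger` field added to `HiveTableSource`; the field name and message are hypothetical):
   ```java
   // Hypothetical logger field on HiveTableSource:
   // private static final Logger LOG = LoggerFactory.getLogger(HiveTableSource.class);

   } catch (Exception e) {
       // Log before degrading so the fallback to UNKNOWN stats is traceable.
       LOG.warn("Failed to report statistics for table {}, returning TableStats.UNKNOWN.", tablePath, e);
       return TableStats.UNKNOWN;
   }
   ```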



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +273,99 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                TableStats tableStats;
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    // If HiveInputFormat's variable useMapRedReader is false, Hive using Flink's
+                    // InputFormat to read data.
+                    tableStats =
+                            ((FileBasedStatisticsReportableInputFormat) defaultBulkFormat)
+                                    .reportStatistics(
+                                            inputSplits.stream()
+                                                    .map(FileSourceSplit::path)
+                                                    .collect(Collectors.toList()),
+                                            catalogTable.getSchema().toRowDataType());
+                } else {
+                    // If HiveInputFormat's variable useMapRedReader is true, Hive using MapRed
+                    // InputFormat to read data.
+                    tableStats =
+                            getMapRedInputFormatStatistics(
+                                    inputSplits, catalogTable.getSchema().toRowDataType());
+                }
+                if (limit == null) {
+                    // If no limit push down, return recompute table stats.
+                    return tableStats;
+                } else {
+                    // If table have limit push down, return new table stats without table column
+                    // stats.
+                    long newRowCount = Math.min(limit, tableStats.getRowCount());
+                    return new TableStats(newRowCount);
+                }
+            } else {
+                return TableStats.UNKNOWN;

Review Comment:
   If `inputSplits.size() == 0`, why not return `TableStats(0)`?
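
   In code, the suggested alternative would be something like (a sketch, assuming an empty split list really means there is nothing to scan):
   ```java
   if (inputSplits.isEmpty()) {
       // No splits selected: the table/partitions contain no data, so the row count is 0.
       return new TableStats(0);
   }
   ```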



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +273,99 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                TableStats tableStats;
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    // If HiveInputFormat's variable useMapRedReader is false, Hive using Flink's
+                    // InputFormat to read data.
+                    tableStats =
+                            ((FileBasedStatisticsReportableInputFormat) defaultBulkFormat)
+                                    .reportStatistics(
+                                            inputSplits.stream()
+                                                    .map(FileSourceSplit::path)
+                                                    .collect(Collectors.toList()),
+                                            catalogTable.getSchema().toRowDataType());
+                } else {
+                    // If HiveInputFormat's variable useMapRedReader is true, Hive using MapRed
+                    // InputFormat to read data.
+                    tableStats =
+                            getMapRedInputFormatStatistics(
+                                    inputSplits, catalogTable.getSchema().toRowDataType());
+                }
+                if (limit == null) {
+                    // If no limit push down, return recompute table stats.
+                    return tableStats;
+                } else {
+                    // If table have limit push down, return new table stats without table column
+                    // stats.
+                    long newRowCount = Math.min(limit, tableStats.getRowCount());
+                    return new TableStats(newRowCount);
+                }
+            } else {
+                return TableStats.UNKNOWN;
+            }
+
+        } catch (Exception e) {
+            return TableStats.UNKNOWN;
+        }
+    }
+
+    private TableStats getMapRedInputFormatStatistics(
+            List<HiveSourceSplit> inputSplits, DataType producedDataType) {
+        // TODO now we assume that one hive external table has only one storage file format

Review Comment:
   What if Hive's partitions have different formats? It seems it would just return `UNKNOWN`?
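
   One way to make that explicit would be a guard like the following (illustrative only, reusing the accessors already called below; `Set` would need to be imported, `Collectors` is already used in this class):
   ```java
   // Bail out if the selected partitions do not all share the same serde/format.
   Set<String> serdeLibs =
           inputSplits.stream()
                   .map(
                           split ->
                                   split.getHiveTablePartition()
                                           .getStorageDescriptor()
                                           .getSerdeInfo()
                                           .getSerializationLib()
                                           .toLowerCase())
                   .collect(Collectors.toSet());
   if (serdeLibs.size() > 1) {
       return TableStats.UNKNOWN;
   }
   ```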



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java:
##########
@@ -261,6 +273,99 @@ public DynamicTableSource copy() {
         return source;
     }
 
+    @Override
+    public TableStats reportStatistics() {
+        try {
+            // only support BOUNDED source
+            if (isStreamingSource()) {
+                return TableStats.UNKNOWN;
+            }
+            if (flinkConf.get(FileSystemConnectorOptions.SOURCE_REPORT_STATISTICS)
+                    != FileSystemConnectorOptions.FileStatisticsType.ALL) {
+                return TableStats.UNKNOWN;
+            }
+
+            HiveSourceBuilder sourceBuilder =
+                    new HiveSourceBuilder(jobConf, flinkConf, tablePath, hiveVersion, catalogTable)
+                            .setProjectedFields(projectedFields)
+                            .setLimit(limit);
+            int threadNum =
+                    flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM);
+            List<HiveTablePartition> hivePartitionsToRead =
+                    getAllPartitions(
+                            jobConf,
+                            hiveVersion,
+                            tablePath,
+                            catalogTable.getPartitionKeys(),
+                            remainingPartitions);
+            BulkFormat<RowData, HiveSourceSplit> defaultBulkFormat =
+                    sourceBuilder.createDefaultBulkFormat();
+            List<HiveSourceSplit> inputSplits =
+                    HiveSourceFileEnumerator.createInputSplits(
+                            0, hivePartitionsToRead, threadNum, jobConf);
+            if (inputSplits.size() != 0) {
+                TableStats tableStats;
+                if (defaultBulkFormat instanceof FileBasedStatisticsReportableInputFormat) {
+                    // If HiveInputFormat's variable useMapRedReader is false, Hive using Flink's
+                    // InputFormat to read data.
+                    tableStats =
+                            ((FileBasedStatisticsReportableInputFormat) defaultBulkFormat)
+                                    .reportStatistics(
+                                            inputSplits.stream()
+                                                    .map(FileSourceSplit::path)
+                                                    .collect(Collectors.toList()),
+                                            catalogTable.getSchema().toRowDataType());
+                } else {
+                    // If HiveInputFormat's variable useMapRedReader is true, Hive using MapRed
+                    // InputFormat to read data.
+                    tableStats =
+                            getMapRedInputFormatStatistics(
+                                    inputSplits, catalogTable.getSchema().toRowDataType());
+                }
+                if (limit == null) {
+                    // If no limit push down, return recompute table stats.
+                    return tableStats;
+                } else {
+                    // If table have limit push down, return new table stats without table column
+                    // stats.
+                    long newRowCount = Math.min(limit, tableStats.getRowCount());
+                    return new TableStats(newRowCount);
+                }
+            } else {
+                return TableStats.UNKNOWN;
+            }
+
+        } catch (Exception e) {
+            return TableStats.UNKNOWN;
+        }
+    }
+
+    private TableStats getMapRedInputFormatStatistics(
+            List<HiveSourceSplit> inputSplits, DataType producedDataType) {
+        // TODO now we assume that one hive external table has only one storage file format
+        String serializationLib =
+                inputSplits
+                        .get(0)
+                        .getHiveTablePartition()
+                        .getStorageDescriptor()
+                        .getSerdeInfo()
+                        .getSerializationLib()
+                        .toLowerCase();
+        List<Path> files =
+                inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+        // Now we only support Csv, Parquet, Orc formats.

Review Comment:
   Is Csv also supported? There seems to be no code covering it.



##########
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFilesystemStatisticsReportTest.java:
##########
@@ -18,90 +18,36 @@
 
 package org.apache.flink.formats.csv;
 
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.plan.stats.TableStats;
-import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
-import org.apache.flink.table.planner.utils.BatchTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-import org.apache.flink.table.planner.utils.TableTestUtil;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.table.types.DataType;
 
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.TableScan;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test for statistics functionality in {@link CsvFormatFactory} in the case of file system source.
  */
-public class CsvFormatFilesystemStatisticsReportTest extends TableTestBase {
-    private BatchTableTestUtil util;
-    private TableEnvironment tEnv;
-    @TempDir private static File path;
+public class CsvFormatFilesystemStatisticsReportTest extends CsvFormatStatisticsReportTest {

Review Comment:
   Since `CsvFormatFilesystemStatisticsReportTest` extends `CsvFormatStatisticsReportTest`, it will also run the tests in `CsvFormatStatisticsReportTest`. Is that expected?



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceStatisticsReportTest.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.StatisticsReportTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for statistics functionality in {@link HiveTableSource}. */
+public class HiveTableSourceStatisticsReportTest extends StatisticsReportTestBase {
+
+    private static HiveCatalog hiveCatalog;
+    private static final String catalogName = "hive";
+    private static final String dbName = "db1";
+    private static final String sourceTable = "sourceTable";
+
+    @BeforeEach
+    public void setup(@TempDir File file) throws Exception {
+        super.setup(file);
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.registerCatalog(catalogName, hiveCatalog);
+        tEnv.useCatalog(catalogName);
+        tEnv.executeSql("create database " + dbName);
+
+        tEnv.executeSql(
+                "create table "
+                        + catalogName
+                        + "."
+                        + dbName
+                        + "."
+                        + sourceTable
+                        + "("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ")");
+
+        DataType dataType =
+                tEnv.from(catalogName + "." + dbName + "." + sourceTable)
+                        .getResolvedSchema()
+                        .toPhysicalRowDataType();
+        tEnv.fromValues(dataType, getData())
+                .executeInsert(catalogName + "." + dbName + "." + sourceTable)
+                .await();
+    }
+
+    @AfterEach
+    public void after() {
+        super.after();
+        if (null != hiveCatalog) {
+            hiveCatalog.close();
+        }
+    }
+
+    @Override
+    protected String[] properties() {
+        return new String[0];
+    }
+
+    @Test
+    public void testMapRedCsvFormatHiveTableSourceStatisticsReport() {
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from " + catalogName + "." + dbName + "." + sourceTable);
+        assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN);
+    }
+
+    @Test
+    public void testFlinkOrcFormatHiveTableSourceStatisticsReport() throws Exception {
+        tEnv.executeSql(
+                "create table hive.db1.orcTable "
+                        + " ("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ") stored as orc");
+        tEnv.executeSql(
+                        "insert into hive.db1.orcTable select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable)
+                .await();
+
+        // Hive to read Orc file
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from hive.db1.orcTable where f_smallint > 100");
+        assertHiveTableOrcFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+    }
+
+    @Test
+    public void testFlinkParquetFormatHiveTableSourceStatisticsReport() throws Exception {
+        tEnv.executeSql(
+                "create table hive.db1.parquetTable"
+                        + " ("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ") stored as parquet");
+        tEnv.executeSql(
+                        "insert into hive.db1.parquetTable select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable)
+                .await();
+
+        // Hive to read Parquet file
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from hive.db1.parquetTable where f_smallint > 100");
+        assertHiveTableParquetFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+    }
+
+    @Test
+    public void testMapRedOrcFormatHiveTableSourceStatisticsReport() throws Exception {
+        // Use mapRed parquet format.
+        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
+        tEnv.executeSql(
+                "create table hive.db1.orcTable"
+                        + " ("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ") stored as orc");
+        tEnv.executeSql(
+                        "insert into hive.db1.orcTable select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable)
+                .await();
+
+        // Hive to read Orc file
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from hive.db1.orcTable where f_smallint > 100");
+        assertHiveTableOrcFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+    }
+
+    @Test
+    public void testMapRedParquetFormatHiveTableSourceStatisticsReport() throws Exception {
+        // Use mapRed parquet format.
+        tEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
+        tEnv.executeSql(
+                "create table hive.db1.parquetTable"
+                        + " ("
+                        + String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))
+                        + ") stored as parquet");
+        tEnv.executeSql(
+                        "insert into hive.db1.parquetTable select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable)
+                .await();
+
+        // Hive to read Parquet file.
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from hive.db1.parquetTable where f_smallint > 100");
+        assertHiveTableParquetFormatTableStatsEquals(statistic.getTableStats(), 3, 1L);
+    }
+
+    @Test
+    public void testHiveTableSourceWithLimitPushDown() {
+        FlinkStatistic statistic =
+                getStatisticsFromOptimizedPlan(
+                        "select * from "
+                                + catalogName
+                                + "."
+                                + dbName
+                                + "."
+                                + sourceTable
+                                + " limit 1");
+        assertThat(statistic.getTableStats()).isEqualTo(new TableStats(1));
+    }
+
+    @Override
+    protected Map<String, String> ddlTypesMap() {
+        // hive table ddl now don't support type: TIMESTAMP(3), TIMESTAMP(9), TIMESTAMP WITHOUT TIME
+        // ZONE, TIMESTAMP WITH LOCAL TIME ZONE AND ROW. So we remove these types.
+        Map<String, String> ddlTypesMap = super.ddlTypesMap();
+        String timestampTypeName = ddlTypesMap.remove("timestamp(3)");
+        ddlTypesMap.remove("timestamp(9)");
+        ddlTypesMap.remove("timestamp without time zone");
+        ddlTypesMap.remove("timestamp with local time zone");
+        String binaryTypeName = ddlTypesMap.remove("binary(1)");
+        ddlTypesMap.remove("varbinary(1)");
+        ddlTypesMap.remove("time");
+        ddlTypesMap.remove("row<col1 string, col2 int>");
+        ddlTypesMap.put("timestamp", timestampTypeName);
+        ddlTypesMap.put("binary", binaryTypeName);
+
+        return ddlTypesMap;
+    }
+
+    @Override
+    protected Map<String, List<Object>> getDataMap() {
+        // hive table ddl now don't support type: TIMESTAMP(3), TIMESTAMP(9), TIMESTAMP WITHOUT TIME
+        // ZONE, TIMESTAMP WITH LOCAL TIME ZONE AND ROW. So we remove these types related data.
+        Map<String, List<Object>> dataMap = super.getDataMap();
+        List<Object> timestampDate = dataMap.remove("timestamp(3)");
+        dataMap.remove("timestamp(9)");
+        dataMap.remove("timestamp without time zone");
+        dataMap.remove("timestamp with local time zone");
+        List<Object> binaryDate = dataMap.remove("binary(1)");
+        dataMap.remove("varbinary(1)");
+        dataMap.remove("time");
+        dataMap.remove("row<col1 string, col2 int>");

Review Comment:
   Why remove this?



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceStatisticsReportTest.java:
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.StatisticsReportTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for statistics functionality in {@link HiveTableSource}. */
+public class HiveTableSourceStatisticsReportTest extends StatisticsReportTestBase {
+
+    private static HiveCatalog hiveCatalog;
+    private static final String catalogName = "hive";
+    private static final String dbName = "db1";
+    private static final String sourceTable = "sourceTable";
+
+    @BeforeEach
+    public void setup(@TempDir File file) throws Exception {
+        super.setup(file);
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog.open();
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.registerCatalog(catalogName, hiveCatalog);
+        tEnv.useCatalog(catalogName);
+        tEnv.executeSql("create database " + dbName);
+
+        tEnv.executeSql(
+                "create table "

Review Comment:
   nit: Consider using `String.format(...)` for building this SQL string.
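
   For example (sketch of the nit applied to the DDL above):
   ```java
   tEnv.executeSql(
           String.format(
                   "create table %s.%s.%s(%s)",
                   catalogName,
                   dbName,
                   sourceTable,
                   String.join(", ", ddlTypesMapToStringList(ddlTypesMap()))));
   ```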



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org