You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/07 13:03:48 UTC

[flink] 01/02: [FLINK-27244][hive] Support read sub-directories in partition directory with Hive tables

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d14248c104c649dfce065c8f6f6d177be35f3ef
Author: luoyuxia <lu...@alibaba-inc.com>
AuthorDate: Fri Apr 15 10:40:43 2022 +0800

    [FLINK-27244][hive] Support read sub-directories in partition directory with Hive tables
    
    This closes #19482
---
 .../docs/connectors/table/hive/hive_read_write.md  | 20 +++++++++++
 .../docs/connectors/table/hive/hive_read_write.md  | 20 +++++++++++
 .../connectors/hive/HiveDynamicTableFactory.java   |  7 ++++
 .../apache/flink/connectors/hive/HiveOptions.java  |  9 +++++
 .../flink/connectors/hive/HiveDialectITCase.java   | 42 ++++++++++++++++++++++
 5 files changed, 98 insertions(+)

diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index 63e31538f5b..ba712cc82e5 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -170,6 +170,26 @@ following parameters in `TableConfig` (note that these parameters affect all sou
 
 Multi-thread is used to split hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0.
 
+### Read Partition With Subdirectory
+
+In some cases, you may create an external table referring to another table, where the partition columns of the external table are a subset of those of the referred table.
+For example, you have a partitioned table `fact_tz` with partition `day`/`hour`:
+
+```sql
+CREATE TABLE fact_tz(x int) PARTITIONED BY (day STRING, hour STRING);
+```
+
+And you have an external table `fact_daily` referring to table `fact_tz` with a single coarse-grained, day-level partition `ds`:
+
+```sql
+create external table fact_daily(x int) PARTITIONED BY (ds STRING) location 'fact_tz_location' ;
+```
+
+Then when reading the external table, there will be sub-directories in the partition directory of the external table.
+
+You can configure `table.exec.hive.read-partition-with-subdirectory.enabled` to control whether Flink reads the sub-directories of a partition directory.
+The default value is true, which means Flink will read the sub-directories. If it is set to false, Flink will throw the exception "not a file: xxx" when the partition directory contains any sub-directory.
+
 ## Temporal Table Join
 
 You can use a Hive table as a temporal table, and then a stream can correlate the Hive table by temporal join. 
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index 507836c9bf1..3c5f7cd043a 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -170,6 +170,26 @@ following parameters in `TableConfig` (note that these parameters affect all sou
 
 Multi-thread is used to split hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0.
 
+### Read Partition With Subdirectory
+
+In some cases, you may create an external table referring to another table, where the partition columns of the external table are a subset of those of the referred table.
+For example, you have a partitioned table `fact_tz` with partition `day`/`hour`:
+
+```sql
+CREATE TABLE fact_tz(x int) PARTITIONED BY (day STRING, hour STRING);
+```
+
+And you have an external table `fact_daily` referring to table `fact_tz` with a single coarse-grained, day-level partition `ds`:
+
+```sql
+create external table fact_daily(x int) PARTITIONED BY (ds STRING) location 'fact_tz_location' ;
+```
+
+Then when reading the external table `fact_daily`, there will be sub-directories in the partition directory of the table.
+
+You can configure `table.exec.hive.read-partition-with-subdirectory.enabled` to control whether Flink reads the sub-directories of a partition directory.
+The default value is true, which means Flink will read the sub-directories. If it is set to false, Flink will throw the exception "not a file: xxx" when the partition directory contains any sub-directory.
+
 ## Temporal Table Join
 
 You can use a Hive table as a temporal table, and then a stream can correlate the Hive table by temporal join. 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
index fe657c68fbf..0647f07a750 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
@@ -36,11 +36,13 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 import java.util.Set;
 
 import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_ENABLE;
 import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
 
 /** A dynamic table factory implementation for Hive catalog. */
 public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@@ -130,6 +132,11 @@ public class HiveDynamicTableFactory implements DynamicTableSourceFactory, Dynam
                         .defaultValue()
                         .equals(configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE));
         final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
+        boolean readSubDirectory =
+                context.getConfiguration()
+                        .get(TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED);
+        // set whether to read directory recursively
+        jobConf.set(FileInputFormat.INPUT_DIR_RECURSIVE, String.valueOf(readSubDirectory));
 
         // hive table source that has not lookup ability
         if (isStreamingSource && includeAllPartition) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index 8717e842e3d..37095bd65d7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -39,6 +39,15 @@ public class HiveOptions {
                             "If it is false, using flink native vectorized reader to read orc files; "
                                     + "If it is true, using hadoop mapred record reader to read orc files.");
 
+    public static final ConfigOption<Boolean>
+            TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED =
+                    key("table.exec.hive.read-partition-with-subdirectory.enabled")
+                            .booleanType()
+                            .defaultValue(true)
+                            .withDescription(
+                                    "If it is true, flink will read the files of partitioned hive table from subdirectories under the partition directory to be read.\n"
+                                            + "If it is false, an exception that 'not a file: xxx' will be thrown when the partition directory contains any sub-directory.");
+
     public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
             key("table.exec.hive.infer-source-parallelism")
                     .booleanType()
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 491b19104f9..12ca33871d0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -76,6 +76,7 @@ import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -557,6 +558,47 @@ public class HiveDialectITCase {
                 .isEqualTo("\n");
     }
 
+    @Test
+    public void testTableWithSubDirsInPartitionDir() throws Exception {
+        tableEnv.executeSql("CREATE TABLE fact_tz(x int) PARTITIONED BY (ds STRING, hr STRING)");
+        tableEnv.executeSql("INSERT OVERWRITE TABLE fact_tz PARTITION (ds='1', hr='1') select 1")
+                .await();
+        tableEnv.executeSql("INSERT OVERWRITE TABLE fact_tz PARTITION (ds='1', hr='2') select 2")
+                .await();
+        String location = warehouse + "/fact_tz";
+        // create an external table on fact_tz's directory, partitioned by ds only
+        tableEnv.executeSql(
+                String.format(
+                        "create external table fact_daily(x int) PARTITIONED BY (ds STRING) location '%s'",
+                        location));
+        tableEnv.executeSql(
+                String.format(
+                        "ALTER TABLE fact_daily ADD PARTITION (ds='1') location '%s'",
+                        location + "/ds=1"));
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select * from fact_daily WHERE ds='1' order by x")
+                                .collect());
+        // with sub-directory reading left at its default, fact_daily should return the rows
+        // stored under both 'ds=1/hr=1' and 'ds=1/hr=2'
+        assertThat(results.toString()).isEqualTo("[+I[1, 1], +I[2, 1]]");
+
+        tableEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED.key(),
+                        "false");
+        // with sub-directory reading disabled, the query should fail on a sub-directory
+        assertThatThrownBy(
+                        () ->
+                                CollectionUtil.iteratorToList(
+                                        tableEnv.executeSql("select * from fact_daily WHERE ds='1'")
+                                                .collect()))
+                .satisfies(
+                        anyCauseMatches(
+                                String.format(
+                                        "Not a file: file:%s", warehouse + "/fact_tz/ds=1/hr=2")));
+    }
+
     @Test
     public void testView() throws Exception {
         tableEnv.executeSql("create table tbl (x int,y string)");