You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/07 13:03:48 UTC
[flink] 01/02: [FLINK-27244][hive] Support read sub-directories in partition directory with Hive tables
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3d14248c104c649dfce065c8f6f6d177be35f3ef
Author: luoyuxia <lu...@alibaba-inc.com>
AuthorDate: Fri Apr 15 10:40:43 2022 +0800
[FLINK-27244][hive] Support read sub-directories in partition directory with Hive tables
This closes #19482
---
.../docs/connectors/table/hive/hive_read_write.md | 20 +++++++++++
.../docs/connectors/table/hive/hive_read_write.md | 20 +++++++++++
.../connectors/hive/HiveDynamicTableFactory.java | 7 ++++
.../apache/flink/connectors/hive/HiveOptions.java | 9 +++++
.../flink/connectors/hive/HiveDialectITCase.java | 42 ++++++++++++++++++++++
5 files changed, 98 insertions(+)
diff --git a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
index 63e31538f5b..ba712cc82e5 100644
--- a/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -170,6 +170,26 @@ following parameters in `TableConfig` (note that these parameters affect all sou
Multi-thread is used to split hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0.
+### Read Partition With Subdirectory
+
+In some cases, you may create an external table referring to another table, but whose partition columns are only a subset of the referred table's partition columns.
+For example, you have a partitioned table `fact_tz` with partition `day`/`hour`:
+
+```sql
+CREATE TABLE fact_tz(x int) PARTITIONED BY (day STRING, hour STRING);
+```
+
+And you have an external table `fact_daily` referring to table `fact_tz` with a coarse-grained partition column `ds` (one partition per day):
+
+```sql
+create external table fact_daily(x int) PARTITIONED BY (ds STRING) location 'fact_tz_location';
+```
+
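+Then, after adding partitions to `fact_daily` whose locations point to the corresponding `day=...` directories of `fact_tz`, each partition directory of `fact_daily` contains sub-directories (one per `hour`) that hold the actual data files.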
+
+You can configure `table.exec.hive.read-partition-with-subdirectory.enabled` to control whether Flink reads the files in these sub-directories.
+The default value is true, which means Flink reads the sub-directories. If it is set to false, Flink throws the exception "Not a file: xxx" when a partition directory contains any sub-directory.
+
## Temporal Table Join
You can use a Hive table as a temporal table, and then a stream can correlate the Hive table by temporal join.
diff --git a/docs/content/docs/connectors/table/hive/hive_read_write.md b/docs/content/docs/connectors/table/hive/hive_read_write.md
index 507836c9bf1..3c5f7cd043a 100644
--- a/docs/content/docs/connectors/table/hive/hive_read_write.md
+++ b/docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -170,6 +170,26 @@ following parameters in `TableConfig` (note that these parameters affect all sou
Multi-thread is used to split hive's partitions. You can use `table.exec.hive.load-partition-splits.thread-num` to configure the thread number. The default value is 3 and the configured value should be bigger than 0.
+### Read Partition With Subdirectory
+
+In some cases, you may create an external table referring to another table, but whose partition columns are only a subset of the referred table's partition columns.
+For example, you have a partitioned table `fact_tz` with partition `day`/`hour`:
+
+```sql
+CREATE TABLE fact_tz(x int) PARTITIONED BY (day STRING, hour STRING);
+```
+
+And you have an external table `fact_daily` referring to table `fact_tz` with a coarse-grained partition column `ds` (one partition per day):
+
+```sql
+create external table fact_daily(x int) PARTITIONED BY (ds STRING) location 'fact_tz_location';
+```
+
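+Then, after adding partitions to `fact_daily` whose locations point to the corresponding `day=...` directories of `fact_tz`, each partition directory of `fact_daily` contains sub-directories (one per `hour`) that hold the actual data files.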
+
+You can configure `table.exec.hive.read-partition-with-subdirectory.enabled` to control whether Flink reads the files in these sub-directories.
+The default value is true, which means Flink reads the sub-directories. If it is set to false, Flink throws the exception "Not a file: xxx" when a partition directory contains any sub-directory.
+
## Temporal Table Join
You can use a Hive table as a temporal table, and then a stream can correlate the Hive table by temporal join.
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
index fe657c68fbf..0647f07a750 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicTableFactory.java
@@ -36,11 +36,13 @@ import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.util.Set;
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_ENABLE;
import static org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
/** A dynamic table factory implementation for Hive catalog. */
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@@ -130,6 +132,11 @@ public class HiveDynamicTableFactory implements DynamicTableSourceFactory, Dynam
.defaultValue()
.equals(configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE));
final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
+ boolean readSubDirectory =
+ context.getConfiguration()
+ .get(TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED);
+ // set whether to read directory recursively
+ jobConf.set(FileInputFormat.INPUT_DIR_RECURSIVE, String.valueOf(readSubDirectory));
// hive table source that has not lookup ability
if (isStreamingSource && includeAllPartition) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
index 8717e842e3d..37095bd65d7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@@ -39,6 +39,15 @@ public class HiveOptions {
"If it is false, using flink native vectorized reader to read orc files; "
+ "If it is true, using hadoop mapred record reader to read orc files.");
+    public static final ConfigOption<Boolean>
+            TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED =
+                    key("table.exec.hive.read-partition-with-subdirectory.enabled")
+                            .booleanType()
+                            .defaultValue(true) // set as FileInputFormat.INPUT_DIR_RECURSIVE
+                            .withDescription(
+                                    "If it is true, flink will read the files of partitioned hive table from subdirectories under the partition directory to be read.\n"
+                                            + "If it is false, an exception that 'not a file: xxx' will be thrown when the partition directory contains any sub-directory.");
+
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
key("table.exec.hive.infer-source-parallelism")
.booleanType()
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 491b19104f9..12ca33871d0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -76,6 +76,7 @@ import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -557,6 +558,47 @@ public class HiveDialectITCase {
.isEqualTo("\n");
}
+    @Test
+    public void testTableWithSubDirsInPartitionDir() throws Exception {
+        tableEnv.executeSql("CREATE TABLE fact_tz(x int) PARTITIONED BY (ds STRING, hr STRING)");
+        tableEnv.executeSql("INSERT OVERWRITE TABLE fact_tz PARTITION (ds='1', hr='1') select 1")
+                .await();
+        tableEnv.executeSql("INSERT OVERWRITE TABLE fact_tz PARTITION (ds='1', hr='2') select 2")
+                .await();
+        String location = warehouse + "/fact_tz";
+        // create an external table rooted at fact_tz's location but partitioned only by ds
+        tableEnv.executeSql(
+                String.format(
+                        "create external table fact_daily(x int) PARTITIONED BY (ds STRING) location '%s'",
+                        location));
+        tableEnv.executeSql(
+                String.format(
+                        "ALTER TABLE fact_daily ADD PARTITION (ds='1') location '%s'",
+                        location + "/ds=1"));
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select * from fact_daily WHERE ds='1' order by x")
+                                .collect());
+        // partition ds=1 of fact_daily points at fact_tz's 'ds=1' directory, so with the default
+        // setting the rows in both sub-directories 'hr=1' and 'hr=2' must be read
+        assertThat(results.toString()).isEqualTo("[+I[1, 1], +I[2, 1]]");
+
+        tableEnv.getConfig()
+                .set(
+                        HiveOptions.TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED.key(),
+                        "false");
+        // with sub-directory reading disabled, the same query must fail with "Not a file"
+        assertThatThrownBy(
+                () ->
+                        CollectionUtil.iteratorToList(
+                                tableEnv.executeSql("select * from fact_daily WHERE ds='1'")
+                                        .collect()))
+                .satisfies(
+                        anyCauseMatches(
+                                String.format(
+                                        "Not a file: file:%s", warehouse + "/fact_tz/ds=1/hr=2")));
+    }
+
@Test
public void testView() throws Exception {
tableEnv.executeSql("create table tbl (x int,y string)");