Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/09 16:04:26 UTC
[flink] 01/02: [FLINK-28797][hive] HiveSource enables vector reading for complex data type with parquet format
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 43eda5d3fe23de1aa95e73475dfb4792d36d4490
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sun Aug 7 17:11:44 2022 +0800
[FLINK-28797][hive] HiveSource enables vector reading for complex data type with parquet format
---
.../connectors/hive/read/HiveInputFormat.java | 6 ++---
.../connectors/hive/HiveTableSourceITCase.java | 28 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 3 deletions(-)
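Before this patch, ARRAY, MAP, and ROW were grouped with the type roots for which the source falls back to the non-vectorized MapReduce reader; the first hunk below moves them into the branch that shares "return false" with the primitive types, so Parquet files with complex columns are read through the vectorized (columnar) path. The enclosing method is not shown in the hunk, but given the commit message, returning false here means no fallback is needed.

For users nothing changes except the read path: a scan like the one below is now served by the vectorized Parquet reader. A minimal sketch, assuming a batch TableEnvironment with a HiveCatalog registered and the Hive dialect in effect (the class and table names are illustrative, not from the patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ComplexTypeReadSketch {
        public static void main(String[] args) {
            // Assumes a HiveCatalog has been registered and set as the current
            // catalog (omitted here); "stored as parquet" needs the Hive dialect.
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.executeSql(
                    "create table parquet_complex_demo ("
                            + " a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>)"
                            + " stored as parquet");
            // With FLINK-28797, this scan of complex-typed columns goes through
            // the vectorized Parquet reader instead of the row-by-row reader.
            tEnv.executeSql("select a, m, s from parquet_complex_demo").print();
        }
    }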
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
index d88ba944c67..d6ae3656381 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
@@ -254,14 +254,14 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
             case TIME_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case ARRAY:
+            case MAP:
+            case ROW:
                 return false;
             case TIMESTAMP_WITH_TIME_ZONE:
             case INTERVAL_YEAR_MONTH:
             case INTERVAL_DAY_TIME:
-            case ARRAY:
             case MULTISET:
-            case MAP:
-            case ROW:
             case DISTINCT_TYPE:
             case STRUCTURED_TYPE:
             case NULL:
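For context, the switch above belongs to a predicate that maps each LogicalTypeRoot to a yes/no decision on whether the format must fall back to the non-vectorized reader. The following is a minimal self-contained sketch of that shape, not the actual HiveInputFormat code; the class and method names are assumptions, and only a few representative cases are listed:

    import org.apache.flink.table.types.logical.LogicalType;

    final class VectorizationSupport {
        private VectorizationSupport() {}

        // Hypothetical mirror of the predicate the hunk above modifies: it
        // answers "must we fall back to the row-by-row reader for this type?".
        // After the patch, ARRAY/MAP/ROW share the primitives' "no fallback
        // needed" branch and are read through the columnar Parquet path.
        static boolean isVectorizationUnsupported(LogicalType t) {
            switch (t.getTypeRoot()) {
                case INTEGER:
                case TIMESTAMP_WITHOUT_TIME_ZONE:
                case ARRAY:
                case MAP:
                case ROW:
                    return false; // vectorized (columnar) reading is fine
                default:
                    return true; // fall back to the non-vectorized reader
            }
        }
    }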
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 31ee7de2113..dcce413340b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.module.hive.HiveModule;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase;
@@ -156,6 +157,33 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         assertThat(rows.get(0).getField(2)).isEqualTo(Row.of(struct[0], struct[1]));
     }

+    @Test
+    public void testReadParquetComplexDataType() throws Exception {
+        batchTableEnv.executeSql(
+                "create table parquet_complex_type_test("
+                        + "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>) stored as parquet");
+        String[] modules = batchTableEnv.listModules();
+        // load the hive module so that we can use the array, map, and
+        // named_struct functions to write complex data conveniently
+        batchTableEnv.loadModule("hive", new HiveModule());
+        String[] newModules = new String[modules.length + 1];
+        newModules[0] = "hive";
+        System.arraycopy(modules, 0, newModules, 1, modules.length);
+        batchTableEnv.useModules(newModules);
+
+        batchTableEnv
+                .executeSql(
+                        "insert into parquet_complex_type_test"
+                                + " select array(1, 2), map(1, 'val1', 2, 'val2'),"
+                                + " named_struct('f1', 1, 'f2', 2)")
+                .await();
+
+        Table src = batchTableEnv.sqlQuery("select * from parquet_complex_type_test");
+        List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());
+        assertThat(rows.toString()).isEqualTo("[+I[[1, 2], {1=val1, 2=val2}, +I[1, 2]]]");
+        batchTableEnv.unloadModule("hive");
+    }
+
     /**
      * Test to read from partition table.
      *