Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/09 16:04:26 UTC

[flink] 01/02: [FLINK-28797][hive] HiveSource enables vector reading for complex data types with parquet format

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 43eda5d3fe23de1aa95e73475dfb4792d36d4490
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sun Aug 7 17:11:44 2022 +0800

    [FLINK-28797][hive] HiveSource enables vector reading for complex data types with parquet format
---
 .../connectors/hive/read/HiveInputFormat.java      |  6 ++---
 .../connectors/hive/HiveTableSourceITCase.java     | 28 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)
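
The first hunk below moves the ARRAY, MAP, and ROW cases into the branch
of the type check that returns false, i.e. the vectorized Parquet reader
is now allowed for splits containing those complex types. A minimal
sketch of that check pattern, assuming a helper shaped like the one the
hunk edits (the method name isVectorizationUnsupported and the primitive
cases shown are assumptions; the hunk header only shows the class
declaration):

    import org.apache.flink.table.types.logical.LogicalType;

    class VectorizedReadCheckSketch {
        // Hypothetical name; the real helper in HiveInputFormat is not
        // fully visible in this diff.
        static boolean isVectorizationUnsupported(LogicalType t) {
            switch (t.getTypeRoot()) {
                case INTEGER:
                case BIGINT:
                case DOUBLE:
                case ARRAY: // moved into this branch by FLINK-28797
                case MAP:   // moved into this branch by FLINK-28797
                case ROW:   // moved into this branch by FLINK-28797
                    return false; // vectorized (batch) reading is possible
                default:
                    return true; // fall back to the row-by-row mapred reader
            }
        }
    }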

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
index d88ba944c67..d6ae3656381 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
@@ -254,14 +254,14 @@ public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
             case TIME_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case ARRAY:
+            case MAP:
+            case ROW:
                 return false;
             case TIMESTAMP_WITH_TIME_ZONE:
             case INTERVAL_YEAR_MONTH:
             case INTERVAL_DAY_TIME:
-            case ARRAY:
             case MULTISET:
-            case MAP:
-            case ROW:
             case DISTINCT_TYPE:
             case STRUCTURED_TYPE:
             case NULL:
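
The second file adds an ITCase that writes one row with array, map, and
struct columns into a Parquet table and reads it back, which now goes
through the vectorized path. For cross-checking against the
non-vectorized path, Flink's table.exec.hive.fallback-mapred-reader
option forces the mapred record reader; a hedged usage sketch (it
assumes a HiveCatalog is already registered and that the
parquet_complex_type_test table from the test below exists):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    class FallbackReaderSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Assumes a HiveCatalog has been registered and selected here.
            // Force the non-vectorized (mapred) reader to compare results
            // against the vectorized path exercised by the new test.
            tEnv.getConfig()
                    .getConfiguration()
                    .setString("table.exec.hive.fallback-mapred-reader", "true");
            tEnv.executeSql("select * from parquet_complex_type_test").print();
        }
    }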
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 31ee7de2113..dcce413340b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -45,6 +45,7 @@ import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.module.hive.HiveModule;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase;
@@ -156,6 +157,33 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         assertThat(rows.get(0).getField(2)).isEqualTo(Row.of(struct[0], struct[1]));
     }
 
+    @Test
+    public void testReadParquetComplexDataType() throws Exception {
+        batchTableEnv.executeSql(
+                "create table parquet_complex_type_test("
+                        + "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>) stored as parquet");
+        String[] modules = batchTableEnv.listModules();
+        // load the hive module so that we can use the array, map, and
+        // named_struct functions to conveniently write complex data
+        batchTableEnv.loadModule("hive", new HiveModule());
+        String[] newModules = new String[modules.length + 1];
+        newModules[0] = "hive";
+        System.arraycopy(modules, 0, newModules, 1, modules.length);
+        batchTableEnv.useModules(newModules);
+
+        batchTableEnv
+                .executeSql(
+                        "insert into parquet_complex_type_test"
+                                + " select array(1, 2), map(1, 'val1', 2, 'val2'),"
+                                + " named_struct('f1', 1,  'f2', 2)")
+                .await();
+
+        Table src = batchTableEnv.sqlQuery("select * from parquet_complex_type_test");
+        List<Row> rows = CollectionUtil.iteratorToList(src.execute().collect());
+        assertThat(rows.toString()).isEqualTo("[+I[[1, 2], {1=val1, 2=val2}, +I[1, 2]]]");
+        batchTableEnv.unloadModule("hive");
+    }
+
     /**
      * Test to read from partition table.
      *