You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/30 11:40:11 UTC

[incubator-inlong] branch master updated: [INLONG-3457][Sort] Exclude partition fields when writing data with Parquet or Orc format to Hive (#3458)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fa2a09  [INLONG-3457][Sort] Exclude partition fields when writing data with Parquet or Orc format to Hive (#3458)
4fa2a09 is described below

commit 4fa2a09af950dd916981453d665ba427e1a60f28
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Wed Mar 30 19:40:06 2022 +0800

    [INLONG-3457][Sort] Exclude partition fields when writing data with Parquet or Orc format to Hive (#3458)
    
    Co-authored-by: tianqiwan <ti...@tencent.com>
---
 .../inlong/sort/flink/hive/HiveSinkHelper.java     | 45 ++++++----
 .../inlong/sort/flink/hive/HiveSinkHelperTest.java | 96 ++++++++++++++++++++++
 .../flink/hive/formats/orc/OrcBulkWriterTest.java  |  2 -
 .../formats/parquet/ParquetBulkWriterTest.java     |  2 -
 4 files changed, 123 insertions(+), 22 deletions(-)

diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveSinkHelper.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveSinkHelper.java
index 7c7e6f9..05f8239 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveSinkHelper.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/HiveSinkHelper.java
@@ -17,8 +17,10 @@
 
 package org.apache.inlong.sort.flink.hive;
 
-import java.util.ArrayList;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -31,6 +33,7 @@ import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFileFormat;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.OrcFileFormat;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.ParquetFileFormat;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;
@@ -43,8 +46,9 @@ public class HiveSinkHelper {
     public static BulkWriter.Factory<Row> createBulkWriterFactory(
             HiveSinkInfo hiveSinkInfo,
             Configuration config) {
-        String[] fieldNames = getFieldNames(hiveSinkInfo).toArray(new String[0]);
-        LogicalType[] fieldTypes = getFieldLogicalTypes(hiveSinkInfo).toArray(new LogicalType[0]);
+        final FieldInfo[] fieldInfos = getNonPartitionFields(hiveSinkInfo);
+        String[] fieldNames = getFieldNames(fieldInfos);
+        LogicalType[] fieldTypes = getFieldLogicalTypes(fieldInfos);
         RowType rowType = RowType.of(fieldTypes, fieldNames);
         HiveFileFormat hiveFileFormat = hiveSinkInfo.getHiveFileFormat();
         if (hiveFileFormat instanceof ParquetFileFormat) {
@@ -59,23 +63,28 @@ public class HiveSinkHelper {
         }
     }
 
-    private static List<String> getFieldNames(HiveSinkInfo hiveSinkInfo) {
-        FieldInfo[] fieldInfos = hiveSinkInfo.getFields();
-        List<String> fieldNames = new ArrayList<>();
-        for (FieldInfo fieldInfo : fieldInfos) {
-            fieldNames.add(fieldInfo.getName());
-        }
-
-        return fieldNames;
+    @VisibleForTesting
+    static FieldInfo[] getNonPartitionFields(HiveSinkInfo hiveSinkInfo) {
+        final List<String> partitionFields =
+                Arrays.stream(hiveSinkInfo.getPartitions())
+                        .map(HivePartitionInfo::getFieldName)
+                        .collect(Collectors.toList());
+        return Arrays.stream(hiveSinkInfo.getFields())
+                .filter(fieldInfo -> !partitionFields.contains(fieldInfo.getName()))
+                .toArray(FieldInfo[]::new);
     }
 
-    private static List<LogicalType> getFieldLogicalTypes(HiveSinkInfo hiveSinkInfo) {
-        FieldInfo[] fieldInfos = hiveSinkInfo.getFields();
-        List<LogicalType> fieldLogicalTypes = new ArrayList<>();
-        for (FieldInfo fieldInfo : fieldInfos) {
-            fieldLogicalTypes.add(TableFormatUtils.deriveLogicalType(fieldInfo.getFormatInfo()));
-        }
+    @VisibleForTesting
+    static String[] getFieldNames(FieldInfo[] fieldInfos) {
+        return Arrays.stream(fieldInfos)
+                .map(FieldInfo::getName)
+                .toArray(String[]::new);
+    }
 
-        return fieldLogicalTypes;
+    @VisibleForTesting
+    static LogicalType[] getFieldLogicalTypes(FieldInfo[] fieldInfos) {
+        return Arrays.stream(fieldInfos)
+                .map(fieldInfo -> TableFormatUtils.deriveLogicalType(fieldInfo.getFormatInfo()))
+                .toArray(LogicalType[]::new);
     }
 }
diff --git a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkHelperTest.java b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkHelperTest.java
new file mode 100644
index 0000000..68b71cb
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/HiveSinkHelperTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.hive;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFieldPartitionInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;
+import org.junit.Test;
+
+public class HiveSinkHelperTest {
+
+    @Test
+    public void testGetNonPartitionFields() {
+        final FieldInfo[] fieldInfos = new FieldInfo[]{
+                new FieldInfo("string", StringFormatInfo.INSTANCE),
+                new FieldInfo("int", IntFormatInfo.INSTANCE),
+                new FieldInfo("timestamp", new TimeFormatInfo("MILLIS"))
+        };
+
+        final HivePartitionInfo[] partitionInfos = new HivePartitionInfo[]{
+                new HiveFieldPartitionInfo("string"),
+                new HiveFieldPartitionInfo("int")
+        };
+
+        final HiveSinkInfo hiveSinkInfo = new HiveSinkInfo(
+                fieldInfos,
+                "jdbc",
+                "db",
+                "table",
+                "user",
+                "password",
+                "path",
+                partitionInfos,
+                new TextFileFormat(','));
+
+        final FieldInfo[] nonPartitionFieldInfos = HiveSinkHelper.getNonPartitionFields(hiveSinkInfo);
+        assertEquals(1, nonPartitionFieldInfos.length);
+        assertEquals("timestamp", nonPartitionFieldInfos[0].getName());
+        assertTrue(nonPartitionFieldInfos[0].getFormatInfo() instanceof TimeFormatInfo);
+    }
+
+    @Test
+    public void testGetFieldNames() {
+        final FieldInfo[] fieldInfos = new FieldInfo[]{
+                new FieldInfo("string", StringFormatInfo.INSTANCE),
+                new FieldInfo("int", IntFormatInfo.INSTANCE),
+                new FieldInfo("timestamp", new TimeFormatInfo("MILLIS"))
+        };
+        final String[] fieldNames = HiveSinkHelper.getFieldNames(fieldInfos);
+        assertEquals(3, fieldNames.length);
+        assertEquals("string", fieldNames[0]);
+        assertEquals("int", fieldNames[1]);
+        assertEquals("timestamp", fieldNames[2]);
+    }
+
+    @Test
+    public void testGetFieldLogicalTypes() {
+        final FieldInfo[] fieldInfos = new FieldInfo[]{
+                new FieldInfo("string", StringFormatInfo.INSTANCE),
+                new FieldInfo("int", IntFormatInfo.INSTANCE),
+                new FieldInfo("timestamp", new TimeFormatInfo("MILLIS"))
+        };
+        final LogicalType[] fieldTypes = HiveSinkHelper.getFieldLogicalTypes(fieldInfos);
+        assertEquals(3, fieldTypes.length);
+        assertTrue(fieldTypes[0] instanceof VarCharType);
+        assertTrue(fieldTypes[1] instanceof IntType);
+        assertTrue(fieldTypes[2] instanceof TimeType);
+    }
+}
diff --git a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/orc/OrcBulkWriterTest.java b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/orc/OrcBulkWriterTest.java
index f35ba69..c4bff05 100644
--- a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/orc/OrcBulkWriterTest.java
+++ b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/orc/OrcBulkWriterTest.java
@@ -38,7 +38,6 @@ import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
-import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFieldPartitionInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.OrcFileFormat;
 import org.apache.inlong.sort.util.TestLogger;
@@ -159,7 +158,6 @@ public class OrcBulkWriterTest extends TestLogger {
                 "testPassword",
                 "/path",
                 new HivePartitionInfo[]{
-                        new HiveFieldPartitionInfo("f3"),
                 },
                 new OrcFileFormat(64));
     }
diff --git a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
index 4834ff8..8f82cb3 100644
--- a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
+++ b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
@@ -56,7 +56,6 @@ import org.apache.inlong.sort.formats.common.TimeFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
-import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFieldPartitionInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.ParquetFileFormat;
 import org.apache.parquet.example.data.Group;
@@ -168,7 +167,6 @@ public class ParquetBulkWriterTest {
                 "testPassword",
                 "/path",
                 new HivePartitionInfo[] {
-                        new HiveFieldPartitionInfo("f13"),
                 },
                 new ParquetFileFormat()
         );