Posted to commits@hudi.apache.org by da...@apache.org on 2022/04/07 07:17:46 UTC

[hudi] branch master updated: [HUDI-3808] Flink bulk_insert timestamp(3) can not be read by Spark (#5236)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e33149be9a [HUDI-3808] Flink bulk_insert timestamp(3) can not be read by Spark (#5236)
e33149be9a is described below

commit e33149be9a5b8718f3b97e335ae249db0cef4b2d
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Apr 7 15:17:39 2022 +0800

    [HUDI-3808] Flink bulk_insert timestamp(3) can not be read by Spark (#5236)
---
 hudi-client/hudi-flink-client/pom.xml              | 11 ++++++++++
 .../row/parquet/ParquetSchemaConverter.java        | 19 +++++++++-------
 .../row/parquet/TestParquetSchemaConverter.java    | 25 ++++++++++++++++++----
 hudi-flink-datasource/hudi-flink/pom.xml           |  2 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  2 +-
 packaging/hudi-flink-bundle/pom.xml                |  4 ++--
 pom.xml                                            |  1 +
 7 files changed, 48 insertions(+), 16 deletions(-)
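In short: Flink's row-data parquet writer emitted timestamp(3) columns as bare INT64 primitives with no logical type annotation, so Spark could not decode them as timestamps. The converter diff below adds a TIMESTAMP(MILLIS) annotation through the LogicalTypeAnnotation builder API (which supersedes the deprecated OriginalType constants) and aligns the parquet-column/parquet-avro versions with the parquet version shipped by the Flink parquet format; an illustrative Java sketch of the new annotation call follows the converter diff.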

diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
index a44bdf1b83..eb044312c4 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -29,6 +29,10 @@
   <name>hudi-flink-client</name>
   <packaging>jar</packaging>
 
+  <properties>
+    <parquet.version>${flink.format.parquet.version}</parquet.version>
+  </properties>
+
   <dependencies>
     <!-- Hudi  -->
     <dependency>
@@ -87,6 +91,13 @@
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
+      <version>${parquet.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${parquet.version}</version>
     </dependency>
 
     <!-- Hoodie - Test -->
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 5da45bf25d..66a39b54a9 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -46,6 +47,8 @@ import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+
 /**
  * Schema converter converts Parquet schema to and from Flink internal types.
  *
@@ -436,7 +439,7 @@ public class ParquetSchemaConverter {
             String.format(
                 "Can not convert Flink MapTypeInfo %s to Parquet"
                     + " Map type as key has to be String",
-                typeInfo.toString()));
+                typeInfo));
       }
     } else if (typeInfo instanceof ObjectArrayTypeInfo) {
       ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo;
@@ -567,18 +570,16 @@ public class ParquetSchemaConverter {
         int numBytes = computeMinBytesForDecimalPrecision(precision);
         return Types.primitive(
             PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
-            .precision(precision)
-            .scale(scale)
+            .as(LogicalTypeAnnotation.decimalType(scale, precision))
             .length(numBytes)
-            .as(OriginalType.DECIMAL)
             .named(name);
       case TINYINT:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
-            .as(OriginalType.INT_8)
+            .as(LogicalTypeAnnotation.intType(8, true))
             .named(name);
       case SMALLINT:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
-            .as(OriginalType.INT_16)
+            .as(LogicalTypeAnnotation.intType(16, true))
             .named(name);
       case INTEGER:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
@@ -594,16 +595,17 @@ public class ParquetSchemaConverter {
             .named(name);
       case DATE:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
-            .as(OriginalType.DATE)
+            .as(LogicalTypeAnnotation.dateType())
             .named(name);
       case TIME_WITHOUT_TIME_ZONE:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
-            .as(OriginalType.TIME_MILLIS)
+            .as(LogicalTypeAnnotation.timeType(true, TimeUnit.MILLIS))
             .named(name);
       case TIMESTAMP_WITHOUT_TIME_ZONE:
         TimestampType timestampType = (TimestampType) type;
         if (timestampType.getPrecision() == 3) {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
+              .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
               .named(name);
         } else {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
@@ -613,6 +615,7 @@ public class ParquetSchemaConverter {
         LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
         if (localZonedTimestampType.getPrecision() == 3) {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
+              .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS))
               .named(name);
         } else {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
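For a concrete sense of what changed, here is a minimal, self-contained sketch against the parquet-column builder API used above (the class and field names are illustrative, not part of the patch):

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class TimestampAnnotationSketch {
      public static void main(String[] args) {
        // Before the patch: a bare INT64, which readers see as a plain long.
        Type bare = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
            .named("ts_bare");

        // After the patch: the same INT64 carries a TIMESTAMP(MILLIS,true)
        // annotation, so readers such as Spark decode it as a timestamp.
        Type annotated = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
            .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
            .named("ts_millis");

        MessageType schema = Types.buildMessage().addFields(bare, annotated).named("demo");
        System.out.println(schema);
        // message demo {
        //   optional int64 ts_bare;
        //   optional int64 ts_millis (TIMESTAMP(MILLIS,true));
        // }
      }
    }

The INT96 branches for precisions above 3 deliberately stay unannotated: INT96 is the legacy timestamp encoding that predates parquet logical types and carries no annotation.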
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 5305bcc8ab..a1a07a65f9 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -51,24 +51,41 @@ public class TestParquetSchemaConverter {
     final String expected = "message converted {\n"
         + "  optional group f_array (LIST) {\n"
         + "    repeated group list {\n"
-        + "      optional binary element (UTF8);\n"
+        + "      optional binary element (STRING);\n"
         + "    }\n"
         + "  }\n"
         + "  optional group f_map (MAP) {\n"
         + "    repeated group key_value {\n"
         + "      optional int32 key;\n"
-        + "      optional binary value (UTF8);\n"
+        + "      optional binary value (STRING);\n"
         + "    }\n"
         + "  }\n"
         + "  optional group f_row {\n"
         + "    optional int32 f_row_f0;\n"
-        + "    optional binary f_row_f1 (UTF8);\n"
+        + "    optional binary f_row_f1 (STRING);\n"
         + "    optional group f_row_f2 {\n"
         + "      optional int32 f_row_f2_f0;\n"
-        + "      optional binary f_row_f2_f1 (UTF8);\n"
+        + "      optional binary f_row_f2_f1 (STRING);\n"
         + "    }\n"
         + "  }\n"
         + "}\n";
     assertThat(messageType.toString(), is(expected));
   }
+
+  @Test
+  void testConvertTimestampTypes() {
+    DataType dataType = DataTypes.ROW(
+        DataTypes.FIELD("ts_3", DataTypes.TIMESTAMP(3)),
+        DataTypes.FIELD("ts_6", DataTypes.TIMESTAMP(6)),
+        DataTypes.FIELD("ts_9", DataTypes.TIMESTAMP(9)));
+    org.apache.parquet.schema.MessageType messageType =
+        ParquetSchemaConverter.convertToParquetMessageType("converted", (RowType) dataType.getLogicalType());
+    assertThat(messageType.getColumns().size(), is(3));
+    final String expected = "message converted {\n"
+        + "  optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n"
+        + "  optional int96 ts_6;\n"
+        + "  optional int96 ts_9;\n"
+        + "}\n";
+    assertThat(messageType.toString(), is(expected));
+  }
 }
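On the read side, the intent of the fix can be checked with a short Spark job; the table path and column name below are hypothetical, and hudi-spark-bundle is assumed to be on the classpath:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadTimestampCheck {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("hudi-timestamp-check")
            .master("local[*]")
            .getOrCreate();
        // Table written by a Flink bulk_insert job with a timestamp(3) column "ts".
        Dataset<Row> df = spark.read().format("hudi").load("/tmp/hudi/demo_table");
        df.printSchema(); // "ts" should now appear as a timestamp column
        spark.stop();
      }
    }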
diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml
index abc458c649..97288d19cd 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -32,7 +32,7 @@
 
     <properties>
         <main.basedir>${project.parent.parent.basedir}</main.basedir>
-        <parquet.version>1.11.1</parquet.version>
+        <parquet.version>${flink.format.parquet.version}</parquet.version>
     </properties>
 
     <build>
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b1e00875de..e2be7d364b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -671,7 +671,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Boolean> HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions
       .key("hive_sync.support_timestamp")
       .booleanType()
-      .defaultValue(false)
+      .defaultValue(true)
       .withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
           + "Disabled by default for backward compatibility.");
 
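Note that hive_sync.support_timestamp now defaults to true, while the description kept above still reads "Disabled by default for backward compatibility" and no longer matches the new default. Pipelines that depended on the old behavior can opt out explicitly; a minimal sketch, assuming a Flink Configuration is built in user code:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class SupportTimestampOptOut {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // hive_sync.support_timestamp defaults to true after this patch; set
        // it back to false to restore the pre-patch behavior if needed.
        conf.set(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP, false);
        System.out.println(conf.get(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP));
      }
    }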
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index 75b4629d0e..584c3871cd 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -34,8 +34,8 @@
     <flink.bundle.hive.scope>provided</flink.bundle.hive.scope>
     <flink.bundle.shade.prefix>org.apache.hudi.</flink.bundle.shade.prefix>
     <javax.servlet.version>3.1.0</javax.servlet.version>
-    <!-- override to be same with flink 1.12.2 -->
-    <parquet.version>1.11.1</parquet.version>
+    <!-- override to be same with flink 1.15.x -->
+    <parquet.version>${flink.format.parquet.version}</parquet.version>
     <hive.version>2.3.1</hive.version>
     <thrift.version>0.9.3</thrift.version>
   </properties>
diff --git a/pom.xml b/pom.xml
index 5d67a43dbd..29fb187d3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
     <flink.runtime.artifactId>flink-runtime</flink.runtime.artifactId>
     <flink.table.runtime.artifactId>flink-table-runtime_${scala.binary.version}</flink.table.runtime.artifactId>
     <flink.table.planner.artifactId>flink-table-planner_${scala.binary.version}</flink.table.planner.artifactId>
+    <flink.format.parquet.version>1.12.2</flink.format.parquet.version>
     <spark31.version>3.1.3</spark31.version>
     <spark32.version>3.2.1</spark32.version>
     <hudi.spark.module>hudi-spark2</hudi.spark.module>