You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/04/07 07:17:46 UTC
[hudi] branch master updated: [HUDI-3808] Flink bulk_insert timestamp(3) can not be read by Spark (#5236)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e33149be9a [HUDI-3808] Flink bulk_insert timestamp(3) can not be read by Spark (#5236)
e33149be9a is described below
commit e33149be9a5b8718f3b97e335ae249db0cef4b2d
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Apr 7 15:17:39 2022 +0800
[HUDI-3808] Flink bulk_insert timestamp(3) can not be read by Spark (#5236)
---
hudi-client/hudi-flink-client/pom.xml | 11 ++++++++++
.../row/parquet/ParquetSchemaConverter.java | 19 +++++++++-------
.../row/parquet/TestParquetSchemaConverter.java | 25 ++++++++++++++++++----
hudi-flink-datasource/hudi-flink/pom.xml | 2 +-
.../apache/hudi/configuration/FlinkOptions.java | 2 +-
packaging/hudi-flink-bundle/pom.xml | 4 ++--
pom.xml | 1 +
7 files changed, 48 insertions(+), 16 deletions(-)
diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
index a44bdf1b83..eb044312c4 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -29,6 +29,10 @@
<name>hudi-flink-client</name>
<packaging>jar</packaging>
+ <properties>
+ <parquet.version>${flink.format.parquet.version}</parquet.version>
+ </properties>
+
<dependencies>
<!-- Hudi -->
<dependency>
@@ -87,6 +91,13 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>${parquet.version}</version>
</dependency>
<!-- Hoodie - Test -->
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 5da45bf25d..66a39b54a9 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
@@ -46,6 +47,8 @@ import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+
/**
* Schema converter converts Parquet schema to and from Flink internal types.
*
@@ -436,7 +439,7 @@ public class ParquetSchemaConverter {
String.format(
"Can not convert Flink MapTypeInfo %s to Parquet"
+ " Map type as key has to be String",
- typeInfo.toString()));
+ typeInfo));
}
} else if (typeInfo instanceof ObjectArrayTypeInfo) {
ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo;
@@ -567,18 +570,16 @@ public class ParquetSchemaConverter {
int numBytes = computeMinBytesForDecimalPrecision(precision);
return Types.primitive(
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
- .precision(precision)
- .scale(scale)
+ .as(LogicalTypeAnnotation.decimalType(scale, precision))
.length(numBytes)
- .as(OriginalType.DECIMAL)
.named(name);
case TINYINT:
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
- .as(OriginalType.INT_8)
+ .as(LogicalTypeAnnotation.intType(8, true))
.named(name);
case SMALLINT:
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
- .as(OriginalType.INT_16)
+ .as(LogicalTypeAnnotation.intType(16, true))
.named(name);
case INTEGER:
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
@@ -594,16 +595,17 @@ public class ParquetSchemaConverter {
.named(name);
case DATE:
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
- .as(OriginalType.DATE)
+ .as(LogicalTypeAnnotation.dateType())
.named(name);
case TIME_WITHOUT_TIME_ZONE:
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
- .as(OriginalType.TIME_MILLIS)
+ .as(LogicalTypeAnnotation.timeType(true, TimeUnit.MILLIS))
.named(name);
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) type;
if (timestampType.getPrecision() == 3) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
+ .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
.named(name);
} else {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
@@ -613,6 +615,7 @@ public class ParquetSchemaConverter {
LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
if (localZonedTimestampType.getPrecision() == 3) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
+ .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS))
.named(name);
} else {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 5305bcc8ab..a1a07a65f9 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -51,24 +51,41 @@ public class TestParquetSchemaConverter {
final String expected = "message converted {\n"
+ " optional group f_array (LIST) {\n"
+ " repeated group list {\n"
- + " optional binary element (UTF8);\n"
+ + " optional binary element (STRING);\n"
+ " }\n"
+ " }\n"
+ " optional group f_map (MAP) {\n"
+ " repeated group key_value {\n"
+ " optional int32 key;\n"
- + " optional binary value (UTF8);\n"
+ + " optional binary value (STRING);\n"
+ " }\n"
+ " }\n"
+ " optional group f_row {\n"
+ " optional int32 f_row_f0;\n"
- + " optional binary f_row_f1 (UTF8);\n"
+ + " optional binary f_row_f1 (STRING);\n"
+ " optional group f_row_f2 {\n"
+ " optional int32 f_row_f2_f0;\n"
- + " optional binary f_row_f2_f1 (UTF8);\n"
+ + " optional binary f_row_f2_f1 (STRING);\n"
+ " }\n"
+ " }\n"
+ "}\n";
assertThat(messageType.toString(), is(expected));
}
+
+ @Test
+ void testConvertTimestampTypes() {
+ DataType dataType = DataTypes.ROW(
+ DataTypes.FIELD("ts_3", DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("ts_6", DataTypes.TIMESTAMP(6)),
+ DataTypes.FIELD("ts_9", DataTypes.TIMESTAMP(9)));
+ org.apache.parquet.schema.MessageType messageType =
+ ParquetSchemaConverter.convertToParquetMessageType("converted", (RowType) dataType.getLogicalType());
+ assertThat(messageType.getColumns().size(), is(3));
+ final String expected = "message converted {\n"
+ + " optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n"
+ + " optional int96 ts_6;\n"
+ + " optional int96 ts_9;\n"
+ + "}\n";
+ assertThat(messageType.toString(), is(expected));
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml
index abc458c649..97288d19cd 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -32,7 +32,7 @@
<properties>
<main.basedir>${project.parent.parent.basedir}</main.basedir>
- <parquet.version>1.11.1</parquet.version>
+ <parquet.version>${flink.format.parquet.version}</parquet.version>
</properties>
<build>
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b1e00875de..e2be7d364b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -671,7 +671,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Boolean> HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions
.key("hive_sync.support_timestamp")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
.withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
+ "Disabled by default for backward compatibility.");
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index 75b4629d0e..584c3871cd 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -34,8 +34,8 @@
<flink.bundle.hive.scope>provided</flink.bundle.hive.scope>
<flink.bundle.shade.prefix>org.apache.hudi.</flink.bundle.shade.prefix>
<javax.servlet.version>3.1.0</javax.servlet.version>
- <!-- override to be same with flink 1.12.2 -->
- <parquet.version>1.11.1</parquet.version>
+ <!-- override to be same with flink 1.15.x -->
+ <parquet.version>${flink.format.parquet.version}</parquet.version>
<hive.version>2.3.1</hive.version>
<thrift.version>0.9.3</thrift.version>
</properties>
diff --git a/pom.xml b/pom.xml
index 5d67a43dbd..29fb187d3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
<flink.runtime.artifactId>flink-runtime</flink.runtime.artifactId>
<flink.table.runtime.artifactId>flink-table-runtime_${scala.binary.version}</flink.table.runtime.artifactId>
<flink.table.planner.artifactId>flink-table-planner_${scala.binary.version}</flink.table.planner.artifactId>
+ <flink.format.parquet.version>1.12.2</flink.format.parquet.version>
<spark31.version>3.1.3</spark31.version>
<spark32.version>3.2.1</spark32.version>
<hudi.spark.module>hudi-spark2</hudi.spark.module>