Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/07 07:55:18 UTC
[hudi] branch master updated: [HUDI-4782] Support TIMESTAMP_LTZ type for flink (#6607)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8ffcb2fc94 [HUDI-4782] Support TIMESTAMP_LTZ type for flink (#6607)
8ffcb2fc94 is described below
commit 8ffcb2fc9470077bdcf3810756545d081fb6523c
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Sep 7 15:55:12 2022 +0800
[HUDI-4782] Support TIMESTAMP_LTZ type for flink (#6607)
---
.../org/apache/hudi/util/AvroSchemaConverter.java | 31 +++++++++++++++---
.../apache/hudi/util/AvroToRowDataConverters.java | 3 ++
.../java/org/apache/hudi/util/DataTypeUtils.java | 13 +++++---
.../apache/hudi/util/RowDataToAvroConverters.java | 6 ++--
.../apache/hudi/table/ITTestHoodieDataSource.java | 32 ++++++++++++++++++
.../apache/hudi/utils/TestAvroSchemaConverter.java | 38 ++++++++++++++++++++++
.../table/format/cow/ParquetSplitReaderUtil.java | 6 +++-
.../table/format/cow/ParquetSplitReaderUtil.java | 6 +++-
.../table/format/cow/ParquetSplitReaderUtil.java | 6 +++-
9 files changed, 125 insertions(+), 16 deletions(-)
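At a glance, this change lets Flink SQL pipelines declare TIMESTAMP_LTZ columns on Hudi tables, on both the read and write paths. A minimal sketch of the user-facing setup (table name, path, and connector options here are illustrative, not taken from the commit):

    import java.time.ZoneId;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class LocalTimestampQuickstart {
      public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // TIMESTAMP_LTZ literals and results are interpreted in the session time zone
        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        tableEnv.executeSql(
            "CREATE TABLE t1 ("
                + " f0 INT PRIMARY KEY NOT ENFORCED,"
                + " f1 VARCHAR(10),"
                + " f2 TIMESTAMP_LTZ(3),"
                + " f4 TIMESTAMP_LTZ(6)"
                + ") WITH ("
                + " 'connector' = 'hudi',"
                + " 'path' = 'file:///tmp/t1'" // illustrative location
                + ")");
      }
    }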
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
index 050d1728f7..5f50607952 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.MapType;
@@ -149,8 +150,12 @@ public class AvroSchemaConverter {
// logical timestamp type
if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
return DataTypes.TIMESTAMP(3).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) {
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
return DataTypes.TIMESTAMP(6).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
return DataTypes.TIME(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
@@ -242,19 +247,36 @@ public class AvroSchemaConverter {
// use long to represent Timestamp
final TimestampType timestampType = (TimestampType) logicalType;
precision = timestampType.getPrecision();
- org.apache.avro.LogicalType avroLogicalType;
+ org.apache.avro.LogicalType timestampLogicalType;
if (precision <= 3) {
- avroLogicalType = LogicalTypes.timestampMillis();
+ timestampLogicalType = LogicalTypes.timestampMillis();
} else if (precision <= 6) {
- avroLogicalType = LogicalTypes.timestampMicros();
+ timestampLogicalType = LogicalTypes.timestampMicros();
} else {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
}
- Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+ Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ // use long to represent LocalZonedTimestampType
+ final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
+ precision = localZonedTimestampType.getPrecision();
+ org.apache.avro.LogicalType localZonedTimestampLogicalType;
+ if (precision <= 3) {
+ localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis();
+ } else if (precision <= 6) {
+ localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros();
+ } else {
+ throw new IllegalArgumentException(
+ "Avro does not support LOCAL TIMESTAMP type with precision: "
+ + precision
+ + ", it only supports precision less than 6.");
+ }
+ Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
+ return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
case DATE:
// use int to represent Date
Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
@@ -319,7 +341,6 @@ public class AvroSchemaConverter {
.items(convertToSchema(arrayType.getElementType(), rowName));
return nullable ? nullableSchema(array) : array;
case RAW:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
default:
throw new UnsupportedOperationException(
"Unsupported to derive Schema for type: " + logicalType);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index 558bb41f90..5c9988dc0b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -127,6 +128,8 @@ public class AvroToRowDataConverters {
return AvroToRowDataConverters::convertToDate;
case TIME_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTime;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return createTimestampConverter(((LocalZonedTimestampType) type).getPrecision());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return createTimestampConverter(((TimestampType) type).getPrecision());
case CHAR:
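The read-side change is one dispatch line: since Avro decodes both the zoned and zoneless logical types to the same epoch-based long, TIMESTAMP_LTZ reuses the existing createTimestampConverter. A condensed sketch of the millis-precision branch (hypothetical helper name, not the Hudi method; the actual converter also handles micros and Instant inputs):

    import org.apache.flink.table.data.TimestampData;

    public class TimestampConverterSketch {
      // Avro hands the decoder an epoch-millis Long for (local-)timestamp-millis fields
      static TimestampData convertToTimestampMillis(Object avroObject) {
        return TimestampData.fromEpochMillis((Long) avroObject);
      }

      public static void main(String[] args) {
        System.out.println(convertToTimestampMillis(1000L)); // 1970-01-01T00:00:01
      }
    }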
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index d63cd7689b..35737f8aea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -18,9 +18,8 @@
package org.apache.hudi.util;
-import org.apache.hudi.common.util.ValidationUtils;
-
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -47,9 +46,13 @@ public class DataTypeUtils {
* Returns the precision of the given TIMESTAMP type.
*/
public static int precision(LogicalType logicalType) {
- ValidationUtils.checkArgument(logicalType instanceof TimestampType);
- TimestampType timestampType = (TimestampType) logicalType;
- return timestampType.getPrecision();
+ if (logicalType instanceof TimestampType) {
+ return ((TimestampType) logicalType).getPrecision();
+ } else if (logicalType instanceof LocalZonedTimestampType) {
+ return ((LocalZonedTimestampType) logicalType).getPrecision();
+ } else {
+ throw new AssertionError("Unexpected type: " + logicalType);
+ }
}
/**
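DataTypeUtils.precision now accepts both timestamp flavors instead of asserting on TimestampType, so RowDataToAvroConverters below can ask for the width of either type through one helper:

    import org.apache.flink.table.types.logical.LocalZonedTimestampType;
    import org.apache.flink.table.types.logical.TimestampType;
    import org.apache.hudi.util.DataTypeUtils;

    public class PrecisionDemo {
      public static void main(String[] args) {
        // Both timestamp flavors report precision via the same helper;
        // any other LogicalType fails with an AssertionError.
        System.out.println(DataTypeUtils.precision(new TimestampType(6)));           // 6
        System.out.println(DataTypeUtils.precision(new LocalZonedTimestampType(3))); // 3
      }
    }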
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index 446a6d0417..ecebd1adcd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -157,8 +156,9 @@ public class RowDataToAvroConverters {
}
};
break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
- final int precision = ((TimestampType) type).getPrecision();
+ final int precision = DataTypeUtils.precision(type);
if (precision <= 3) {
converter =
new RowDataToAvroConverter() {
@@ -231,7 +231,7 @@ public class RowDataToAvroConverters {
actualSchema = types.get(1);
} else {
throw new IllegalArgumentException(
- "The Avro schema is not a nullable type: " + schema.toString());
+ "The Avro schema is not a nullable type: " + schema);
}
} else {
actualSchema = schema;
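On the write side the two timestamp roots share one branch via a fall-through, with DataTypeUtils.precision supplying the width for either. A condensed, hypothetical rendering of that shape (the real code builds serializable RowDataToAvroConverter instances instead):

    import java.time.Instant;

    import org.apache.flink.table.data.TimestampData;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.hudi.util.DataTypeUtils;

    public class TimestampWriteSketch {
      static long toAvroLong(LogicalType type, TimestampData ts) {
        switch (type.getTypeRoot()) {
          case TIMESTAMP_WITH_LOCAL_TIME_ZONE: // same Avro long encoding, fall through
          case TIMESTAMP_WITHOUT_TIME_ZONE:
            int precision = DataTypeUtils.precision(type);
            Instant instant = ts.toInstant();
            if (precision <= 3) {
              return instant.toEpochMilli();               // timestamp-millis
            } else if (precision <= 6) {
              return instant.getEpochSecond() * 1_000_000L
                  + instant.getNano() / 1_000L;            // timestamp-micros
            }
            throw new IllegalArgumentException("Unsupported precision: " + precision);
          default:
            throw new UnsupportedOperationException("Not a timestamp type: " + type);
        }
      }
    }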
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index e59a393fd3..998cd49fdd 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -56,6 +56,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
+import java.time.ZoneId;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -1438,6 +1439,37 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
assertRowsEquals(result2, "[+I[3]]");
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testWriteReadWithLocalTimestamp(HoodieTableType tableType) {
+ TableEnvironment tableEnv = batchTableEnv;
+ tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+ String createTable = sql("t1")
+ .field("f0 int")
+ .field("f1 varchar(10)")
+ .field("f2 TIMESTAMP_LTZ(3)")
+ .field("f4 TIMESTAMP_LTZ(6)")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.PRECOMBINE_FIELD, "f1")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .pkField("f0")
+ .noPartition()
+ .end();
+ tableEnv.executeSql(createTable);
+
+ String insertInto = "insert into t1 values\n"
+ + "(1, 'abc', TIMESTAMP '1970-01-01 08:00:01', TIMESTAMP '1970-01-01 08:00:02'),\n"
+ + "(2, 'def', TIMESTAMP '1970-01-01 08:00:03', TIMESTAMP '1970-01-01 08:00:04')";
+ execInsertSql(tableEnv, insertInto);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "["
+ + "+I[1, abc, 1970-01-01T00:00:01Z, 1970-01-01T00:00:02Z], "
+ + "+I[2, def, 1970-01-01T00:00:03Z, 1970-01-01T00:00:04Z]]";
+ assertRowsEquals(result, expected);
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
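The expected strings make sense once the session zone is accounted for: the test pins the zone to Asia/Shanghai (+08:00), so the wall-clock literal TIMESTAMP '1970-01-01 08:00:01', cast to TIMESTAMP_LTZ on insert, is exactly one second past the epoch and round-trips as the instant 1970-01-01T00:00:01Z. The same arithmetic in plain Java:

    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public class SessionZoneDemo {
      public static void main(String[] args) {
        LocalDateTime wallClock = LocalDateTime.of(1970, 1, 1, 8, 0, 1);
        // Interpreting the wall-clock value in the session zone yields the stored instant
        System.out.println(wallClock.atZone(ZoneId.of("Asia/Shanghai")).toInstant());
        // -> 1970-01-01T00:00:01Z
      }
    }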
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
index e56541ed57..b297b627ba 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
@@ -23,6 +23,7 @@ import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.avro.Schema;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.junit.jupiter.api.Test;
@@ -50,4 +51,41 @@ public class TestAvroSchemaConverter {
+ "`isDeleted` BOOLEAN NOT NULL>";
assertThat(dataType.getChildren().get(pos).toString(), is(expected));
}
+
+ @Test
+ void testLocalTimestampType() {
+ DataType dataType = DataTypes.ROW(
+ DataTypes.FIELD("f_localtimestamp_millis", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+ DataTypes.FIELD("f_localtimestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
+ );
+ // convert to avro schema
+ Schema schema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
+ final String expectedSchema = ""
+ + "[ \"null\", {\n"
+ + " \"type\" : \"record\",\n"
+ + " \"name\" : \"record\",\n"
+ + " \"fields\" : [ {\n"
+ + " \"name\" : \"f_localtimestamp_millis\",\n"
+ + " \"type\" : [ \"null\", {\n"
+ + " \"type\" : \"long\",\n"
+ + " \"logicalType\" : \"local-timestamp-millis\"\n"
+ + " } ],\n"
+ + " \"default\" : null\n"
+ + " }, {\n"
+ + " \"name\" : \"f_localtimestamp_micros\",\n"
+ + " \"type\" : [ \"null\", {\n"
+ + " \"type\" : \"long\",\n"
+ + " \"logicalType\" : \"local-timestamp-micros\"\n"
+ + " } ],\n"
+ + " \"default\" : null\n"
+ + " } ]\n"
+ + "} ]";
+ assertThat(schema.toString(true), is(expectedSchema));
+ // convert it back
+ DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema);
+ final String expectedDataType = "ROW<"
+ + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
+ + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
+ assertThat(convertedDataType.toString(), is(expectedDataType));
+ }
}
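One subtlety the mapping implies: sub-bucket precisions widen on a round trip. TIMESTAMP_LTZ(2), say, is written as local-timestamp-millis and converts back as TIMESTAMP_LTZ(3), which is presumably why the test sticks to the canonical precisions 3 and 6.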
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index aa63856040..75cf3272d6 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -59,6 +59,7 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
@@ -334,7 +335,10 @@ public class ParquetSplitReaderUtil {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
- return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
+ int precision = fieldType instanceof TimestampType
+ ? ((TimestampType) fieldType).getPrecision()
+ : ((LocalZonedTimestampType) fieldType).getPrecision();
+ return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
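The same three-line fix is repeated verbatim in the hudi-flink1.13.x, 1.14.x, and 1.15.x modules below; each per-version reader inlines the instanceof check rather than reusing DataTypeUtils.precision. Isolated, the check is just (a sketch, not shared Hudi code):

    import org.apache.flink.table.types.logical.LocalZonedTimestampType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.TimestampType;

    public final class ParquetPrecisionSketch {
      // Parquet INT64 timestamps need the declared precision to choose
      // millis vs. micros decoding; both Flink timestamp types can supply it.
      static int timestampPrecision(LogicalType fieldType) {
        return fieldType instanceof TimestampType
            ? ((TimestampType) fieldType).getPrecision()
            : ((LocalZonedTimestampType) fieldType).getPrecision();
      }
    }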
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index c636b36100..dc59abe460 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -59,6 +59,7 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
@@ -334,7 +335,10 @@ public class ParquetSplitReaderUtil {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
- return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
+ int precision = fieldType instanceof TimestampType
+ ? ((TimestampType) fieldType).getPrecision()
+ : ((LocalZonedTimestampType) fieldType).getPrecision();
+ return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index ae79966e88..5eeb42514a 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -59,6 +59,7 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
@@ -334,7 +335,10 @@ public class ParquetSplitReaderUtil {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
- return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
+ int precision = fieldType instanceof TimestampType
+ ? ((TimestampType) fieldType).getPrecision()
+ : ((LocalZonedTimestampType) fieldType).getPrecision();
+ return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default: