Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/07 07:55:18 UTC

[hudi] branch master updated: [HUDI-4782] Support TIMESTAMP_LTZ type for flink (#6607)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ffcb2fc94 [HUDI-4782] Support TIMESTAMP_LTZ type for flink (#6607)
8ffcb2fc94 is described below

commit 8ffcb2fc9470077bdcf3810756545d081fb6523c
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Sep 7 15:55:12 2022 +0800

    [HUDI-4782] Support TIMESTAMP_LTZ type for flink (#6607)
---
 .../org/apache/hudi/util/AvroSchemaConverter.java  | 31 +++++++++++++++---
 .../apache/hudi/util/AvroToRowDataConverters.java  |  3 ++
 .../java/org/apache/hudi/util/DataTypeUtils.java   | 13 +++++---
 .../apache/hudi/util/RowDataToAvroConverters.java  |  6 ++--
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 32 ++++++++++++++++++
 .../apache/hudi/utils/TestAvroSchemaConverter.java | 38 ++++++++++++++++++++++
 .../table/format/cow/ParquetSplitReaderUtil.java   |  6 +++-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  6 +++-
 .../table/format/cow/ParquetSplitReaderUtil.java   |  6 +++-
 9 files changed, 125 insertions(+), 16 deletions(-)
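
A note on what this enables end to end: Flink SQL can now declare
TIMESTAMP_LTZ columns on Hudi tables. Below is a minimal sketch of such a
DDL (table name and path are hypothetical; it mirrors the new integration
test further down):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class LocalTimestampDdlSketch {
      public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());
        // TIMESTAMP_LTZ(3) and TIMESTAMP_LTZ(6) now map to the Avro logical
        // types local-timestamp-millis and local-timestamp-micros.
        tableEnv.executeSql(
            "create table t1 (\n"
                + "  f0 int,\n"
                + "  f1 varchar(10),\n"
                + "  f2 TIMESTAMP_LTZ(3),\n"
                + "  f3 TIMESTAMP_LTZ(6),\n"
                + "  primary key (f0) not enforced\n"
                + ") with (\n"
                + "  'connector' = 'hudi',\n"
                + "  'path' = 'file:///tmp/t1'\n"
                + ")");
      }
    }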

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
index 050d1728f7..5f50607952 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.MapType;
@@ -149,8 +150,12 @@ public class AvroSchemaConverter {
         // logical timestamp type
         if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
           return DataTypes.TIMESTAMP(3).notNull();
+        } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) {
+          return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
         } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
           return DataTypes.TIMESTAMP(6).notNull();
+        } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
+          return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
         } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
           return DataTypes.TIME(3).notNull();
         } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
@@ -242,19 +247,36 @@ public class AvroSchemaConverter {
         // use long to represent Timestamp
         final TimestampType timestampType = (TimestampType) logicalType;
         precision = timestampType.getPrecision();
-        org.apache.avro.LogicalType avroLogicalType;
+        org.apache.avro.LogicalType timestampLogicalType;
         if (precision <= 3) {
-          avroLogicalType = LogicalTypes.timestampMillis();
+          timestampLogicalType = LogicalTypes.timestampMillis();
         } else if (precision <= 6) {
-          avroLogicalType = LogicalTypes.timestampMicros();
+          timestampLogicalType = LogicalTypes.timestampMicros();
         } else {
           throw new IllegalArgumentException(
               "Avro does not support TIMESTAMP type with precision: "
                   + precision
                   + ", it only supports precision less than 6.");
         }
-        Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+        Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
         return nullable ? nullableSchema(timestamp) : timestamp;
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        // use long to represent LocalZonedTimestampType
+        final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
+        precision = localZonedTimestampType.getPrecision();
+        org.apache.avro.LogicalType localZonedTimestampLogicalType;
+        if (precision <= 3) {
+          localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis();
+        } else if (precision <= 6) {
+          localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros();
+        } else {
+          throw new IllegalArgumentException(
+              "Avro does not support LOCAL TIMESTAMP type with precision: "
+                  + precision
+                  + ", it only supports precision up to 6.");
+        }
+        Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
+        return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
       case DATE:
         // use int to represent Date
         Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
@@ -319,7 +341,6 @@ public class AvroSchemaConverter {
                 .items(convertToSchema(arrayType.getElementType(), rowName));
         return nullable ? nullableSchema(array) : array;
       case RAW:
-      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
       default:
         throw new UnsupportedOperationException(
             "Unsupported to derive Schema for type: " + logicalType);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index 558bb41f90..5c9988dc0b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
@@ -127,6 +128,8 @@ public class AvroToRowDataConverters {
         return AvroToRowDataConverters::convertToDate;
       case TIME_WITHOUT_TIME_ZONE:
         return AvroToRowDataConverters::convertToTime;
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return createTimestampConverter(((LocalZonedTimestampType) type).getPrecision());
       case TIMESTAMP_WITHOUT_TIME_ZONE:
         return createTimestampConverter(((TimestampType) type).getPrecision());
       case CHAR:
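
TIMESTAMP_LTZ reuses the existing timestamp converter, so both flavors are
decoded from the same long encoding. A sketch of that decoding, assuming
epoch milliseconds for precision <= 3 and epoch microseconds for
precision <= 6:

    import org.apache.flink.table.data.TimestampData;

    public class AvroLongToTimestampSketch {
      // precision <= 3: the Avro long carries epoch milliseconds.
      static TimestampData fromMillis(long epochMillis) {
        return TimestampData.fromEpochMillis(epochMillis);
      }

      // precision <= 6: the Avro long carries epoch microseconds; floorDiv
      // and floorMod keep pre-epoch (negative) values correct.
      static TimestampData fromMicros(long epochMicros) {
        long millis = Math.floorDiv(epochMicros, 1000L);
        int nanosOfMilli = (int) Math.floorMod(epochMicros, 1000L) * 1000;
        return TimestampData.fromEpochMillis(millis, nanosOfMilli);
      }
    }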
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index d63cd7689b..35737f8aea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -18,9 +18,8 @@
 
 package org.apache.hudi.util;
 
-import org.apache.hudi.common.util.ValidationUtils;
-
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -47,9 +46,13 @@ public class DataTypeUtils {
    * Returns the precision of the given TIMESTAMP type.
    */
   public static int precision(LogicalType logicalType) {
-    ValidationUtils.checkArgument(logicalType instanceof TimestampType);
-    TimestampType timestampType = (TimestampType) logicalType;
-    return timestampType.getPrecision();
+    if (logicalType instanceof TimestampType) {
+      return ((TimestampType) logicalType).getPrecision();
+    } else if (logicalType instanceof LocalZonedTimestampType) {
+      return ((LocalZonedTimestampType) logicalType).getPrecision();
+    } else {
+      throw new AssertionError("Unexpected type: " + logicalType);
+    }
   }
 
   /**
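
With the instanceof dispatch in place, the helper accepts both timestamp
flavors. A quick usage sketch:

    import org.apache.flink.table.types.logical.LocalZonedTimestampType;
    import org.apache.flink.table.types.logical.TimestampType;
    import org.apache.hudi.util.DataTypeUtils;

    public class PrecisionExample {
      public static void main(String[] args) {
        System.out.println(DataTypeUtils.precision(new TimestampType(6)));           // 6
        System.out.println(DataTypeUtils.precision(new LocalZonedTimestampType(3))); // 3
      }
    }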
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index 446a6d0417..ecebd1adcd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
@@ -157,8 +156,9 @@ public class RowDataToAvroConverters {
               }
             };
         break;
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
       case TIMESTAMP_WITHOUT_TIME_ZONE:
-        final int precision = ((TimestampType) type).getPrecision();
+        final int precision = DataTypeUtils.precision(type);
         if (precision <= 3) {
           converter =
               new RowDataToAvroConverter() {
@@ -231,7 +231,7 @@ public class RowDataToAvroConverters {
             actualSchema = types.get(1);
           } else {
             throw new IllegalArgumentException(
-                "The Avro schema is not a nullable type: " + schema.toString());
+                "The Avro schema is not a nullable type: " + schema);
           }
         } else {
           actualSchema = schema;
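
On the write side the two timestamp kinds now share one encoder as well,
selected via DataTypeUtils.precision(type). A sketch of the assumed long
encoding it produces:

    import java.time.Instant;
    import org.apache.flink.table.data.TimestampData;

    public class TimestampToAvroLongSketch {
      // precision <= 3: emit epoch milliseconds.
      static long toMillis(TimestampData ts) {
        return ts.toInstant().toEpochMilli();
      }

      // precision <= 6: emit epoch microseconds.
      static long toMicros(TimestampData ts) {
        Instant instant = ts.toInstant();
        return Math.addExact(
            Math.multiplyExact(instant.getEpochSecond(), 1_000_000L),
            instant.getNano() / 1_000);
      }
    }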
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index e59a393fd3..998cd49fdd 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -56,6 +56,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
+import java.time.ZoneId;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -1438,6 +1439,37 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
     assertRowsEquals(result2, "[+I[3]]");
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testWriteReadWithLocalTimestamp(HoodieTableType tableType) {
+    TableEnvironment tableEnv = batchTableEnv;
+    tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+    String createTable = sql("t1")
+        .field("f0 int")
+        .field("f1 varchar(10)")
+        .field("f2 TIMESTAMP_LTZ(3)")
+        .field("f4 TIMESTAMP_LTZ(6)")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.PRECOMBINE_FIELD, "f1")
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .pkField("f0")
+        .noPartition()
+        .end();
+    tableEnv.executeSql(createTable);
+
+    String insertInto = "insert into t1 values\n"
+        + "(1, 'abc', TIMESTAMP '1970-01-01 08:00:01', TIMESTAMP '1970-01-01 08:00:02'),\n"
+        + "(2, 'def', TIMESTAMP '1970-01-01 08:00:03', TIMESTAMP '1970-01-01 08:00:04')";
+    execInsertSql(tableEnv, insertInto);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    final String expected = "["
+        + "+I[1, abc, 1970-01-01T00:00:01Z, 1970-01-01T00:00:02Z], "
+        + "+I[2, def, 1970-01-01T00:00:03Z, 1970-01-01T00:00:04Z]]";
+    assertRowsEquals(result, expected);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
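
The expected rows in the new test follow from TIMESTAMP_LTZ's instant
semantics: the TIMESTAMP literals are interpreted in the Asia/Shanghai
session time zone (UTC+8) and read back as the corresponding UTC instants.
A standalone sketch of that arithmetic:

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public class SessionZoneSketch {
      public static void main(String[] args) {
        LocalDateTime local = LocalDateTime.parse("1970-01-01T08:00:01");
        Instant instant = local.atZone(ZoneId.of("Asia/Shanghai")).toInstant();
        System.out.println(instant); // 1970-01-01T00:00:01Z
      }
    }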
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
index e56541ed57..b297b627ba 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
@@ -23,6 +23,7 @@ import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.util.AvroSchemaConverter;
 
 import org.apache.avro.Schema;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.junit.jupiter.api.Test;
 
@@ -50,4 +51,41 @@ public class TestAvroSchemaConverter {
         + "`isDeleted` BOOLEAN NOT NULL>";
     assertThat(dataType.getChildren().get(pos).toString(), is(expected));
   }
+
+  @Test
+  void testLocalTimestampType() {
+    DataType dataType = DataTypes.ROW(
+        DataTypes.FIELD("f_localtimestamp_millis", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+        DataTypes.FIELD("f_localtimestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
+    );
+    // convert to avro schema
+    Schema schema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
+    final String expectedSchema = ""
+        + "[ \"null\", {\n"
+        + "  \"type\" : \"record\",\n"
+        + "  \"name\" : \"record\",\n"
+        + "  \"fields\" : [ {\n"
+        + "    \"name\" : \"f_localtimestamp_millis\",\n"
+        + "    \"type\" : [ \"null\", {\n"
+        + "      \"type\" : \"long\",\n"
+        + "      \"logicalType\" : \"local-timestamp-millis\"\n"
+        + "    } ],\n"
+        + "    \"default\" : null\n"
+        + "  }, {\n"
+        + "    \"name\" : \"f_localtimestamp_micros\",\n"
+        + "    \"type\" : [ \"null\", {\n"
+        + "      \"type\" : \"long\",\n"
+        + "      \"logicalType\" : \"local-timestamp-micros\"\n"
+        + "    } ],\n"
+        + "    \"default\" : null\n"
+        + "  } ]\n"
+        + "} ]";
+    assertThat(schema.toString(true), is(expectedSchema));
+    // convert it back
+    DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema);
+    final String expectedDataType = "ROW<"
+        + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
+        + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
+    assertThat(convertedDataType.toString(), is(expectedDataType));
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index aa63856040..75cf3272d6 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -59,6 +59,7 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
@@ -334,7 +335,10 @@ public class ParquetSplitReaderUtil {
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
           case INT64:
-            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
+            int precision = fieldType instanceof TimestampType
+                ? ((TimestampType) fieldType).getPrecision()
+                : ((LocalZonedTimestampType) fieldType).getPrecision();
+            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
           case INT96:
             return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
           default:
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index c636b36100..dc59abe460 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -59,6 +59,7 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
@@ -334,7 +335,10 @@ public class ParquetSplitReaderUtil {
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
           case INT64:
-            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
+            int precision = fieldType instanceof TimestampType
+                ? ((TimestampType) fieldType).getPrecision()
+                : ((LocalZonedTimestampType) fieldType).getPrecision();
+            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
           case INT96:
             return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
           default:
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index ae79966e88..5eeb42514a 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -59,6 +59,7 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
@@ -334,7 +335,10 @@ public class ParquetSplitReaderUtil {
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
           case INT64:
-            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
+            int precision = fieldType instanceof TimestampType
+                ? ((TimestampType) fieldType).getPrecision()
+                : ((LocalZonedTimestampType) fieldType).getPrecision();
+            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
           case INT96:
             return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
           default:
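
The same precision dispatch is applied verbatim to the hudi-flink1.13.x,
1.14.x, and 1.15.x copies of ParquetSplitReaderUtil. Extracted as a
standalone helper, the pattern is simply (a sketch; the commit inlines it
in each module):

    import org.apache.flink.table.types.logical.LocalZonedTimestampType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.TimestampType;

    public class Int64TimestampPrecision {
      // INT64 parquet timestamps need the SQL precision to pick millis vs
      // micros; both TIMESTAMP and TIMESTAMP_LTZ can reach this branch now.
      static int precision(LogicalType fieldType) {
        return fieldType instanceof TimestampType
            ? ((TimestampType) fieldType).getPrecision()
            : ((LocalZonedTimestampType) fieldType).getPrecision();
      }
    }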