Posted to commits@hudi.apache.org by "Danny Chen (Jira)" <ji...@apache.org> on 2022/01/12 02:56:00 UTC

[jira] [Commented] (HUDI-3184) hudi-flink support timestamp-micros

    [ https://issues.apache.org/jira/browse/HUDI-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473253#comment-17473253 ] 

Danny Chen commented on HUDI-3184:
----------------------------------

Fixed via master branch: 4b0111974fa4e7046640c600834d586c6735c708
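
For reference, a minimal sketch of the kind of mapping such a fix typically introduces (illustrative only; the class and method names are hypothetical, and this is not necessarily what the referenced commit does): Flink TIMESTAMP precisions up to 3 map to Avro's timestamp-millis, while precisions 4 to 6 map to timestamp-micros instead of being rejected.

{code:java}
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class TimestampAvroMappingSketch {

  /** Maps a Flink TIMESTAMP precision to an Avro long with the matching logical type. */
  static Schema timestampSchema(int precision) {
    if (precision <= 3) {
      // Millisecond precision fits Avro's timestamp-millis.
      return LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType());
    } else if (precision <= 6) {
      // Microsecond precision requires Avro's timestamp-micros.
      return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
    }
    // Precisions above 6 are not handled in this sketch.
    throw new IllegalArgumentException(
        "Avro does not support TIMESTAMP type with precision: " + precision);
  }
} {code}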

> hudi-flink support timestamp-micros
> -----------------------------------
>
>                 Key: HUDI-3184
>                 URL: https://issues.apache.org/jira/browse/HUDI-3184
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Flink Integration
>            Reporter: Well Tang
>            Assignee: Well Tang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.11.0
>
>         Attachments: 1.png, 2.png, 3.png
>
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> *Problem overview*:
> Steps to reproduce the behavior:
> ① Write data into the hoodie table with the Spark engine (note: the dataset contains timestamp-typed columns).
> ② Read the hoodie table written in step ① with the Flink engine.
> *Actual behavior*
> Caused by: java.lang.IllegalArgumentException: Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3.
> at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:221) ~...
> at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:263) ~...
> at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:169) ~...
> at org.apache.hudi.table.HoodieTableFactory.inferAvroSchema(HoodieTableFactory.java:239) ~...
> at org.apache.hudi.table.HoodieTableFactory.setupConfOptions(HoodieTableFactory.java:155) ~...
> at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSource(HoodieTableFactory.java:65) ~...
> *Environment Description*
>   Hudi version : 0.11.0-SNAPSHOT
>   Spark version : 3.1.2
>   Flink version : 1.13.1
>   Hive version : None
>   Hadoop version : 2.9.2
>   Storage (HDFS/S3/GCS..) : HDFS
>   Running on Docker? (yes/no) : None
> *Additional context*
> We are using Hudi as a data lake to deliver projects to customers. We ran into the following scenario: data is written to the hoodie table through the Spark engine and then read from that table through the Flink engine.
> Note that the above exception is thrown whenever the written dataset contains a timestamp column.
> To simplify the description, we reduce the problem to the following steps:
> 【step-1】Mock data:
> {code:java}
> /home/deploy/spark-3.1.2-bin-hadoop2.7/bin/spark-shell \
> --driver-class-path /home/workflow/apache-hive-2.3.8-bin/conf/ \
> --master spark://2-120:7077 \
> --executor-memory 4g \
> --driver-memory 4g \
> --num-executors 4 \
> --total-executor-cores 4 \
> --name test \
> --jars /home/deploy/spark-3.1.2-bin-hadoop2.7/jars/hudi-spark3-bundle_2.12-0.11.0-SNAPSHOT.jar,/home/deploy/spark-3.1.2-bin-hadoop2.7/jars/spark-avro_2.12-3.1.2.jar \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
> --conf spark.sql.hive.convertMetastoreParquet=false {code}
> {code:java}
> val df = spark.sql("select 1 as id, 'A' as name, current_timestamp as dt")
> df.write.format("hudi").
>   option("hoodie.datasource.write.recordkey.field", "id").
>   option("hoodie.datasource.write.precombine.field", "id").
>   option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
>   option("hoodie.upsert.shuffle.parallelism", "2").
>   option("hoodie.table.name", "timestamp_table").
>   mode("append").
>   save("/hudi/suite/data_type_timestamp_table")
> spark.read.format("hudi").load("/hudi/suite/data_type_timestamp_table").show(false) {code}
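> To double-check which logical type Spark actually wrote for the dt column, the Parquet footer of one of the data files can be inspected; below is a hedged sketch of such a check (the data file name is a placeholder, not an actual file from this table):
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.hadoop.ParquetFileReader;
> import org.apache.parquet.hadoop.util.HadoopInputFile;
> import org.apache.parquet.schema.MessageType;
>
> public class PrintHudiParquetSchema {
>   public static void main(String[] args) throws Exception {
>     // Placeholder file name; point this at one of the data files written above.
>     Path file = new Path("/hudi/suite/data_type_timestamp_table/<data-file>.parquet");
>     try (ParquetFileReader reader =
>              ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))) {
>       MessageType schema = reader.getFooter().getFileMetaData().getSchema();
>       // The dt column should be annotated as TIMESTAMP(MICROS, ...) when written by Spark.
>       System.out.println(schema);
>     }
>   }
> } {code}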
> 【step-2】Consume the data through Flink:
> {code:java}
> bin/sql-client.sh embedded -j lib/hudi-flink-bundle_2.12-0.11.0-SNAPSHOT.jar {code}
> {code:java}
> create table data_type_timestamp_table (
>   `id` INT,
>   `name` STRING,
>   `dt` TIMESTAMP(6)
> ) with (
>   'connector' = 'hudi',
>   'hoodie.table.name' = 'data_type_timestamp_table',
>   'read.streaming.enabled' = 'true',
>   'hoodie.datasource.write.recordkey.field' = 'id',
>   'path' = '/hudi/suite/data_type_timestamp_table',
>   'read.streaming.check-interval' = '10',
>   'table.type' = 'COPY_ON_WRITE',
>   'write.precombine.field' = 'id'
> );
> select * from data_type_timestamp_table; {code}
> As shown below:
> !1.png!
> If we change TIMESTAMP(6) to TIMESTAMP(3), the result is as follows:
> !2.png!
> The data can be read, but the displayed timestamps are incorrect!
> After inspecting the files under the Hoodie table directory, we found that Spark writes the timestamp column as timestamp-micros:
> !3.png!
> However, hudi-flink reads and writes Hoodie timestamp data as timestamp-millis! Therefore, reading and writing timestamp columns across the Spark and Flink engines is problematic. We hope the hudi-flink module can support timestamp-micros so that no time precision is lost.
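> As a side note on the incorrect display above, one plausible explanation (a rough illustration with an assumed value, not taken from the actual data) is that a long holding microseconds gets decoded with millisecond semantics, which shifts the timestamp by a factor of 1000:
> {code:java}
> import java.time.Instant;
>
> public class PrecisionMismatchSketch {
>   public static void main(String[] args) {
>     // Assumed microsecond epoch value for 2022-01-12T02:56:00.123456Z (illustrative only).
>     long micros = 1_641_956_160_123_456L;
>     // Interpreted as microseconds (what Spark wrote): 2022-01-12T02:56:00.123456Z
>     System.out.println(Instant.ofEpochSecond(micros / 1_000_000, (micros % 1_000_000) * 1_000));
>     // Interpreted as milliseconds by a millis-only reader: a date tens of thousands of years away.
>     System.out.println(Instant.ofEpochMilli(micros));
>   }
> } {code}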


