Posted to commits@hudi.apache.org by "Danny Chen (Jira)" <ji...@apache.org> on 2022/01/12 02:56:00 UTC
[jira] [Commented] (HUDI-3184) hudi-flink support timestamp-micros
[ https://issues.apache.org/jira/browse/HUDI-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473253#comment-17473253 ]
Danny Chen commented on HUDI-3184:
----------------------------------
Fixed via master branch: 4b0111974fa4e7046640c600834d586c6735c708
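For context, a plausible shape of such a fix, sketched here as an assumption based on the exception in the report below (not code taken from commit 4b01119): extend the Flink TIMESTAMP(p)-to-Avro mapping so that precisions 4-6 map to timestamp-micros instead of being rejected.
{code:java}
// Hypothetical sketch, NOT the actual commit: map Flink TIMESTAMP(p) to an
// Avro logical type instead of rejecting everything above millisecond precision.
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class TimestampMappingSketch {
  static Schema convertTimestamp(int precision) {
    if (precision <= 3) {
      // up to millisecond precision: Avro timestamp-millis over a long
      return LogicalTypes.timestampMillis()
          .addToSchema(SchemaBuilder.builder().longType());
    } else if (precision <= 6) {
      // 4..6 fractional digits: Avro timestamp-micros over a long
      return LogicalTypes.timestampMicros()
          .addToSchema(SchemaBuilder.builder().longType());
    }
    // nanosecond precision still has no Avro logical type to map to
    throw new IllegalArgumentException(
        "Avro does not support TIMESTAMP type with precision: " + precision);
  }
} {code}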
> hudi-flink support timestamp-micros
> -----------------------------------
>
> Key: HUDI-3184
> URL: https://issues.apache.org/jira/browse/HUDI-3184
> Project: Apache Hudi
> Issue Type: Improvement
> Components: Flink Integration
> Reporter: Well Tang
> Assignee: Well Tang
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.11.0
>
> Attachments: 1.png, 2.png, 3.png
>
> Original Estimate: 120h
> Remaining Estimate: 120h
>
> {*}Problem overview{*}:
> Steps to reproduce the behavior:
> ① Use the Spark engine to write data into the Hudi table (note: the dataset contains timestamp-typed columns).
> ② Use the Flink engine to read the Hudi table written in step ①.
> *Actual behavior* (the read fails with the following exception):
> Caused by: java.lang.IllegalArgumentException: Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3.
> at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:221) ~...
> at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:263) ~...
> at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:169) ~...
> at org.apache.hudi.table.HoodieTableFactory.inferAvroSchema(HoodieTableFactory.java:239) ~...
> at org.apache.hudi.table.HoodieTableFactory.setupConfOptions(HoodieTableFactory.java:155) ~...
> at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSource(HoodieTableFactory.java:65) ~...
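> The exception originates in the schema conversion at the top of the stack: before the fix, the converter maps every Flink TIMESTAMP(p) to Avro timestamp-millis, so any precision above 3 has no representation. A minimal sketch of that pre-fix logic, inferred from the error message rather than copied from the Hudi source:
> {code:java}
> // Sketch (inferred, not actual Hudi source): TIMESTAMP(p) with p > 3 is
> // rejected because only Avro's timestamp-millis logical type is supported.
> import org.apache.avro.LogicalTypes;
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaBuilder;
>
> class PreFixTimestampConversion {
>   static Schema convertTimestamp(int precision) {
>     if (precision > 3) {
>       throw new IllegalArgumentException(
>           "Avro does not support TIMESTAMP type with precision: " + precision
>               + ", it only supports precision less than 3.");
>     }
>     return LogicalTypes.timestampMillis()
>         .addToSchema(SchemaBuilder.builder().longType());
>   }
> } {code}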
> *Environment Description*
> Hudi version : 0.11.0-SNAPSHOT
> Spark version : 3.1.2
> Flink version : 1.13.1
> Hive version : None
> Hadoop version : 2.9.2
> Storage (HDFS/S3/GCS..) : HDFS
> Running on Docker? (yes/no) : None
> *Additional context*
> We are using Hudi as a data lake to deliver projects to customers. We found the following application scenario: write data to the Hudi table through the Spark engine, and then read data from the Hudi table through the Flink engine.
> Note that the above exception is thrown whenever the written dataset contains a timestamp column.
> To simplify the description, we reduce the problem to the following steps:
> 【step-1】Mock data:
> {code:java}
> /home/deploy/spark-3.1.2-bin-hadoop2.7/bin/spark-shell \
> --driver-class-path /home/workflow/apache-hive-2.3.8-bin/conf/ \
> --master spark://2-120:7077 \
> --executor-memory 4g \
> --driver-memory 4g \
> --num-executors 4 \
> --total-executor-cores 4 \
> --name test \
> --jars /home/deploy/spark-3.1.2-bin-hadoop2.7/jars/hudi-spark3-bundle_2.12-0.11.0-SNAPSHOT.jar,/home/deploy/spark-3.1.2-bin-hadoop2.7/jars/spark-avro_2.12-3.1.2.jar \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
> --conf spark.sql.hive.convertMetastoreParquet=false {code}
> {code:java}
> val df = spark.sql("select 1 as id, 'A' as name, current_timestamp as dt")
> df.write.format("hudi").
> option("hoodie.datasource.write.recordkey.field", "id").
> option("hoodie.datasource.write.precombine.field", "id").
> option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
> option("hoodie.upsert.shuffle.parallelism", "2").
> option("hoodie.table.name", "timestamp_table").
> mode("append").
> save("/hudi/suite/data_type_timestamp_table")
> spark.read.format("hudi").load("/hudi/suite/data_type_timestamp_table").show(false) {code}
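> Spark's TimestampType carries microsecond precision, so the Avro schema Hudi records for the dt column uses the timestamp-micros logical type (this is what 3.png in the Additional context section shows). A small self-contained check, with the schema fragment typed in by hand as an assumption:
> {code:java}
> // Parse a hand-written Avro schema fragment like the one recorded for 'dt'
> // and print its logical type.
> import org.apache.avro.Schema;
>
> public class InspectDtSchema {
>   public static void main(String[] args) {
>     Schema dt = new Schema.Parser().parse(
>         "{\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}");
>     System.out.println(dt.getLogicalType().getName()); // timestamp-micros
>   }
> } {code}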
> 【step-2】Consumption data through flink:
> {code:java}
> bin/sql-client.sh embedded -j lib/hudi-flink-bundle_2.12-0.11.0-SNAPSHOT.jar {code}
> {code:java}
> create table data_type_timestamp_table (
> `id` INT,
> `name` STRING,
> `dt` TIMESTAMP(6)
> ) with (
> 'connector' = 'hudi',
> 'hoodie.table.name' = 'data_type_timestamp_table',
> 'read.streaming.enabled' = 'true',
> 'hoodie.datasource.write.recordkey.field' = 'id',
> 'path' = '/hudi/suite/data_type_timestamp_table',
> 'read.streaming.check-interval' = '10',
> 'table.type' = 'COPY_ON_WRITE',
> 'write.precombine.field' = 'id'
> );
> select * from data_type_timestamp_table; {code}
> As shown below:
> !1.png!
> If we change TIMESTAMP(6) to TIMESTAMP(3), the result is as follows:
> !2.png!
> The data can now be read, but the displayed values are incorrect!
> Checking the files in the Hudi table directory, we found that Spark writes the timestamp type as timestamp-micros:
> !3.png!
> However, hudi-flink reads and writes Hudi data with the timestamp-millis type! Therefore, reading and writing timestamp columns across the Spark and Flink engines is broken. We hope the hudi-flink module will support timestamp-micros so that time precision is not lost.
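> To see why the TIMESTAMP(3) workaround above displays skewed values, consider one plausible mechanism: a long holding microseconds since the epoch gets interpreted as milliseconds. A self-contained illustration (the sample epoch value is made up):
> {code:java}
> import java.time.Instant;
>
> public class UnitMismatchDemo {
>   public static void main(String[] args) {
>     long micros = 1_641_952_560_000_000L; // ~2022-01-12 in MICROseconds
>     // Correct reading: scale microseconds down to milliseconds first
>     System.out.println(Instant.ofEpochMilli(micros / 1_000L)); // 2022-01-12T01:56:00Z
>     // Wrong reading: the raw value treated as milliseconds lands ~52,000 years out
>     System.out.println(Instant.ofEpochMilli(micros));
>   }
> } {code}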