You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2021/09/27 21:57:00 UTC

[jira] [Created] (HUDI-2495) Difference in behavior between GenericRecord based key gen and Row based key gen

sivabalan narayanan created HUDI-2495:
-----------------------------------------

             Summary: Difference in behavior between GenericRecord based key gen and Row based key gen 
                 Key: HUDI-2495
                 URL: https://issues.apache.org/jira/browse/HUDI-2495
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: sivabalan narayanan


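When ComplexKeyGenerator is used and one of the record key fields is a timestamp, the row-writer path and the RDD path produce different record key values. For example, the RDD path encodes the timestamp as a long in epoch microseconds (`eventTime:1417369232000000`), while the row-writer path uses the timestamp's string form (`eventTime:2014-11-30 12:40:32.0`).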

 


import java.sql.Timestamp
// SaveMode is not auto-imported by spark-shell; without this, `Overwrite` does not resolve.
import org.apache.spark.sql.SaveMode
import spark.implicits._

// Four rows: two (typeId, str) groups, each with two distinct eventTime values.
// eventTime is part of the composite record key below, which is what exposes the bug.
val df = Seq(
  (1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
  (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
  (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
  (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
).toDF("typeId", "eventTime", "str")

// No `hoodie.datasource.write.operation` is set, so the datasource's default operation
// applies; per this report, that write goes through the RDD (GenericRecord) key-gen path.
df.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.datasource.write.precombine.field", "typeId").
  option("hoodie.datasource.write.partitionpath.field", "typeId").
  // Composite record key: a string column plus a timestamp column.
  option("hoodie.datasource.write.recordkey.field", "str,eventTime").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
  option("hoodie.table.name", "hudi_tbl").
  mode(SaveMode.Overwrite).
  save("/tmp/hudi_tbl_trial/")

 

// Load the table written to /tmp/hudi_tbl_trial/ back through the Hudi datasource
// and register it so it can be queried with Spark SQL.
val hudiDF = spark.read.
  format("hudi").
  load("/tmp/hudi_tbl_trial/")
hudiDF.createOrReplaceTempView("hudi_sql_tbl")

// Print each generated record key (untruncated) beside the columns it was derived from.
spark.
  sql("select _hoodie_record_key, str, eventTime, typeId from hudi_sql_tbl").
  show(false)

 
{code:java}
+----------------------------------+---+-------------------+------+
|_hoodie_record_key                |str|eventTime          |typeId|
+----------------------------------+---+-------------------+------+
|str:abc,eventTime:1417369232000000|abc|2014-11-30 12:40:32|1     |
|str:abc,eventTime:1388635201000000|abc|2014-01-01 23:00:01|1     |
|str:def,eventTime:1462803163000000|def|2016-05-09 10:12:43|2     |
|str:def,eventTime:1483023240000000|def|2016-12-29 09:54:00|2     |
+----------------------------------+---+-------------------+------+
{code}
 

 

// Now retry the same write with the bulk_insert (row writer) path.
// Only the write operation and the target path differ from the first write.
// NOTE(review): whether bulk_insert goes through the row writer is controlled by
// `hoodie.datasource.write.row.writer.enable`, whose default depends on the Hudi
// version under test. Confirm it is enabled when reproducing.
import org.apache.spark.sql.SaveMode

df.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.datasource.write.precombine.field", "typeId").
  option("hoodie.datasource.write.partitionpath.field", "typeId").
  option("hoodie.datasource.write.recordkey.field", "str,eventTime").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
  option("hoodie.table.name", "hudi_tbl").
  // The original paste lost the `option(` call here, which left a syntax error.
  option("hoodie.datasource.write.operation", "bulk_insert").
  mode(SaveMode.Overwrite).
  save("/tmp/hudi_tbl_trial_bulk_insert/")

 

// Load the bulk_insert table from /tmp/hudi_tbl_trial_bulk_insert/ back through
// the Hudi datasource and register it for Spark SQL.
val hudiDF_bulk_insert = spark.read.
  format("hudi").
  load("/tmp/hudi_tbl_trial_bulk_insert/")
hudiDF_bulk_insert.createOrReplaceTempView("hudi_sql_tbl_bulk_insert")

// Print the record keys (untruncated) so they can be compared with the first table's keys.
spark.
  sql("select _hoodie_record_key, str, eventTime, typeId from hudi_sql_tbl_bulk_insert").
  show(false)
{code:java}
+---------------------------------------+---+-------------------+------+
|_hoodie_record_key                     |str|eventTime          |typeId|
+---------------------------------------+---+-------------------+------+
|str:def,eventTime:2016-05-09 10:12:43.0|def|2016-05-09 10:12:43|2     |
|str:def,eventTime:2016-12-29 09:54:00.0|def|2016-12-29 09:54:00|2     |
|str:abc,eventTime:2014-01-01 23:00:01.0|abc|2014-01-01 23:00:01|1     |
|str:abc,eventTime:2014-11-30 12:40:32.0|abc|2014-11-30 12:40:32|1     |
+---------------------------------------+---+-------------------+------+
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)