Posted to commits@hudi.apache.org by "Udit Mehrotra (Jira)" <ji...@apache.org> on 2021/08/25 09:25:00 UTC

[jira] [Updated] (HUDI-2059) When a log file exists in a MOR table and clustering is triggered, the query result shows that the update record in the log is lost

     [ https://issues.apache.org/jira/browse/HUDI-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Udit Mehrotra updated HUDI-2059:
--------------------------------
    Fix Version/s:     (was: 0.9.0)
                   0.10.0

> When a log file exists in a MOR table and clustering is triggered, the query result shows that the update record in the log is lost
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2059
>                 URL: https://issues.apache.org/jira/browse/HUDI-2059
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.8.0
>         Environment: hadoop 3.1.1
> spark3.1.1/spark2.4.5
> hive3.1.1
>            Reporter: tao meng
>            Assignee: tao meng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.10.0
>
>
> When a log file exists in a MOR table and clustering is triggered, the query result shows that the update record in the log is lost.
> The reason for this problem is that Hudi uses HoodieFileSliceReader to read the table data before clustering. HoodieFileSliceReader calls HoodieMergedLogRecordScanner.processNextRecord to merge the updated values with the old values, but in that call the old value is kept and the updated value is discarded, which is wrong.
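> For illustration only, a minimal Scala sketch (not the actual Hudi internals; the record type and method name are hypothetical) of what the clustering-time merge needs to guarantee: the record read from the delta log, being the later write, must survive the merge with the base-file record.
> // Hypothetical stand-in record type for illustration: (recordKey, age payload)
> case class Rec(key: String, age: Int)
> // baseRec comes from the parquet base file, logRec from the later delta log.
> // The merge must return the log-side record so the update survives; returning
> // baseRec (what the buggy path effectively does) silently drops the update.
> def mergeForClustering(baseRec: Rec, logRec: Rec): Rec = logRec
> // mergeForClustering(Rec("0", 1), Rec("0", 1001)).age == 1001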
> Test steps (imports needed to run the snippets below; basePath is assumed to point at the target table path):
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.{expr, lit}
> // step1: create a Hudi MOR table
> val df = spark.range(0, 1000).toDF("keyid")
>  .withColumn("col3", expr("keyid"))
>  .withColumn("age", lit(1))
>  .withColumn("p", lit(2))
> df.write.format("hudi").
>  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
>  option(PRECOMBINE_FIELD_OPT_KEY, "col3").
>  option(RECORDKEY_FIELD_OPT_KEY, "keyid").
>  option(PARTITIONPATH_FIELD_OPT_KEY, "p").
>  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert").
>  option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
>  option("hoodie.insert.shuffle.parallelism", "4").
>  option("hoodie.upsert.shuffle.parallelism", "4").
>  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>  .mode(SaveMode.Overwrite).save(basePath)
> // step2: update age for keyid < 5 to produce log files
> val df1 = spark.range(0, 5).toDF("keyid")
>  .withColumn("col3", expr("keyid"))
>  .withColumn("age", lit(1 + 1000))
>  .withColumn("p", lit(2))
> df1.write.format("hudi").
>  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
>  option(PRECOMBINE_FIELD_OPT_KEY, "col3").
>  option(RECORDKEY_FIELD_OPT_KEY, "keyid").
>  option(PARTITIONPATH_FIELD_OPT_KEY, "p").
>  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
>  option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
>  option("hoodie.insert.shuffle.parallelism", "4").
>  option("hoodie.upsert.shuffle.parallelism", "4").
>  option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>  .mode(SaveMode.Append).save(basePath)
> // step3: insert more records and trigger inline clustering
> val df2 = spark.range(6, 10).toDF("keyid")
>  .withColumn("col3", expr("keyid"))
>  .withColumn("age", lit(1 + 2000))
>  .withColumn("p", lit(2))
> df2.write.format("hudi").
>  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
>  option(PRECOMBINE_FIELD_OPT_KEY, "col3").
>  option(RECORDKEY_FIELD_OPT_KEY, "keyid").
>  option(PARTITIONPATH_FIELD_OPT_KEY, "p").
>  option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
>  option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
>  option("hoodie.insert.shuffle.parallelism", "4").
>  option("hoodie.upsert.shuffle.parallelism", "4").
>  option("hoodie.parquet.small.file.limit", "0").
>  option("hoodie.clustering.inline", "true").
>  option("hoodie.clustering.inline.max.commits", "1").
>  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
>  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
>  option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
>  .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>  .mode(SaveMode.Append).save(basePath)
> spark.read.format("hudi")
>  .load(basePath).select("age").where("keyid = 0").show(100, false)
> +---+
> |age|
> +---+
> |1  |
> +---+
> The result is wrong, since we updated the value of age to 1001 in step 2; the query should return 1001.
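> A hedged sanity check of the intended behaviour (an assumption about the fix, not output observed on 0.8.0):
> // the step 2 update must survive clustering
> val ageAfterClustering = spark.read.format("hudi").load(basePath)
>   .select("age").where("keyid = 0").collect().head.get(0)
> assert(ageAfterClustering == 1001, s"expected 1001 but got $ageAfterClustering")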



--
This message was sent by Atlassian Jira
(v8.3.4#803005)