Posted to commits@hudi.apache.org by "Udit Mehrotra (Jira)" <ji...@apache.org> on 2021/08/25 09:25:00 UTC
[jira] [Updated] (HUDI-2059) When log exists in mor table,
clustering is triggered. The query result shows that the update record in
log is lost
[ https://issues.apache.org/jira/browse/HUDI-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Udit Mehrotra updated HUDI-2059:
--------------------------------
Fix Version/s: (was: 0.9.0)
0.10.0
> When log exists in mor table, clustering is triggered. The query result shows that the update record in log is lost
> --------------------------------------------------------------------------------------------------------------------
>
> Key: HUDI-2059
> URL: https://issues.apache.org/jira/browse/HUDI-2059
> Project: Apache Hudi
> Issue Type: Bug
> Affects Versions: 0.8.0
> Environment: hadoop 3.1.1
> spark3.1.1/spark2.4.5
> hive3.1.1
> Reporter: tao meng
> Assignee: tao meng
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.10.0
>
>
> When log files exist in a MOR table and clustering is triggered, the query result shows that the update records in the log are lost.
> The reason for this problem is: Hudi uses HoodieFileSliceReader to read the table data and then performs clustering. HoodieFileSliceReader calls HoodieMergedLogRecordScanner.processNextRecord to merge the update values with the old values, but in that call the old values are kept and the update values are discarded, which is wrong.
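The intended merge semantics can be sketched as follows. This is an illustrative standalone snippet, not Hudi's actual API: `Rec` and `merge` are hypothetical names, and the precombine comparison mirrors the "col3" precombine field used in the reproduction below. The bug is the inverse of this logic: the base-file record was kept and the log update discarded.

```scala
// Hypothetical record type standing in for a Hudi record (keyid = record key,
// col3 = precombine field, age = payload value being updated).
case class Rec(keyid: Long, col3: Long, age: Int)

// When merging a base-file record with a log record for the same key, the
// record with the greater (or equal, i.e. newer) precombine value must win.
def merge(oldRec: Rec, update: Rec): Rec =
  if (update.col3 >= oldRec.col3) update else oldRec

val base    = Rec(0, 0, 1)    // written in step 1 below
val fromLog = Rec(0, 0, 1001) // update from step 2 below
val merged  = merge(base, fromLog) // correct merge keeps the log record: age = 1001
```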
> Test steps:
> // step 1: create a Hudi MOR table
> val df = spark.range(0, 1000).toDF("keyid")
>   .withColumn("col3", expr("keyid"))
>   .withColumn("age", lit(1))
>   .withColumn("p", lit(2))
> df.write.format("hudi").
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
>   option(PRECOMBINE_FIELD_OPT_KEY, "col3").
>   option(RECORDKEY_FIELD_OPT_KEY, "keyid").
>   option(PARTITIONPATH_FIELD_OPT_KEY, "p").
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert").
>   option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
>   option("hoodie.insert.shuffle.parallelism", "4").
>   option("hoodie.upsert.shuffle.parallelism", "4").
>   option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>   .mode(SaveMode.Overwrite).save(basePath)
> // step 2: update age where keyid < 5 to produce log files
> val df1 = spark.range(0, 5).toDF("keyid")
>   .withColumn("col3", expr("keyid"))
>   .withColumn("age", lit(1 + 1000))
>   .withColumn("p", lit(2))
> df1.write.format("hudi").
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
>   option(PRECOMBINE_FIELD_OPT_KEY, "col3").
>   option(RECORDKEY_FIELD_OPT_KEY, "keyid").
>   option(PARTITIONPATH_FIELD_OPT_KEY, "p").
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
>   option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
>   option("hoodie.insert.shuffle.parallelism", "4").
>   option("hoodie.upsert.shuffle.parallelism", "4").
>   option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>   .mode(SaveMode.Append).save(basePath)
> // step 3: write again with inline clustering enabled to trigger clustering
> val df2 = spark.range(6, 10).toDF("keyid")
>   .withColumn("col3", expr("keyid"))
>   .withColumn("age", lit(1 + 2000))
>   .withColumn("p", lit(2))
> df2.write.format("hudi").
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
>   option(PRECOMBINE_FIELD_OPT_KEY, "col3").
>   option(RECORDKEY_FIELD_OPT_KEY, "keyid").
>   option(PARTITIONPATH_FIELD_OPT_KEY, "p").
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert").
>   option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[org.apache.hudi.keygen.ComplexKeyGenerator].getName).
>   option("hoodie.insert.shuffle.parallelism", "4").
>   option("hoodie.upsert.shuffle.parallelism", "4").
>   option("hoodie.parquet.small.file.limit", "0").
>   option("hoodie.clustering.inline", "true").
>   option("hoodie.clustering.inline.max.commits", "1").
>   option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
>   option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
>   option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
>   .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
>   .mode(SaveMode.Append).save(basePath)
>
> // query the table after clustering
> spark.read.format("hudi")
>   .load(basePath).select("age").where("keyid = 0").show(100, false)
> +---+
> |age|
> +---+
> |1 |
> +---+
> This result is wrong: age was updated to 1001 in step 2, so the query should return 1001.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)