Posted to commits@hudi.apache.org by "Alexey Kudinkin (Jira)" <ji...@apache.org> on 2022/09/29 20:38:00 UTC

[jira] [Assigned] (HUDI-4879) MERGE INTO fails when setting "hoodie.datasource.write.payload.class"

     [ https://issues.apache.org/jira/browse/HUDI-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kudinkin reassigned HUDI-4879:
-------------------------------------

    Assignee: Jian Feng  (was: Alexey Kudinkin)

> MERGE INTO fails when setting "hoodie.datasource.write.payload.class"
> ---------------------------------------------------------------------
>
>                 Key: HUDI-4879
>                 URL: https://issues.apache.org/jira/browse/HUDI-4879
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Alexey Kudinkin
>            Assignee: Jian Feng
>            Priority: Blocker
>             Fix For: 0.12.1
>
>
> As reported by the user:
> [https://github.com/apache/hudi/issues/6354]
>  
> Currently, setting {{hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'}} will result in the following exception:
> {code:java}
> org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
> at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
> at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
> at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
> at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
> at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
> ... 28 more
> Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
> at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
> at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
> ... 31 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
> ... 32 more
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
> at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
> at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
> at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
> at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
> at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> ... 3 more
> Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
> at java.lang.Long.compareTo(Long.java:54)
> at org.apache.hudi.common.model.DefaultHoodieRecordPayload.needUpdatingPersistedRecord(DefaultHoodieRecordPayload.java:139)
> at org.apache.hudi.common.model.DefaultHoodieRecordPayload.combineAndGetUpdateValue(DefaultHoodieRecordPayload.java:62)
> at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332)
> ... 8 more
> 3071575 [task-result-getter-2] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 996.0 failed 4 times; aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 996.0 failed 4 times, most recent failure: Lost task 0.3 in stage 996.0 (TID 27262) (szzb-bg-uat-sdp-hadoop-05 executor 8): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
> at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
> at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
> at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
> at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:1 partitionPath=}, currentLocation='HoodieRecordLocation {instantTime=20220810095846644, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}', newLocation='HoodieRecordLocation {instantTime=20220810101719437, fileId=60c04f95-ca5e-4f82-9558-40da29cc022e-0}'}}, old value {{"_hoodie_commit_time": "20220810095824514", "_hoodie_commit_seqno": "20220810095824514_0_0", "_hoodie_record_key": "id:1", "_hoodie_partition_path": "", "_hoodie_file_name": "60c04f95-ca5e-4f82-9558-40da29cc022e-0_0-937-24808_20220810095846644.parquet", "id": 1, "name": "a0", "ts": 1000}}
> at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
> at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
> ... 28 more {code}
> This occurs because {{DefaultHoodieRecordPayload}} somehow overrides the {{ExpressionPayload}} setting, which has to be used to handle the MERGE INTO statement appropriately.
>  
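> Per the root {{ClassCastException}} in the trace above, {{DefaultHoodieRecordPayload.needUpdatingPersistedRecord}} ends up comparing the persisted ordering value (a {{Long}}, since {{ts}} is {{bigint}}) against an incoming {{Integer}} through the raw {{Comparable}} interface. A minimal standalone sketch of that failing comparison (not Hudi code; names are illustrative only):
> {code:java}
> public class OrderingValCastSketch {
>   @SuppressWarnings({"unchecked", "rawtypes"})
>   public static void main(String[] args) {
>     // Persisted 'ts' read back from storage: boxed as Long, per the bigint column.
>     Comparable persistedOrderingVal = 1000L;
>     // Incoming 'ts' from the MERGE INTO source relation: an Integer literal.
>     Comparable incomingOrderingVal = 2000;
>     // The raw Comparable call dispatches to Long.compareTo, whose bridge method
>     // casts its Integer argument to Long and throws:
>     //   java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
>     persistedOrderingVal.compareTo(incomingOrderingVal);
>   }
> }
> {code}
>  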
> Steps to reproduce:
> {code:java}
> --step 1: create the table
> drop table if exists hudi_cow_pk_cbfield_tbl;
> create table hudi_cow_pk_cbfield_tbl (
>   id bigint,
>   name string,
>   ts bigint
> ) using hudi
> tblproperties (
>   type = 'cow',
>   primaryKey = 'id',
>   preCombineField = 'ts',
>   hoodie.datasource.write.hive_style_partitioning = false,
>   hoodie.datasource.write.operation = 'upsert',
>   hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
>  )
> ;
> --step 2: insert a record with primaryKey=1, preCombineField=1000
> insert into hudi_cow_pk_cbfield_tbl select 1, 'a0', 1000;
> --step 3: 'insert' with the same primaryKey but a smaller preCombineField value (100); the write takes no effect, as expected (note: this is the normal behavior when hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' is set)
> insert into hudi_cow_pk_cbfield_tbl select 1, 'a0_new', 100;
> select * from hudi_cow_pk_cbfield_tbl;
> --step 4: 'update' with the same primaryKey but a smaller preCombineField value (20); the write takes no effect, as expected (note: this is the normal behavior when hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload' is set)
> update hudi_cow_pk_cbfield_tbl set name='a1_new',ts=20  where id= 1;
> select * from hudi_cow_pk_cbfield_tbl;
> --step 5: 'merge into' with the same primaryKey fails with: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
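> -- note: the literal 2000 below is typed as INT in the source relation, while the
> -- table's ts column is BIGINT; with the payload class overridden, the persisted
> -- Long and the incoming Integer are compared directly, which is likely what
> -- surfaces as the ClassCastException in the trace above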
> merge into hudi_cow_pk_cbfield_tbl as target
> using (select 1 as id,'a1_merge' as name,2000 as ts) as source
> on target.id = source.id
> when matched then update set *
> when not matched then insert *
> ; {code}


