Posted to commits@hudi.apache.org by "Sagar Sumit (Jira)" <ji...@apache.org> on 2022/08/09 16:44:00 UTC

[jira] [Updated] (HUDI-4219) Merge Into when update expression "col=s.col+2" on precombine causes exception

     [ https://issues.apache.org/jira/browse/HUDI-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sagar Sumit updated HUDI-4219:
------------------------------
    Fix Version/s: 0.12.0

> Merge Into when update expression "col=s.col+2" on precombine causes exception
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-4219
>                 URL: https://issues.apache.org/jira/browse/HUDI-4219
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Jiayu Shen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.12.0
>
>
> The MERGE SQL:
> {code:sql}
> merge into target_table as t using source_table as s on t.primary_key = s.primary_key
> when matched then update set col0=s.col0, col1=s.col1, col2=s.col2+2;
> -- col2 is the precombine field{code}
>  
> In MergeIntoHoodieTableCommand, the preCombine field expression is already processed:
> {code:scala}
> target2SourcePreCombineFiled.foreach {
>   case (targetPreCombineField, sourceExpression)
>       if !isEqualToTarget(targetPreCombineField, sourceExpression) =>
>     sourceDF = sourceDF.withColumn(targetPreCombineField, new Column(sourceExpression))
>     sourceDFOutput = sourceDFOutput :+ AttributeReference(targetPreCombineField, sourceExpression.dataType)()
>   case _ =>
> } {code}
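> The rewrite above only happens when the update expression is not simply the target precombine column itself. A rough sketch of what a check like _isEqualToTarget_ amounts to (illustrative only, not the actual Hudi implementation):
> {code:scala}
> // Illustrative approximation only -- not the actual Hudi implementation.
> // It is true when the update expression is just a plain reference to the
> // precombine column (e.g. "ts = s.ts") and false for anything computed
> // (e.g. "ts = s.ts + 10000"), which is the case that triggers this bug.
> import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
>
> def isEqualToTarget(targetPreCombineField: String, sourceExpression: Expression): Boolean =
>   sourceExpression match {
>     case attr: Attribute => attr.name.equalsIgnoreCase(targetPreCombineField)
>     case _               => false // "s.col + 2" falls through here
>   } {code}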
> When "col=s.col", method _isEqualToTarget_ will return true, but "col=s.col+2" return false.
> In this case, "col=s.col+2" is added to sourceDF, and then the result of "s.col+2" will overwrite origin column "col" after optimizer. 
> So, "col=s.col+2" has been calculated in sourceDF, but it is also in UpdateAction and will be calculated again in gencode, then throw exception as below.
>  
> The exception details are shown below, and the issue can be reproduced in a test case; a sketch follows.
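> A hedged reproduction sketch in the style of the Spark SQL tests (the table layout and values are inferred from the stack trace below; the table names, local SparkSession setup and exact Hudi configs are assumptions):
> {code:scala}
> // Assumes a SparkSession ("spark") that already has the Hudi Spark bundle
> // and the Hudi SQL extensions configured.
> spark.sql(
>   """create table target_table (
>     |  id int, name string, price double, ts long, dt string
>     |) using hudi
>     |partitioned by (dt)
>     |tblproperties (primaryKey = 'id', preCombineField = 'ts')""".stripMargin)
>
> spark.sql("insert into target_table values (8, 'a8', 80.0, 1008, '2021-03-21')")
>
> spark.sql(
>   """create table source_table (
>     |  id int, name string, price double, ts long, dt string
>     |) using parquet""".stripMargin)
> spark.sql("insert into source_table values (8, 'a8', 80.0, 1008, '2021-03-21')")
>
> // The precombine field "ts" is updated with a computed expression,
> // which is what triggers the exception below.
> spark.sql(
>   """merge into target_table t using source_table s on t.id = s.id
>     |when matched then update set
>     |  id = s.id, name = 'sxx', price = s.price * 2, ts = s.ts + 10000, dt = s.dt""".stripMargin) {code}
> The resulting exception: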
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:8 partitionPath=dt=2021-03-21}, currentLocation='HoodieRecordLocation {instantTime=20220610022503112, fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}', newLocation='HoodieRecordLocation {instantTime=20220610022508224, fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}'}}, old value {{"_hoodie_commit_time": "20220610022453319", "_hoodie_commit_seqno": "20220610022453319_0_1", "_hoodie_record_key": "id:8", "_hoodie_partition_path": "dt=2021-03-21", "_hoodie_file_name": "9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0_0-136-175_20220610022503112.parquet", "id": 8, "name": "a8", "price": 80.0, "ts": 1008, "dt": "2021-03-21"}}
>     at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
>     at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
>     at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     ... 3 more
> Caused by: java.lang.RuntimeException: Error in execute expression: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer.
> Expressions is: [boundreference() AS `id`  'sxx' AS `name`  (boundreference() * CAST(2 AS DOUBLE)) AS `price`  (boundreference() + CAST(10000 AS BIGINT)) AS `ts`  boundreference() AS `dt`]
> CodeBody is: {private Object[] references;
> private String code;
> private AvroSerializer serializer;public ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520(Object references, String code, AvroSerializer serializer) {
>   this.references = (Object[])references;
>   this.code = code;
>   this.serializer = serializer;
> }public GenericRecord eval(IndexedRecord record) {
>     boolean isNull_0 = record.get(6) == null;
>  int value_0 = isNull_0 ?
>  -1 : ((Integer)record.get(6));boolean isNull_2 = true;
>         double value_2 = -1.0;
>         boolean isNull_3 = record.get(2) == null;
>  double value_3 = isNull_3 ?
>  -1.0 : ((Double)record.get(2));
>         if (!isNull_3) {
>           boolean isNull_4 = false;
>       double value_4 = -1.0;
>       if (!false) {
>         value_4 = (double) 2;
>       }              isNull_2 = false; // resultCode could change nullability.
>               
> value_2 = value_3 * value_4;
>          
>             
>         }
> boolean isNull_6 = true;
>         long value_6 = -1L;
>         boolean isNull_7 = record.get(3) == null;
>  long value_7 = isNull_7 ?
>  -1L : ((Long)record.get(3));
>         if (!isNull_7) {
>           boolean isNull_8 = false;
>       long value_8 = -1L;
>       if (!false) {
>         value_8 = (long) 10000;
>       }              isNull_6 = false; // resultCode could change nullability.
>               
> value_6 = value_7 + value_8;
>          
>             
>         }
> boolean isNull_10 = record.get(4) == null;
>  UTF8String value_10 = isNull_10 ?
>  null : ((UTF8String)record.get(4));
>     Object[] results = new Object[5];
>     
> if (isNull_0) {
>   results[0] = null;
> } else {
>   results[0] = value_0;
> }
>                        if (false) {
>   results[1] = null;
> } else {
>   results[1] = ((UTF8String) references[0] /* literal */);
> }
>                        if (isNull_2) {
>   results[2] = null;
> } else {
>   results[2] = value_2;
> }
>                        if (isNull_6) {
>   results[3] = null;
> } else {
>   results[3] = value_6;
> }
>                        if (isNull_10) {
>   results[4] = null;
> } else {
>   results[4] = value_10;
> }
>                        
>               InternalRow row = new GenericInternalRow(results);
>               return (GenericRecord) serializer.serialize(row);
>   }public String getCode() {
>   return code;
> }
>      
> }
>     at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:261)
>     at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4(ExpressionPayload.scala:109)
>     at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4$adapted(ExpressionPayload.scala:103)
>     at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
>     at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:103)
>     at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:77)
>     at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332)
>     ... 9 more
> Caused by: java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
>     at org.apache.hudi.sql.payload.ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520.eval(Unknown Source)
>     at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:259)
>     ... 17 more{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)