Posted to commits@hudi.apache.org by "Sagar Sumit (Jira)" <ji...@apache.org> on 2022/08/09 16:44:00 UTC
[jira] [Updated] (HUDI-4219) Merge Into when update expression "col=s.col+2" on precombine cause exception
[ https://issues.apache.org/jira/browse/HUDI-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sagar Sumit updated HUDI-4219:
------------------------------
Fix Version/s: 0.12.0
> Merge Into when update expression "col=s.col+2" on precombine cause exception
> -----------------------------------------------------------------------------
>
> Key: HUDI-4219
> URL: https://issues.apache.org/jira/browse/HUDI-4219
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Jiayu Shen
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.12.0
>
>
> Merge sql:
> {code:java}
> merge into target_table as t using source_table as s on t.primary_key = s.primary_key
> when matched then update set col0=s.col0, col1=s.col1, col2=s.col2+2;
> // col2 is the precombine field{code}
>
> MergeIntoHoodieTableCommand already processes the preCombine field expression:
> {code:java}
> target2SourcePreCombineFiled.foreach {
>   case (targetPreCombineField, sourceExpression)
>     if !isEqualToTarget(targetPreCombineField, sourceExpression) =>
>       sourceDF = sourceDF.withColumn(targetPreCombineField, new Column(sourceExpression))
>       sourceDFOutput = sourceDFOutput :+ AttributeReference(targetPreCombineField, sourceExpression.dataType)()
>   case _ =>
> } {code}
> When the assignment is "col=s.col", the method _isEqualToTarget_ returns true, but for "col=s.col+2" it returns false.
> In that case, the expression "s.col+2" is added to sourceDF, and after optimization its result overwrites the original column "col".
> So "col=s.col+2" has already been calculated in sourceDF, but it is also still present in the UpdateAction and is calculated again in the generated code, which then throws the exception below.
>
> The exception details follow; the failure can be reproduced in a test case.
> {code:java}
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to combine/merge new record with old value in storage, for new record {HoodieRecord{key=HoodieKey { recordKey=id:8 partitionPath=dt=2021-03-21}, currentLocation='HoodieRecordLocation {instantTime=20220610022503112, fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}', newLocation='HoodieRecordLocation {instantTime=20220610022508224, fileId=9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0}'}}, old value {{"_hoodie_commit_time": "20220610022453319", "_hoodie_commit_seqno": "20220610022453319_0_1", "_hoodie_record_key": "id:8", "_hoodie_partition_path": "dt=2021-03-21", "_hoodie_file_name": "9144c9eb-ea3d-4b83-bb3f-7016d0b5b7f7-0_0-136-175_20220610022503112.parquet", "id": 8, "name": "a8", "price": 80.0, "ts": 1008, "dt": "2021-03-21"}}
> at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
> at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:122)
> at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consumeOneRecord(BaseMergeHelper.java:112)
> at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
> at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
> at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> at java.util.concurrent.FutureTask.run(FutureTask.java)
> ... 3 more
> Caused by: java.lang.RuntimeException: Error in execute expression: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer.
> Expressions is: [boundreference() AS `id` 'sxx' AS `name` (boundreference() * CAST(2 AS DOUBLE)) AS `price` (boundreference() + CAST(10000 AS BIGINT)) AS `ts` boundreference() AS `dt`]
> CodeBody is: {private Object[] references;
> private String code;
> private AvroSerializer serializer;public ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520(Object references, String code, AvroSerializer serializer) {
> this.references = (Object[])references;
> this.code = code;
> this.serializer = serializer;
> }public GenericRecord eval(IndexedRecord record) {
> boolean isNull_0 = record.get(6) == null;
> int value_0 = isNull_0 ?
> -1 : ((Integer)record.get(6));boolean isNull_2 = true;
> double value_2 = -1.0;
> boolean isNull_3 = record.get(2) == null;
> double value_3 = isNull_3 ?
> -1.0 : ((Double)record.get(2));
> if (!isNull_3) {
> boolean isNull_4 = false;
> double value_4 = -1.0;
> if (!false) {
> value_4 = (double) 2;
> } isNull_2 = false; // resultCode could change nullability.
>
> value_2 = value_3 * value_4;
>
>
> }
> boolean isNull_6 = true;
> long value_6 = -1L;
> boolean isNull_7 = record.get(3) == null;
> long value_7 = isNull_7 ?
> -1L : ((Long)record.get(3));
> if (!isNull_7) {
> boolean isNull_8 = false;
> long value_8 = -1L;
> if (!false) {
> value_8 = (long) 10000;
> } isNull_6 = false; // resultCode could change nullability.
>
> value_6 = value_7 + value_8;
>
>
> }
> boolean isNull_10 = record.get(4) == null;
> UTF8String value_10 = isNull_10 ?
> null : ((UTF8String)record.get(4));
> Object[] results = new Object[5];
>
> if (isNull_0) {
> results[0] = null;
> } else {
> results[0] = value_0;
> }
> if (false) {
> results[1] = null;
> } else {
> results[1] = ((UTF8String) references[0] /* literal */);
> }
> if (isNull_2) {
> results[2] = null;
> } else {
> results[2] = value_2;
> }
> if (isNull_6) {
> results[3] = null;
> } else {
> results[3] = value_6;
> }
> if (isNull_10) {
> results[4] = null;
> } else {
> results[4] = value_10;
> }
>
> InternalRow row = new GenericInternalRow(results);
> return (GenericRecord) serializer.serialize(row);
> }public String getCode() {
> return code;
> }
>
> }
> at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:261)
> at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4(ExpressionPayload.scala:109)
> at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.$anonfun$processMatchedRecord$4$adapted(ExpressionPayload.scala:103)
> at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
> at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.processMatchedRecord(ExpressionPayload.scala:103)
> at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.combineAndGetUpdateValue(ExpressionPayload.scala:77)
> at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:332)
> ... 9 more
> Caused by: java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
> at org.apache.hudi.sql.payload.ExpressionPayloadEvaluator_ef8480e5_883a_4560_bed7_c62dad761520.eval(Unknown Source)
> at org.apache.spark.sql.hudi.command.payload.ExpressionPayload.evaluate(ExpressionPayload.scala:259)
> ... 17 more{code}
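
The double evaluation described in the issue can be illustrated with a small standalone sketch. It is not Hudi code: the class `PreCombineDoubleEval`, its methods, and the sample value 1008 are hypothetical stand-ins, and the expression "ts + 10000" is borrowed from the expression list in the stack trace. The sketch only models the expression being applied once when the source row is rewritten and again when the update action runs; it does not attempt to reproduce the ClassCastException itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class PreCombineDoubleEval {
    // Hypothetical stand-in for the update expression "ts = s.ts + 10000".
    static final UnaryOperator<Long> UPDATE_EXPR = ts -> ts + 10000L;

    // Step 1: models sourceDF.withColumn("ts", expr), which overwrites the
    // original source column with the already-evaluated expression.
    public static Map<String, Object> materializePreCombine(Map<String, Object> sourceRow) {
        Map<String, Object> rewritten = new LinkedHashMap<>(sourceRow);
        rewritten.put("ts", UPDATE_EXPR.apply((Long) sourceRow.get("ts")));
        return rewritten;
    }

    // Step 2: models the update action evaluating the same expression
    // against whatever row it is handed.
    public static long evaluateUpdateAction(Map<String, Object> row) {
        return UPDATE_EXPR.apply((Long) row.get("ts"));
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("id", 8);
        source.put("ts", 1008L); // illustrative value only

        // Intended result: the expression is applied exactly once.
        long once = evaluateUpdateAction(source);
        // Modeled bug: the row was already rewritten, so it is applied twice.
        long twice = evaluateUpdateAction(materializePreCombine(source));

        System.out.println("applied once=" + once + ", applied twice=" + twice);
    }
}
```

In this model, evaluating the update action on the rewritten row yields 21008 instead of the intended 11008, which is the "calculated again" behavior the description points at.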
--
This message was sent by Atlassian Jira
(v8.20.10#820010)