Posted to commits@hudi.apache.org by "Sebastian Bernauer (Jira)" <ji...@apache.org> on 2020/07/30 09:23:00 UTC

[jira] [Commented] (HUDI-1129) AvroConversionUtils unable to handle avro to row transformation when passing evolved schema

    [ https://issues.apache.org/jira/browse/HUDI-1129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167768#comment-17167768 ] 

Sebastian Bernauer commented on HUDI-1129:
------------------------------------------

Hi, when running the specified test to reproduce the issue, I get the following exception:

{noformat}
Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 23, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 19
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:212)
        at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:170)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:66)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator.isEmpty(Iterator.scala:385)
        at scala.collection.Iterator.isEmpty$(Iterator.scala:385)
        at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1429)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createRdd$2(AvroConversionUtils.scala:44)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:801)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:801)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.testSchemaEvolution(TestHoodieDeltaStreamer.java:491)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 19
{noformat}
After doing some analysis around this line [https://github.com/apache/hudi/blob/d5b593b7d952a39679cade2b18aadbdfb2dc3eed/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala#L164], I found the following:
{noformat}
Source Avro schema: {"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"double"},{"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":{"type":"long","logicalType":"timestamp-micros"}},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}}},{"name":"_hoodie_is_deleted","type":"boolean","default":false},{"name":"evoluted_optional_union_field","type":["null",{"type":"string","avro.java.string":"String"}],"default":null}]}
Target Catalyst type: StructType(StructField(timestamp,DoubleType,false), StructField(_row_key,StringType,false), StructField(rider,StringType,false), StructField(driver,StringType,false), StructField(begin_lat,DoubleType,false), StructField(begin_lon,DoubleType,false), StructField(end_lat,DoubleType,false), StructField(end_lon,DoubleType,false), StructField(distance_in_meters,IntegerType,false), StructField(seconds_since_epoch,LongType,false), StructField(weight,FloatType,false), StructField(nation,BinaryType,false), StructField(current_date,DateType,false), StructField(current_ts,TimestampType,false), StructField(height,DecimalType(10,6),false), StructField(city_to_state,MapType(StringType,StringType,false),false), StructField(fare,StructType(StructField(amount,DoubleType,false), StructField(currency,StringType,false)),false), StructField(tip_history,ArrayType(StructType(StructField(amount,DoubleType,false), StructField(currency,StringType,false)),false),false), StructField(_hoodie_is_deleted,BooleanType,false), StructField(evoluted_optional_union_field,StringType,true))
record: {"timestamp": 0.0, "_row_key": "5d508e9e-d443-4a19-94e0-df1ee7752814", "rider": "rider-00001", "driver": "driver-00001", "begin_lat": 0.060955975927488026, "begin_lon": 0.06050725594343709, "end_lat": 0.1234477515120661, "end_lon": 0.8671207412077184, "distance_in_meters": -994219658, "seconds_since_epoch": 2759468501308244827, "weight": 0.34771907, "nation": {"bytes": "{bytes=Canada}"}, "current_date": 18473, "current_ts": 1596099482175, "height": [0, 0, 9, 1, -110], "city_to_state": {"LA": "CA"}, "fare": {"amount": 3.551008739179684, "currency": "USD"}, "tip_history": [{"amount": 32.21482737487774, "currency": "USD"}], "_hoodie_is_deleted": true}
{noformat}
The Avro and Catalyst schemas match, but the record does not contain all the fields of the Avro schema. Hudi tries to read the field at index 19, which does not exist in the record, hence the ArrayIndexOutOfBoundsException. I suggest not relying on the index being present, but instead asking the record for the value by field name rather than by index. I don't know how much this affects performance. I opened a PR at https://github.com/apache/hudi/pull/1888.
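
For illustration, here is a minimal, self-contained sketch (not the actual patch; the schemas and field names are made up) of why positional access blows up on a record built against an older schema, while name-based access degrades gracefully:
{code:scala}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

object ByNameLookup {
  // Illustrative "old" writer schema with two fields; the evolved reader
  // schema would additionally contain evoluted_optional_union_field.
  val oldSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"triprec","fields":[
      |  {"name":"timestamp","type":"double"},
      |  {"name":"rider","type":"string"}
      |]}""".stripMargin)

  def main(args: Array[String]): Unit = {
    val record: GenericRecord = new GenericData.Record(oldSchema)
    record.put("timestamp", 0.0)
    record.put("rider", "rider-00001")

    // Positional access past the record's backing array fails hard:
    // record.get(19) -> java.lang.ArrayIndexOutOfBoundsException

    // Name-based access looks the field up in the record's own schema and
    // returns null when the field is absent, so a converter could fall back
    // to the field's default instead of crashing.
    val v = record.get("evoluted_optional_union_field")
    println(v) // null
  }
}
{code}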

After doing this, I get the same error as in production. This error is being investigated in https://issues.apache.org/jira/browse/HUDI-1128:
{noformat}
 Job aborted due to stage failure: Task 0 in stage 89.0 failed 1 times, most recent failure: Lost task 0.0 in stage 89.0 (TID 492, localhost, executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:253)
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:853)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:853)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:337)
        at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1182)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
        at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:302)
        at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:101)
        at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74)
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:246)
        ... 28 more
Caused by: java.io.EOFException
        at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
        at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:108)
        at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:80)
        at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:271)
        ... 31 more

Driver stacktrace:
        at org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.testSchemaEvolution(TestHoodieDeltaStreamer.java:501)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
Caused by: java.io.EOFException
{noformat}
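
For context, this is not Hudi code, but a minimal sketch using plain Avro APIs and made-up schemas of how an EOFException like the one above can arise: if the bytesToAvro frame in the trace decodes the stored bytes against a single schema, and that schema is the evolved one while the bytes were written with the old one, the binary decoder runs off the end of the input:
{code:scala}
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object SchemaMismatchEof {
  val oldSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"triprec","fields":[
      |  {"name":"rider","type":"string"}
      |]}""".stripMargin)
  val evolvedSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"triprec","fields":[
      |  {"name":"rider","type":"string"},
      |  {"name":"evoluted_optional_union_field","type":["null","string"],"default":null}
      |]}""".stripMargin)

  def main(args: Array[String]): Unit = {
    // Serialize a record with the OLD schema.
    val record = new GenericData.Record(oldSchema)
    record.put("rider", "rider-00001")
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](oldSchema).write(record, encoder)
    encoder.flush()
    val bytes = out.toByteArray

    // Decoding the bytes as if the EVOLVED schema had written them makes the
    // decoder expect data for the extra field and run out of input:
    // -> java.io.EOFException
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](evolvedSchema).read(null, decoder)

    // Schema resolution with the real writer schema would succeed instead:
    // new GenericDatumReader[GenericRecord](oldSchema, evolvedSchema)
    //   .read(null, DecoderFactory.get().binaryDecoder(bytes, null))
  }
}
{code}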

> AvroConversionUtils unable to handle avro to row transformation when passing evolved schema 
> --------------------------------------------------------------------------------------------
>
>                 Key: HUDI-1129
>                 URL: https://issues.apache.org/jira/browse/HUDI-1129
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer
>            Reporter: Balaji Varadarajan
>            Priority: Major
>              Labels: pull-request-available
>
> Unit test to repro : [https://github.com/apache/hudi/pull/1844/files#diff-2c3763c5782af9c3cbc02e2935211587R476]
> Context in : [https://github.com/apache/hudi/issues/1845#issuecomment-665180775] (issue 2)


