Posted to commits@hudi.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/10/15 21:54:00 UTC

[jira] [Updated] (HUDI-5034) Enum info lost during schema conversion

     [ https://issues.apache.org/jira/browse/HUDI-5034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated HUDI-5034:
---------------------------------
    Labels: pull-request-available  (was: )

> Enum info lost during schema conversion
> ---------------------------------------
>
>                 Key: HUDI-5034
>                 URL: https://issues.apache.org/jira/browse/HUDI-5034
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: deltastreamer
>            Reporter: Shawn Chang
>            Priority: Major
>              Labels: pull-request-available
>
> When a transformer is used in a Deltastreamer sync, SparkAvroPostProcessor is attached to the SchemaProvider by default (see [code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java#L485]).
>  
> In SparkAvroPostProcessor, the Avro schema is converted to a StructType schema and then immediately converted back (see [code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java#L42]).
>  
> During this round trip, any 'enum' in the original Avro schema is lost: the conversion to StructType maps 'enum' to 'string', and the conversion back to Avro leaves the field as 'string' instead of restoring 'enum'.
>  
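> The lossy mapping can be reproduced outside Hudi with a minimal sketch using Spark's spark-avro SchemaConverters (assumptions: the spark-avro module is on the classpath, and Hudi's converters map 'enum' the same way, which the logs further below confirm):
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.spark.sql.avro.SchemaConverters;
> import org.apache.spark.sql.types.DataType;
> 
> public class EnumRoundTrip {
>     public static void main(String[] args) {
>         Schema original = new Schema.Parser().parse(
>             "{\"type\": \"record\", \"name\": \"accountDataRecord\", \"namespace\": \"sample.test\","
>             + " \"fields\": [{\"name\": \"action\", \"type\": {\"type\": \"enum\", \"name\": \"testEnum\","
>             + " \"symbols\": [\"INSERT\", \"UPDATE\", \"DELETE\"]}}, {\"name\": \"ts\", \"type\": \"int\"}]}");
> 
>         // Avro -> Spark: the 'enum' field is mapped to StringType
>         DataType structType = SchemaConverters.toSqlType(original).dataType();
> 
>         // Spark -> Avro: 'string' stays 'string'; the 'enum' is not restored
>         Schema roundTripped = SchemaConverters.toAvroType(structType, false, "hoodie_source", "hoodie.source");
>         System.out.println(roundTripped.toString(true));
>     }
> }
> {code}
>  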
> Steps to reproduce:
>  # Prepare an Avro schema that contains an enum field, such as the sample below:
> {code:java}
> {
>     "name": "accountDataRecord",
>     "namespace": "sample.test",
>     "type": "record",
>     "fields": [
>         {
>             "name": "action",
>             "type": {
>                 "name": "testEnum",
>                 "type" : "enum",
>                 "symbols": [
>                     "INSERT",
>                     "UPDATE",
>                     "DELETE"
>                 ]
>             }
>         },
>     {"name":"ts","type":"int"}
>     ]
> } {code}
>  # Run Deltastreamer with a transformer
>  # Observe the exception (the reader schema now expects 'string' while the data still carries sample.test.testEnum):
> {code:java}
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
>     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
>     at scala.Option.foreach(Option.scala:407)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255)
>     at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>     at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
>     at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
>     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>     at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
>     at org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:131)
>     at org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala)
>     at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.lambda$fetchNewDataInRowFormat$2(SourceFormatAdapter.java:109)
>     at org.apache.hudi.common.util.Option.map(Option.java:108)
>     at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:109)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:424)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:200)
>     at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:198)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>     at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
>     at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
>     at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
>     at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
>     at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.avro.AvroTypeException: Found sample.test.testEnum, expecting string
>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>     at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:203)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
>     at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:222)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:298)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:251)
>     at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:126)
>     at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:55)
>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) {code}
> Some logs I added to expose the issue:
> {code:java}
> // original schema
> 22/10/14 03:33:51 INFO UtilHelpers: UtilHelpers, wrapSchemaProviderWithPostProcessor, schema provider by the end: {
>   "type" : "record",
>   "name" : "accountDataRecord",
>   "namespace" : "sample.test",
>   "fields" : [ {
>     "name" : "action",
>     "type" : {
>       "type" : "enum",
>       "name" : "testEnum",
>       "symbols" : [ "INSERT", "UPDATE", "DELETE" ]
>     }
>   }, {
>     "name" : "ts",
>     "type" : "int"
>   } ]
> }
> /* Around this LOC (https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L673) the schema converted back has already lost the enum type */
> 22/10/14 03:33:51 INFO HoodieDeltaStreamer: HoodieDeltaStreamer, source schema from schema provider: {
>   "type" : "record",
>   "name" : "hoodie_source",
>   "namespace" : "hoodie.source",
>   "fields" : [ {
>     "name" : "action",
>     "type" : "string"
>   }, {
>     "name" : "ts",
>     "type" : "int"
>   } ]
> } {code}
>  
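> A possible mitigation, sketched under assumptions: if the SchemaPostProcessor extension point that SparkAvroPostProcessor extends can be configured directly in your Hudi version, a pass-through processor (hypothetical class below; verify the base-class constructor signature against your version) would hand the source schema back untouched, so the enum type survives:
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.hudi.common.config.TypedProperties;
> import org.apache.hudi.utilities.schema.SchemaPostProcessor;
> import org.apache.spark.api.java.JavaSparkContext;
> 
> // Hypothetical pass-through processor: skips the Avro -> StructType -> Avro
> // round trip entirely instead of letting it rewrite 'enum' to 'string'.
> public class IdentitySchemaPostProcessor extends SchemaPostProcessor {
> 
>     public IdentitySchemaPostProcessor(TypedProperties props, JavaSparkContext jssc) {
>         super(props, jssc);
>     }
> 
>     @Override
>     public Schema processSchema(Schema schema) {
>         return schema; // no conversion, so 'enum' fields are preserved
>     }
> }
> {code}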



--
This message was sent by Atlassian Jira
(v8.20.10#820010)