Posted to commits@hudi.apache.org by "Udit Mehrotra (Jira)" <ji...@apache.org> on 2021/08/25 01:59:00 UTC

[jira] [Updated] (HUDI-1602) Corrupted Avro schema extracted from parquet file

     [ https://issues.apache.org/jira/browse/HUDI-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Udit Mehrotra updated HUDI-1602:
--------------------------------
    Fix Version/s:     (was: 0.9.0)
                   0.10.0

> Corrupted Avro schema extracted from parquet file
> -------------------------------------------------
>
>                 Key: HUDI-1602
>                 URL: https://issues.apache.org/jira/browse/HUDI-1602
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.9.0
>            Reporter: Alexander Filipchik
>            Assignee: Nishith Agarwal
>            Priority: Major
>              Labels: pull-request-available, sev:critical
>             Fix For: 0.10.0
>
>
> We are running a Hudi DeltaStreamer on a very complex stream. The schema is deeply nested, with several levels of hierarchy (the Avro schema is around 6,600 lines).
>  
> The version of Hudi that wrote the dataset is 0.5-SNAPSHOT, and we recently started attempting to upgrade to the latest. However, the latest Hudi can't read the existing dataset. A minimal snapshot read that reproduces this is sketched below, followed by the exception I get:
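> Roughly, the failing read looks like the following sketch (the class name and base path are placeholders; the real entry point is SQLHudiOutputJob from the stack trace):
> {code:java}
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> 
> public class HudiReadRepro {
>   public static void main(String[] args) {
>     SparkSession spark = SparkSession.builder()
>         .appName("hudi-read-repro")
>         .getOrCreate();
>     // Snapshot query on the merge-on-read table; the base path is a placeholder.
>     Dataset<Row> df = spark.read()
>         .format("hudi")
>         .load("/path/to/hudi/table");
>     df.printSchema(); // never reached: the load above fails with IncompatibleSchemaException
>   }
> }
> {code}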
>  
>  
> {code:java}
> Got exception while parsing the arguments:
> Found recursive reference in Avro schema, which can not be processed by Spark:
> {  "type" : "record",  "name" : "array",  "fields" : [ {    "name" : "id",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "type",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "exist",    "type" : [ "null", "boolean" ],    "default" : null  } ]}
> 
> Stack trace:
> org.apache.spark.sql.avro.IncompatibleSchemaException: Found recursive reference in Avro schema, which can not be processed by Spark:
> {  "type" : "record",  "name" : "array",  "fields" : [ {    "name" : "id",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "type",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "exist",    "type" : [ "null", "boolean" ],    "default" : null  } ]}
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:75)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
>   at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
>   at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
>   at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56)
>   at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:67)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>   at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
>   at com.css.dw.spark.SQLHudiOutputJob.run(SQLHudiOutputJob.java:118)
>   at com.css.dw.spark.SQLHudiOutputJob.main(SQLHudiOutputJob.java:164)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>   at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
>   at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>   at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
>  
> I wrote a simple test that opens a parquet file, loads its schema, and attempts to convert it into Avro; it fails with the same error. A minimal sketch of that test follows.
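> Roughly, the test does the following (the class name and file path are placeholders; the parquet-avro and Spark converter calls are the ones the stack trace above points at):
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroSchemaConverter;
> import org.apache.parquet.hadoop.ParquetFileReader;
> import org.apache.parquet.hadoop.util.HadoopInputFile;
> import org.apache.parquet.schema.MessageType;
> import org.apache.spark.sql.avro.SchemaConverters;
> 
> public class SchemaConversionTest {
>   public static void main(String[] args) throws Exception {
>     Configuration conf = new Configuration();
>     Path file = new Path("/path/to/one/data/file.parquet"); // placeholder
>     try (ParquetFileReader reader =
>         ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
>       // Schema exactly as stored in the parquet footer
>       MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
>       // parquet-avro synthesizes record names for list elements here
>       Schema avroSchema = new AvroSchemaConverter(conf).convert(parquetSchema);
>       // Fails with "Found recursive reference in Avro schema ..."
>       SchemaConverters.toSqlType(avroSchema);
>     }
>   }
> }
> {code}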
> It appears that an Avro schema that looked like this:
> {noformat}
> {
>           "name": "entity_path",
>           "type": [
>             "null",
>             {
>               "type": "record",
>               "name": "MenuEntityPath",
>               "fields": [
>                 {
>                   "name": "path_nodes",
>                   "type": [
>                     "null",
>                     {
>                       "type": "array",
>                       "items": {
>                         "type": "record",
>                         "name": "PathNode",
>                         "namespace": "Menue_pathPath$",
>                         "fields": [
>                           {
>                             "name": "id",
>                             "type": [
>                               "null",
>                               {
>                                 "type": "string",
>                                 "avro.java.string": "String"
>                               }
>                             ],
>                             "default": null
>                           },
>                           {
>                             "name": "type",
>                             "type": [
>                               "null",
>                               {
>                                 "type": "enum",
>                                 "name": "MenuEntityType",
>                                 "namespace": "shared",
>                                 "symbols": [
>                                   "UNKNOWN"
>                                 ]
>                               }
>                             ],
>                             "default": null
>                           }
>                         ]
>                       }
>                     }
>                   ],
>                   "default": null
>                 }
>               ]
>             }
>           ],
>           "default": null
>         }
>       ]
>     }
>   ],
>   "default": null
> },{noformat}
> is converted into:
> {noformat}
> [
>   "null",
>   {
>     "type": "record",
>     "name": "entity_path",
>     "fields": [
>       {
>         "name": "path_nodes",
>         "type": [
>           "null",
>           {
>             "type": "array",
>             "items": {
>               "type": "record",
>               "name": "array",
>               "fields": [
>                 {
>                   "name": "id",
>                   "type": [
>                     "null",
>                     "string"
>                   ],
>                   "default": null
>                 },
>                 {
>                   "name": "type",
>                   "type": [
>                     "null",
>                     "string"
>                   ],
>                   "default": null
>                 },
>                 {
>                   "name": "exist",
>                   "type": [
>                     "null",
>                     "boolean"
>                   ],
>                   "default": null
>                 }
>               ]
>             }
>           }
>         ],
>         "default": null
>       },
>       {
>         "name": "exist",
>         "type": [
>           "null",
>           "boolean"
>         ],
>         "default": null
>       }
>     ]
>   }
> ]{noformat}
> A couple of questions: has anyone had similar issues, and what is the best way forward?
>  
> Edit: 
> I converted the dataset into plain parquet using Presto as an intermediary (CREATE TABLE AS SELECT). The result fails with a similar error, but in a different place:
>  
> {noformat}
> Found recursive reference in Avro schema, which can not be processed by Spark:
> {
>   "type" : "record",
>   "name" : "bag",
>   "fields" : [ {
>     "name" : "array_element",
>     "type" : [ "null", {
>       "type" : "record",
>       "name" : "array_element",
>       "fields" : [ {
>         "name" : "id",{noformat}
> It looks like the parquet writer replaces arrays with synthetic records and gives them all the same name, so nested lists end up with identically named records that Spark's converter mistakes for a recursive reference; the sketch below illustrates this.
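> A minimal illustration of that hypothesis (the class name and the parquet schema are made up; only the LIST shape matters), assuming parquet-avro's default list handling:
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.parquet.avro.AvroSchemaConverter;
> import org.apache.parquet.schema.MessageType;
> import org.apache.parquet.schema.MessageTypeParser;
> 
> public class SyntheticNameDemo {
>   public static void main(String[] args) {
>     // Legacy 2-level list: the repeated group itself is the element,
>     // and its name ("array") becomes the Avro record name.
>     MessageType parquetSchema = MessageTypeParser.parseMessageType(
>         "message m {"
>       + "  optional group path_nodes (LIST) {"
>       + "    repeated group array {"
>       + "      optional binary id (UTF8);"
>       + "    }"
>       + "  }"
>       + "}");
>     Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);
>     // Every such list element is named "array", so two nested lists
>     // yield two records with the same name.
>     System.out.println(avroSchema.toString(true));
>   }
> }
> {code}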
>  
> Also, the plain Spark parquet reader works. I can open the parquet file directly using:
> {code:java}
> // Reading the file as plain parquet succeeds; the path is a placeholder.
> Dataset<Row> dataset = spark.read().parquet("/path/to/file.parquet");
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)