Posted to commits@hudi.apache.org by "Alexander Filipchik (Jira)" <ji...@apache.org> on 2021/02/09 04:40:00 UTC

[jira] [Updated] (HUDI-1602) Corrupted Avro schema extracted from parquet file

     [ https://issues.apache.org/jira/browse/HUDI-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexander Filipchik updated HUDI-1602:
--------------------------------------
    Description: 
We are running the Hudi DeltaStreamer on a very complex stream. The schema is deeply nested, with several levels of hierarchy (the Avro schema is around 6600 lines).

 

The version of Hudi that writes the dataset is 0.5-SNAPSHOT, and we recently started trying to upgrade to the latest release. However, the latest Hudi can't read the existing dataset. The exception I get:

 

 
{code:java}
Got exception while parsing the arguments:Got exception while parsing the arguments:Found recursive reference in Avro schema, which can not be processed by Spark:{  "type" : "record",  "name" : "array",  "fields" : [ {    "name" : "id",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "type",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "exist",    "type" : [ "null", "boolean" ],    "default" : null  } ]}          Stack trace:org.apache.spark.sql.avro.IncompatibleSchemaException:Found recursive reference in Avro schema, which can not be processed by Spark:{  "type" : "record",  "name" : "array",  "fields" : [ {    "name" : "id",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "type",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "exist",    "type" : [ "null", "boolean" ],    "default" : null  } ]}
 at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:75) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at 
org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46) at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56) at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:67) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178) at com.css.dw.spark.SQLHudiOutputJob.run(SQLHudiOutputJob.java:118) at com.css.dw.spark.SQLHudiOutputJob.main(SQLHudiOutputJob.java:164) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{code}
 

I wrote a simple test that opens a Parquet file, loads its schema, and attempts to convert it into Avro; it fails with the same error. It appears that an Avro schema that looked like:

 
{noformat}
{
          "name": "entity_path",
          "type": [
            "null",
            {
              "type": "record",
              "name": "MenuEntityPath",
              "fields": [
                {
                  "name": "path_nodes",
                  "type": [
                    "null",
                    {
                      "type": "array",
                      "items": {
                        "type": "record",
                        "name": "PathNode",
                        "namespace": "Menue_pathPath$",
                        "fields": [
                          {
                            "name": "id",
                            "type": [
                              "null",
                              {
                                "type": "string",
                                "avro.java.string": "String"
                              }
                            ],
                            "default": null
                          },
                          {
                            "name": "type",
                            "type": [
                              "null",
                              {
                                "type": "enum",
                                "name": "MenuEntityType",
                                "namespace": "shared",
                                "symbols": [
                                  "UNKNOWN"
                                ]
                              }
                            ],
                            "default": null
                          }
                        ]
                      }
                    }
                  ],
                  "default": null
                }
              ]
            }
          ],
          "default": null
        }
      ]
    }
  ],
  "default": null
},{noformat}
is converted into:
{noformat}
[
  "null",
  {
    "type": "record",
    "name": "entity_path",
    "fields": [
      {
        "name": "path_nodes",
        "type": [
          "null",
          {
            "type": "array",
            "items": {
              "type": "record",
              "name": "array",
              "fields": [
                {
                  "name": "id",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                },
                {
                  "name": "type",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                },
                {
                  "name": "exist",
                  "type": [
                    "null",
                    "boolean"
                  ],
                  "default": null
                }
              ]
            }
          }
        ],
        "default": null
      },
      {
        "name": "exist",
        "type": [
          "null",
          "boolean"
        ],
        "default": null
      }
    ]
  }
]{noformat}
A couple of questions: did anyone have similar issues and what is the best way forward?
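
To make the failure easier to reason about, here is a minimal sketch of the kind of check the error message suggests: a record is rejected when its name already appears among its enclosing records. This is my reading of the message and stack trace, not Spark's actual code. The function name find_repeated_record, the "children" field, and the sample schema are all hypothetical, and namespace inheritance is ignored for brevity.

```python
# Illustrative only: mimics an "ancestor name" check on an Avro schema that has
# been parsed into plain Python dicts and lists (e.g. via json.loads).
# This is an assumption about the behaviour, not Spark's implementation.

def find_repeated_record(schema, ancestors=(), path="$"):
    """Return (path, name) of the first record whose name repeats an
    enclosing record's name, or None if there is no such record."""
    if isinstance(schema, list):  # union: check every branch
        for branch in schema:
            hit = find_repeated_record(branch, ancestors, path)
            if hit:
                return hit
        return None
    if not isinstance(schema, dict):  # primitive such as "string" or "null"
        return None

    kind = schema.get("type")
    if kind == "record":
        name = schema["name"]
        namespace = schema.get("namespace")
        full_name = f"{namespace}.{name}" if namespace else name
        if full_name in ancestors:
            return path, full_name
        for field in schema.get("fields", []):
            hit = find_repeated_record(
                field["type"], ancestors + (full_name,), f"{path}.{field['name']}"
            )
            if hit:
                return hit
        return None
    if kind == "array":
        return find_repeated_record(schema["items"], ancestors, path + "[]")
    if kind == "map":
        return find_repeated_record(schema["values"], ancestors, path + "{}")
    if isinstance(kind, (dict, list)):  # nested type definition
        return find_repeated_record(kind, ancestors, path)
    return None


# Hypothetical schema: two levels of arrays whose element records are both
# named "array", as in the converted schema above.
inner = {"type": "record", "name": "array",
         "fields": [{"name": "id", "type": ["null", "string"]}]}
outer = {"type": "record", "name": "array",
         "fields": [{"name": "children",
                     "type": ["null", {"type": "array", "items": inner}]}]}
root = {"type": "record", "name": "entity_path",
        "fields": [{"name": "path_nodes",
                    "type": ["null", {"type": "array", "items": outer}]}]}

print(find_repeated_record(root))
```

Running something like this over the full converted schema should point at the first field path where the name is reused, which may help narrow down which arrays are involved.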

 

Edit: 

I converted the dataset into pure Parquet by using Presto as an intermediary (create table as select). The result fails with a similar error, but in a different place:

 
{noformat}
Found recursive reference in Avro schema, which can not be processed by Spark:
{
  "type" : "record",
  "name" : "bag",
  "fields" : [ {
    "name" : "array_element",
    "type" : [ "null", {
      "type" : "record",
      "name" : "array_element",
      "fields" : [ {
        "name" : "id",{noformat}
It looks like the Parquet writer replaces arrays with synthetic records and gives them the same name.
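
If the shared names are indeed the cause, one possible workaround is to rewrite the converted Avro schema so that every nested record gets a name derived from its field path before the schema is handed to Spark. The sketch below is only an illustration of that idea, not a Hudi or Parquet fix. The function name uniquify_record_names, the "item" / "value" path segments, and the sample schema are hypothetical. It assumes the schema inlines every record (no references to named types) and that no union contains two record branches.

```python
# Illustrative only: renames nested records after their field path so that no
# two records share a name. Works on a schema parsed into dicts and lists.
# Assumes every record is defined inline; named-type references would break.

def uniquify_record_names(schema, path=()):
    """Return a copy of the schema with path-derived record names."""
    if isinstance(schema, list):  # union: rewrite every branch
        return [uniquify_record_names(branch, path) for branch in schema]
    if not isinstance(schema, dict):  # primitive such as "string" or "null"
        return schema

    out = dict(schema)  # shallow copy; nested parts are rebuilt below
    kind = schema.get("type")
    if kind == "record":
        if path:  # keep the root record's original name
            out["name"] = "_".join(path)
        out["fields"] = [
            dict(field, type=uniquify_record_names(field["type"], path + (field["name"],)))
            for field in schema.get("fields", [])
        ]
    elif kind == "array":
        out["items"] = uniquify_record_names(schema["items"], path + ("item",))
    elif kind == "map":
        out["values"] = uniquify_record_names(schema["values"], path + ("value",))
    elif isinstance(kind, (dict, list)):  # nested type definition
        out["type"] = uniquify_record_names(kind, path)
    return out


# Hypothetical schema with two nested records that are both named "array".
inner = {"type": "record", "name": "array",
         "fields": [{"name": "id", "type": ["null", "string"]}]}
outer = {"type": "record", "name": "array",
         "fields": [{"name": "children",
                     "type": ["null", {"type": "array", "items": inner}]}]}
root = {"type": "record", "name": "entity_path",
        "fields": [{"name": "path_nodes",
                    "type": ["null", {"type": "array", "items": outer}]}]}

fixed = uniquify_record_names(root)
outer_fixed = fixed["fields"][0]["type"][1]["items"]
inner_fixed = outer_fixed["fields"][0]["type"][1]["items"]
print(outer_fixed["name"], inner_fixed["name"])
```

Joining path segments with an underscore can still collide in unusual cases (a field literally named "a_b" next to a path "a" then "b"), so a real fix would need a more careful naming scheme.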


> Corrupted Avro schema extracted from parquet file
> -------------------------------------------------
>
>                 Key: HUDI-1602
>                 URL: https://issues.apache.org/jira/browse/HUDI-1602
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Alexander Filipchik
>            Priority: Major
>
> We are running the Hudi DeltaStreamer on a very complex stream. The schema is deeply nested, with several levels of hierarchy (the Avro schema is around 6600 lines).
>  
> The version of Hudi that writes the dataset is 0.5-SNAPSHOT, and we recently started trying to upgrade to the latest release. However, the latest Hudi can't read the existing dataset. The exception I get:
>  
>  
> {code:java}
> Got exception while parsing the arguments:Got exception while parsing the arguments:Found recursive reference in Avro schema, which can not be processed by Spark:{  "type" : "record",  "name" : "array",  "fields" : [ {    "name" : "id",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "type",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "exist",    "type" : [ "null", "boolean" ],    "default" : null  } ]}          Stack trace:org.apache.spark.sql.avro.IncompatibleSchemaException:Found recursive reference in Avro schema, which can not be processed by Spark:{  "type" : "record",  "name" : "array",  "fields" : [ {    "name" : "id",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "type",    "type" : [ "null", "string" ],    "default" : null  }, {    "name" : "exist",    "type" : [ "null", "boolean" ],    "default" : null  } ]}
>  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:75) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82) at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81) at 
org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46) at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56) at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:67) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178) at com.css.dw.spark.SQLHudiOutputJob.run(SQLHudiOutputJob.java:118) at com.css.dw.spark.SQLHudiOutputJob.main(SQLHudiOutputJob.java:164) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
>  
> I wrote a simple test that opens a Parquet file, loads its schema, and attempts to convert it to Avro; it fails with the same error. It appears that an Avro schema that looked like this:
>  
> {noformat}
> {
>           "name": "entity_path",
>           "type": [
>             "null",
>             {
>               "type": "record",
>               "name": "MenuEntityPath",
>               "fields": [
>                 {
>                   "name": "path_nodes",
>                   "type": [
>                     "null",
>                     {
>                       "type": "array",
>                       "items": {
>                         "type": "record",
>                         "name": "PathNode",
>                         "namespace": "Menue_pathPath$",
>                         "fields": [
>                           {
>                             "name": "id",
>                             "type": [
>                               "null",
>                               {
>                                 "type": "string",
>                                 "avro.java.string": "String"
>                               }
>                             ],
>                             "default": null
>                           },
>                           {
>                             "name": "type",
>                             "type": [
>                               "null",
>                               {
>                                 "type": "enum",
>                                 "name": "MenuEntityType",
>                                 "namespace": "shared",
>                                 "symbols": [
>                                   "UNKNOWN"
>                                 ]
>                               }
>                             ],
>                             "default": null
>                           }
>                         ]
>                       }
>                     }
>                   ],
>                   "default": null
>                 }
>               ]
>             }
>           ],
>           "default": null
>         }
>       ]
>     }
>   ],
>   "default": null
> },{noformat}
> Is converted into:
> {noformat}
> [
>   "null",
>   {
>     "type": "record",
>     "name": "entity_path",
>     "fields": [
>       {
>         "name": "path_nodes",
>         "type": [
>           "null",
>           {
>             "type": "array",
>             "items": {
>               "type": "record",
>               "name": "array",
>               "fields": [
>                 {
>                   "name": "id",
>                   "type": [
>                     "null",
>                     "string"
>                   ],
>                   "default": null
>                 },
>                 {
>                   "name": "type",
>                   "type": [
>                     "null",
>                     "string"
>                   ],
>                   "default": null
>                 },
>                 {
>                   "name": "exist",
>                   "type": [
>                     "null",
>                     "boolean"
>                   ],
>                   "default": null
>                 }
>               ]
>             }
>           }
>         ],
>         "default": null
>       },
>       {
>         "name": "exist",
>         "type": [
>           "null",
>           "boolean"
>         ],
>         "default": null
>       }
>     ]
>   }
> ]{noformat}
> A couple of questions: has anyone had similar issues, and what is the best way forward?
>  
> Edit: 
> I converted the dataset into pure Parquet by using Presto as an intermediary (create table as select). The result fails with a similar error, but in a different place:
>  
> {noformat}
> Found recursive reference in Avro schema, which can not be processed by Spark:
> {
>   "type" : "record",
>   "name" : "bag",
>   "fields" : [ {
>     "name" : "array_element",
>     "type" : [ "null", {
>       "type" : "record",
>       "name" : "array_element",
>       "fields" : [ {
>         "name" : "id",{noformat}
> It looks like the Parquet writer replaces arrays with synthetic records and gives them the same name.
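
To locate where the name collision happens in a large schema, one option is to walk the converted Avro schema JSON and report every record whose name is already used by an enclosing record, which appears to be the condition Spark reports as a "recursive reference". The following is a minimal sketch, not the check Spark or Hudi actually runs: the function name `find_repeated_record_names`, the path notation, and the reduced `example` schema are all hypothetical, and namespace handling is simplified (a record's namespace is not inherited by nested records).

```python
import json


def find_repeated_record_names(schema, ancestors=(), path="$"):
    """Return (path, name) pairs where a record reuses an enclosing record's name.

    Assumption: 'schema' is Avro schema JSON already parsed into Python objects.
    Namespaces are only honoured when set directly on the record itself.
    """
    hits = []
    if isinstance(schema, str):
        # A bare string naming an enclosing record is a genuine recursive reference.
        if schema in ancestors:
            hits.append((path, schema))
    elif isinstance(schema, list):
        # Union: check every branch.
        for i, branch in enumerate(schema):
            hits += find_repeated_record_names(branch, ancestors, f"{path}[{i}]")
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            name = schema.get("name")
            namespace = schema.get("namespace")
            full_name = f"{namespace}.{name}" if namespace else name
            if full_name in ancestors:
                hits.append((path, full_name))
            for field in schema.get("fields", []):
                hits += find_repeated_record_names(
                    field["type"], ancestors + (full_name,), f"{path}.{field['name']}"
                )
        elif kind == "array":
            hits += find_repeated_record_names(schema["items"], ancestors, path + "[]")
        elif kind == "map":
            hits += find_repeated_record_names(schema["values"], ancestors, path + "{}")
        elif isinstance(kind, (dict, list)):
            # Nested type definition wrapped in a {"type": ...} object.
            hits += find_repeated_record_names(kind, ancestors, path)
    return hits


# Hypothetical reduced schema: a synthetic record named "array" nested inside
# another synthetic record that is also named "array".
example = json.loads("""
{"type": "record", "name": "array", "fields": [
  {"name": "path_nodes", "type": ["null", {"type": "array", "items":
    {"type": "record", "name": "array", "fields": [
      {"name": "id", "type": ["null", "string"], "default": null}
    ]}}], "default": null}
]}
""")

hits = find_repeated_record_names(example)
for where, name in hits:
    print(f"record name '{name}' repeated at {where}")
```

Running this over the schema extracted from an affected Parquet file should point at the fields whose synthetic record names collide, which may help decide whether renaming those records before handing the schema to Spark is a workable way forward.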


