Posted to issues@spark.apache.org by "Julian Keppel (Jira)" <ji...@apache.org> on 2022/04/05 14:12:00 UTC

[jira] [Created] (SPARK-38789) Spark-Avro throws exception at to_avro after transformation although schema didn't change

Julian Keppel created SPARK-38789:
-------------------------------------

             Summary: Spark-Avro throws exception at to_avro after transformation although schema didn't change
                 Key: SPARK-38789
                 URL: https://issues.apache.org/jira/browse/SPARK-38789
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.0.3
         Environment: PySpark Version 3.0.3
                      Spark-Avro Version 3.0.3
                      Python Version 3.6.8
                      Scala Version 2.12
            Reporter: Julian Keppel


I use Structured Streaming to read Avro records from a Kafka topic A, apply some transformations, and write the result as Avro to another Kafka topic B. I use [these functions|https://spark.apache.org/docs/latest/sql-data-sources-avro.html#to_avro-and-from_avro] to serialize and deserialize the Avro records.

I originally hit a different exception (a parsing error in "FAILFAST" mode) while reading from the second topic B with the specified schema in a separate job. I wondered how this could happen, because the schemas are exactly the same.

To find out more, I came up with the following experimental setup to reproduce the problem. I found that, after some simple transformations, rows with the exact same schema can no longer be serialized with to_avro:

 
{code:python}
from pyspark.sql.types import ArrayType, LongType, Row, StringType, StructField, StructType

from pyspark.sql import functions as func
from pyspark.sql.avro import functions as afunc

avro_schema = """
{
    "fields": [
        {
            "name": "field_1",
            "type": [
                "string",
                "null"
            ]
        },
        {
            "name": "field_2",
            "type": [
                "long",
                "null"
            ]
        },
        {
            "name": "field_3",
            "type": [
                {
                    "items": [
                        "string",
                        "null"
                    ],
                    "type": "array"
                },
                "null"
            ]
        }
    ],
    "name": "bla",
    "namespace": "bla",
    "type": "record"
}
"""

# Create example DF
df = spark.createDataFrame(
    [Row("bla", 42, ["bla", "blubb"])],
    StructType(
        [
            StructField("field_1", StringType(), True),
            StructField("field_2", LongType(), True),
            StructField("field_3", ArrayType(StringType()), True),
        ]
    ),
)

# Serialize columns to Avro record
df_avro = df.select(afunc.to_avro(func.struct(df.columns), avro_schema))

# Now, change values in DF to some arbitrary value
df_changed_1 = df.withColumn("field_2", func.lit(998877665544332211))
df_changed_2 = df.withColumn("field_2", func.lit(None)) # To enforce nullability in schema for field_2
df_changed = df_changed_1.unionByName(df_changed_2)

# Again, serialize columns to Avro record with same schema as before
df_changed_avro = df_changed.select(afunc.to_avro(func.struct(df_changed.columns), avro_schema))

# Schemas are exactly identical, which makes sense because nothing changed in the schema itself, only the values
df.schema
df_changed.schema

# Have a look at the DFs values
df.show()
df_changed.show()

# Enforce DF evaluation
df_avro.show() # This works as expected
df_changed_avro.show() # This produces the exception
{code}
 

The last line produces the following exception: 
{code:java}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 35.0 failed 1 times, most recent failure: Lost task 2.0 in stage 35.0 (TID 109, 192.168.2.117, executor driver): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type LongType to Avro type ["long","null"].
        at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
        at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
        at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:232)
        at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:72)
        at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
        at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
        at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
        at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Catalyst type LongType to Avro type ["long","null"].
        at org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219)
        at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at org.apache.spark.sql.types.StructType.map(StructType.scala:102)
        at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:232)
        at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:72)
        at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
        at org.apache.spark.sql.avro.CatalystDataToAvro.serializer$lzycompute(CatalystDataToAvro.scala:42)
        at org.apache.spark.sql.avro.CatalystDataToAvro.serializer(CatalystDataToAvro.scala:41)
        at org.apache.spark.sql.avro.CatalystDataToAvro.nullSafeEval(CatalystDataToAvro.scala:54)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
{code}
According to the documentation, Spark SQL types cannot be converted to Avro union types (at least, unions are not explicitly [listed here|https://spark.apache.org/docs/latest/sql-data-sources-avro.html#supported-types-for-spark-sql---avro-conversion]), so I wonder why it seems to work for the original DataFrame.
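
To make the union types in question concrete, here is a minimal sketch that uses only the Python standard library and does not touch Spark. It parses the Avro schema from the reproduction (condensed here) and lists, for each field, its non-null branches and whether "null" is one of the union branches. The helper names describe_union and summarize_fields are hypothetical and exist only for this illustration; the sketch says nothing about how Spark itself resolves these unions.

```python
import json

# Schema copied from the reproduction above, condensed onto fewer lines.
AVRO_SCHEMA = """
{
  "type": "record", "name": "bla", "namespace": "bla",
  "fields": [
    {"name": "field_1", "type": ["string", "null"]},
    {"name": "field_2", "type": ["long", "null"]},
    {"name": "field_3",
     "type": [{"type": "array", "items": ["string", "null"]}, "null"]}
  ]
}
"""


def describe_union(avro_type):
    """Return (non_null_branches, nullable) for an Avro type.

    A JSON list is an Avro union; anything else is treated as one branch.
    (Hypothetical helper, for illustration only.)
    """
    branches = avro_type if isinstance(avro_type, list) else [avro_type]
    non_null = [b for b in branches if b != "null"]
    return non_null, len(non_null) < len(branches)


def summarize_fields(schema_json):
    """Map each record field name to (non_null_branches, nullable)."""
    schema = json.loads(schema_json)
    return {f["name"]: describe_union(f["type"]) for f in schema["fields"]}


summary = summarize_fields(AVRO_SCHEMA)
for name, (branches, nullable) in summary.items():
    print(name, branches, "nullable" if nullable else "required")
```

Every field in this schema is a two-branch union of one concrete type and "null", which is exactly the shape named in the exception message for field_2.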

Am I missing something, or is this a bug? The same exception also occurs when, for example, I add a new field to the original DataFrame (df) and try to serialize it with an extended schema that includes the new field.
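
For the second scenario, the schema side can be sketched as follows. This is only an illustration under assumptions: the report does not say which field was added, so the name field_4 and its type are hypothetical, as is the helper add_nullable_field. It uses only the standard library and builds the extended schema string that would then be passed to to_avro.

```python
import json


def add_nullable_field(schema_json, name, avro_type):
    """Return a new schema JSON string with one extra [avro_type, "null"] field.

    The input string is not modified. (Hypothetical helper, for illustration.)
    """
    schema = json.loads(schema_json)
    if any(f["name"] == name for f in schema["fields"]):
        raise ValueError("field {!r} already exists".format(name))
    schema["fields"].append({"name": name, "type": [avro_type, "null"]})
    return json.dumps(schema, indent=4)


# A shortened stand-in for the record schema used in the reproduction.
base_schema = json.dumps(
    {
        "type": "record",
        "name": "bla",
        "namespace": "bla",
        "fields": [{"name": "field_1", "type": ["string", "null"]}],
    }
)

# "field_4" is a made-up name; the report does not state the real one.
extended_schema = add_nullable_field(base_schema, "field_4", "string")
print(extended_schema)
```

The new field follows the same ["type", "null"] union pattern as the existing ones, so it is the same kind of conversion that the exception above complains about.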

 


