Posted to issues@spark.apache.org by "xsys (Jira)" <ji...@apache.org> on 2022/05/03 20:35:00 UTC

[jira] [Comment Edited] (SPARK-39075) IncompatibleSchemaException when selecting data from table stored from a DataFrame in Avro format with BYTE/SHORT

    [ https://issues.apache.org/jira/browse/SPARK-39075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531398#comment-17531398 ] 

xsys edited comment on SPARK-39075 at 5/3/22 8:34 PM:
------------------------------------------------------

Thanks for the response, Erik.

I understand the concern. OTOH, in principle it is inconsistent and confusing that one can write a piece of data but cannot read it back via Spark/Avro. It is almost equivalent to data loss.

Moreover, DataFrame enforces explicit type checks, so one can only write SHORT/BYTE-typed data into a SHORT/BYTE column. In this context, it is safe to downcast, and it does not make sense that Avro’s lack of SHORT/BYTE type support breaks DataFrame operations.
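
For illustration, here is a minimal {{spark-shell}} sketch of that write-side check (the exact error text may differ across Spark versions):
{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = new StructType().add(StructField("c1", ShortType, true))

// A Short value is accepted by the SHORT column.
spark.createDataFrame(sc.parallelize(Seq(Row(42.toShort))), schema).count()

// An Int value is rejected once the plan executes, with an error along the lines of
// "java.lang.Integer is not a valid external type for schema of smallint".
spark.createDataFrame(sc.parallelize(Seq(Row(42))), schema).count(){code}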

The concern is valid in contexts where the source of the serialized data is unknown, since downcasting is then potentially unsafe.

One way to systematically address the issue is to determine whether Spark is the source of the serialized data, and permit the cast in that case. Because the SELECT API is used, the data is retrieved from a table through Hive or another supported Spark store, not from a standalone Avro file. We could then potentially leverage Spark-specific metadata stored with the Hive table and provide this context to the deserializer.
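
As a rough sketch (the exact property keys here are an assumption and vary by Spark version), Spark already stores provider/schema metadata with the table, which the Avro read path could consult:
{code:java}
// Table t0 is the one created in the reproduction below. Spark-created tables
// carry Spark-specific properties (e.g. keys like "spark.sql.sources.provider"),
// which could serve as a signal that Spark serialized the underlying Avro data.
spark.sql("SHOW TBLPROPERTIES t0").show(false){code}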

Or we can change the Spark schema type from SHORT/BYTE to INT, as Spark SQL does in the [HiveExternalCatalog|https://github.com/apache/spark/blob/4df8512b11dc9cc3a179fd5ccedf91af1f3fc6ee/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L821].
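
A minimal sketch of that alternative (the helper name {{widenForAvro}} is illustrative, not an existing Spark API):
{code:java}
import org.apache.spark.sql.types._

// Widen BYTE/SHORT fields to INT before writing, so the round-trip through
// Avro's INT loses no information and the read-back schema matches.
def widenForAvro(schema: StructType): StructType =
  StructType(schema.fields.map {
    case f @ StructField(_, ByteType | ShortType, _, _) => f.copy(dataType = IntegerType)
    case f => f
  }){code}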



> IncompatibleSchemaException when selecting data from table stored from a DataFrame in Avro format with BYTE/SHORT
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39075
>                 URL: https://issues.apache.org/jira/browse/SPARK-39075
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: xsys
>            Priority: Major
>
> h3. Describe the bug
> We are trying to save a DataFrame as a table in the {{Avro}} data format. The table's schema contains a {{ByteType}} or {{ShortType}} column.
> When we {{INSERT}} some valid values (e.g. {{-128}}) and {{SELECT}} from the table, we expect the inserted values to be returned. However, we instead get an {{IncompatibleSchemaException}} from the {{AvroDeserializer}}.
> This appears to be caused by a missing case statement handling the {{(INT, ShortType)}} and {{(INT, ByteType)}} cases in [{{AvroDeserializer newWriter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321].
> h3. To Reproduce
> On Spark 3.2.1 (commit {{4f25b3f712}}), using {{spark-shell}} with the Avro package:
> {code:java}
> ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1{code}
> Execute the following:
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val schema = new StructType().add(StructField("c1", ShortType, true))
> val rdd = sc.parallelize(Seq(Row("-128".toShort)))
> val df = spark.createDataFrame(rdd, schema)
> df.write.mode("overwrite").format("avro").saveAsTable("t0")
> spark.sql("select * from t0;").show(false){code}
> Resulting error:
> {code:java}
> 22/04/27 18:04:14 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 32) 
> org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type {"type":"record","name":"topLevelRecord","fields":[
> {"name":"c1","type":["int","null"]}
> ]} to SQL type STRUCT<`c1`: SMALLINT>. 
> at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:102) 
> at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:74) 
> at org.apache.spark.sql.avro.AvroFileFormat$$anon$1.<init>(AvroFileFormat.scala:143) 
> at org.apache.spark.sql.avro.AvroFileFormat.$anonfun$buildReader$1(AvroFileFormat.scala:136) 
> at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148) 
> at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:133) 
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127) 
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187) 
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104) 
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) 
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349) 
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) 
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) 
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) 
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) 
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
> at org.apache.spark.scheduler.Task.run(Task.scala:131) 
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) 
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) 
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) 
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
> at java.lang.Thread.run(Thread.java:748) 
> Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro field 'c1' to SQL field 'c1' because schema is incompatible (avroType = "int", sqlType = SMALLINT) 
> at org.apache.spark.sql.avro.AvroDeserializer.newWriter(AvroDeserializer.scala:321)
> at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:356) 
> at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:84)
> ... 26 more
> {code}
> h3. Expected behavior & Possible Solution
> We expect the {{SELECT}} to successfully return {{-128}}. We tried other formats such as Parquet, and the outcome is consistent with this expectation.
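> For example, the same DataFrame written as Parquet round-trips as expected (the table name {{t0_parquet}} is illustrative):
> {code:java}
> df.write.mode("overwrite").format("parquet").saveAsTable("t0_parquet")
> spark.sql("select * from t0_parquet;").show(false)  // returns -128{code}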
> In [{{AvroSerializer newConverter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114], Spark's {{ByteType}} and {{ShortType}} are converted to Avro {{INT}}. The issue can be fixed by adding the {{(INT, ShortType)}} and {{(INT, ByteType)}} cases in [AvroDeserializer newWriter|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321] so that the deserializer handles these combinations instead of falling through to the default error case.
> The added cases may look like this:
> {code:java}
> + case (INT, ShortType) => (updater, ordinal, value) =>
> +   updater.setShort(ordinal, value.asInstanceOf[Int].toShort)
> + case (INT, ByteType) => (updater, ordinal, value) =>
> +   updater.setByte(ordinal, value.asInstanceOf[Int].toByte){code}
> h3. Additional context
> We are happy to send a PR for this issue if the fix simply entails adding those cases with a cast similar to the {{(INT, IntegerType)}} case. This would give the deserializer cases that match the serializer's handling of {{ShortType}} and {{ByteType}} as Avro {{INT}}.
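> For reference, the existing {{(INT, IntegerType)}} case in {{AvroDeserializer.newWriter}} looks roughly like this (paraphrased from the linked source); the new cases would follow the same shape:
> {code:java}
> case (INT, IntegerType) => (updater, ordinal, value) =>
>   updater.setInt(ordinal, value.asInstanceOf[Int]){code}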



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org