Posted to issues@spark.apache.org by "Zhenhao Li (Jira)" <ji...@apache.org> on 2022/02/03 09:39:00 UTC
[jira] [Updated] (SPARK-38091) AvroSerializer can cause java.lang.ClassCastException at run time
[ https://issues.apache.org/jira/browse/SPARK-38091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhenhao Li updated SPARK-38091:
-------------------------------
Description:
{{AvroSerializer}}'s implementation, at least in {{newConverter}}, is not written purely against the {{InternalRow}} and {{SpecializedGetters}} interfaces: it assumes implementation details that those interfaces do not guarantee.
For example, in
{code:scala}
case (TimestampType, LONG) => avroType.getLogicalType match {
  // For backward compatibility, if the Avro type is Long and it is not logical type
  // (the `null` case), output the timestamp value as with millisecond precision.
  case null | _: TimestampMillis => (getter, ordinal) =>
    DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
  case _: TimestampMicros => (getter, ordinal) =>
    timestampRebaseFunc(getter.getLong(ordinal))
  case other => throw new IncompatibleSchemaException(errorPrefix +
    s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
}
{code}
it assumes that the {{InternalRow}} instance encodes {{TimestampType}} as a {{java.lang.Long}}. That holds for {{UnsafeRow}} but not for {{GenericInternalRow}}.
Hence the code above fails with runtime exceptions when applied to an instance of {{GenericInternalRow}}, which is what the Python UDF path produces.
I didn't have time to dig deeper than that. My impression is that Spark's optimizer normally turns each row into an {{UnsafeRow}}, while Python UDFs bypass that step, so each row stays a {{GenericInternalRow}}.
It would be great if someone could correct me or offer a better explanation.
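To make the difference concrete, here is a minimal sketch of my own (not code from the Spark tree; it assumes {{spark-catalyst}} on the classpath). An {{UnsafeRow}} always stores a {{TimestampType}} field in its flat binary layout as a long, while a {{GenericInternalRow}} stores whatever object it was handed and only fails once a typed getter is called:
{code:scala}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types._

// UnsafeProjection normalizes a row into UnsafeRow's binary layout, so
// getLong on a TimestampType column is always safe afterwards.
val proj = UnsafeProjection.create(StructType(Seq(StructField("ts", TimestampType))))
val unsafe = proj(InternalRow(1234567890L))
unsafe.getLong(0) // 1234567890

// GenericInternalRow is just a boxed Array[Any]; nothing checks that the
// stored object matches the Catalyst encoding the getters expect. Reading a
// java.math.BigDecimal through getDecimal triggers exactly the
// ClassCastException shown in the stack trace below.
val generic = new GenericInternalRow(Array[Any](new java.math.BigDecimal("1.23")))
generic.getDecimal(0, 10, 2) // ClassCastException at run time
{code}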
To reproduce the issue, {{git checkout master}}, {{git cherry-pick --no-commit}} [this commit|https://github.com/Zhen-hao/spark/commit/1ffe8e8f35273b2f3529f6c2d004822f480e4c88], and run the test suite {{org.apache.spark.sql.avro.AvroSerdeSuite}}.
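From the repository root, something like the following should run the suite (this assumes the standard Spark sbt build and that the Avro connector lives in the {{avro}} sbt project; the module name is my assumption):
{code}
build/sbt "avro/testOnly org.apache.spark.sql.avro.AvroSerdeSuite"
{code}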
You will see runtime exceptions like the following one:
{code}
Serialize DecimalType to Avro BYTES with logical type decimal *** FAILED ***
java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class org.apache.spark.sql.types.Decimal (java.math.BigDecimal is in module java.base of loader 'bootstrap'; org.apache.spark.sql.types.Decimal is in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal(rows.scala:45)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getDecimal$(rows.scala:45)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getDecimal(rows.scala:195)
at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10(AvroSerializer.scala:136)
at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newConverter$10$adapted(AvroSerializer.scala:135)
at org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$2(AvroSerializer.scala:283)
at org.apache.spark.sql.avro.AvroSerializer.serialize(AvroSerializer.scala:60)
at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5(AvroSerdeSuite.scala:82)
at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$new$5$adapted(AvroSerdeSuite.scala:67)
at org.apache.spark.sql.avro.AvroSerdeSuite.$anonfun$withFieldMatchType$2(AvroSerdeSuite.scala:217)
{code}
> AvroSerializer can cause java.lang.ClassCastException at run time
> -----------------------------------------------------------------
>
> Key: SPARK-38091
> URL: https://issues.apache.org/jira/browse/SPARK-38091
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.0.3, 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.2.1
> Reporter: Zhenhao Li
> Priority: Major
> Labels: Avro, serializers
>