You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/05 03:27:02 UTC
[spark] branch branch-3.2 updated: [SPARK-39775][CORE][AVRO] Disable validate default values when parsing Avro schemas
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 0e5812c49d2 [SPARK-39775][CORE][AVRO] Disable validate default values when parsing Avro schemas
0e5812c49d2 is described below
commit 0e5812c49d2552d8779f94fbaad2fc1b69d8a9e8
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Fri Aug 5 11:25:51 2022 +0800
[SPARK-39775][CORE][AVRO] Disable validate default values when parsing Avro schemas
### What changes were proposed in this pull request?
This PR disables validation of default values when parsing Avro schemas.
### Why are the changes needed?
Spark will throw an exception when upgrading to Spark 3.2 if existing Avro schemas contain invalid default values. We have fixed the Hive serde tables before: SPARK-34512.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #37191 from wangyum/SPARK-39775.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 5c1b99f441ec5e178290637a9a9e7902aaa116e1)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/serializer/GenericAvroSerializer.scala | 4 +--
.../serializer/GenericAvroSerializerSuite.scala | 16 +++++++++++
.../apache/spark/sql/avro/AvroDataToCatalyst.scala | 3 +-
.../org/apache/spark/sql/avro/AvroOptions.scala | 4 +--
.../apache/spark/sql/avro/CatalystDataToAvro.scala | 2 +-
.../apache/spark/sql/avro/AvroFunctionsSuite.scala | 32 ++++++++++++++++++++++
6 files changed, 55 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index c1ef3ee769a..7d2923fdf37 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -97,7 +97,7 @@ private[serializer] class GenericAvroSerializer[D <: GenericContainer]
} {
in.close()
}
- new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8))
+ new Schema.Parser().setValidateDefaults(false).parse(new String(bytes, StandardCharsets.UTF_8))
})
/**
@@ -137,7 +137,7 @@ private[serializer] class GenericAvroSerializer[D <: GenericContainer]
val fingerprint = input.readLong()
schemaCache.getOrElseUpdate(fingerprint, {
schemas.get(fingerprint) match {
- case Some(s) => new Schema.Parser().parse(s)
+ case Some(s) => new Schema.Parser().setValidateDefaults(false).parse(s)
case None =>
throw new SparkException(
"Error reading attempting to read avro data -- encountered an unknown " +
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
index 54e4aebe544..98493c12f59 100644
--- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -110,4 +110,20 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
assert(rdd.collect() sameElements Array.fill(10)(datum))
}
}
+
+ test("SPARK-39775: Disable validate default values when parsing Avro schemas") {
+ val avroTypeStruct = s"""
+ |{
+ | "type": "record",
+ | "name": "struct",
+ | "fields": [
+ | {"name": "id", "type": "long", "default": null}
+ | ]
+ |}
+ """.stripMargin
+ val schema = new Schema.Parser().setValidateDefaults(false).parse(avroTypeStruct)
+
+ val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+ assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
+ }
}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index b4965003ba3..c4a4b16b052 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -53,7 +53,8 @@ private[avro] case class AvroDataToCatalyst(
private lazy val avroOptions = AvroOptions(options)
- @transient private lazy val actualSchema = new Schema.Parser().parse(jsonFormatSchema)
+ @transient private lazy val actualSchema =
+ new Schema.Parser().setValidateDefaults(false).parse(jsonFormatSchema)
@transient private lazy val expectedSchema = avroOptions.schema.getOrElse(actualSchema)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 9fe50079b24..a505a1656fc 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -52,13 +52,13 @@ private[sql] class AvroOptions(
* instead of "string" type in the default converted schema.
*/
val schema: Option[Schema] = {
- parameters.get("avroSchema").map(new Schema.Parser().parse).orElse({
+ parameters.get("avroSchema").map(new Schema.Parser().setValidateDefaults(false).parse).orElse({
val avroUrlSchema = parameters.get("avroSchemaUrl").map(url => {
log.debug("loading avro schema from url: " + url)
val fs = FileSystem.get(new URI(url), conf)
val in = fs.open(new Path(url))
try {
- new Schema.Parser().parse(in)
+ new Schema.Parser().setValidateDefaults(false).parse(in)
} finally {
in.close()
}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
index 5d79c44ad42..1e7e8600977 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
@@ -35,7 +35,7 @@ private[avro] case class CatalystDataToAvro(
@transient private lazy val avroType =
jsonFormatSchema
- .map(new Schema.Parser().parse)
+ .map(new Schema.Parser().setValidateDefaults(false).parse)
.getOrElse(SchemaConverters.toAvroType(child.dataType, child.nullable))
@transient private lazy val serializer =
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index c9e0d434469..69cda3efb52 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
import testImplicits._
@@ -238,4 +239,35 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
assert(message.contains("Only UNION of a null type and a non-null type is supported"))
}
}
+
+ test("SPARK-39775: Disable validate default values when parsing Avro schemas") {
+ val avroTypeStruct = s"""
+ |{
+ | "type": "record",
+ | "name": "struct",
+ | "fields": [
+ | {"name": "id", "type": "long", "default": null}
+ | ]
+ |}
+ """.stripMargin
+ val avroSchema = AvroOptions(Map("avroSchema" -> avroTypeStruct)).schema.get
+ val sparkSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+
+ val df = spark.range(5).select($"id")
+ val structDf = df.select(struct($"id").as("struct"))
+ val avroStructDF = structDf.select(functions.to_avro('struct, avroTypeStruct).as("avro"))
+ checkAnswer(avroStructDF.select(functions.from_avro('avro, avroTypeStruct)), structDf)
+
+ withTempPath { dir =>
+ df.write.format("avro").save(dir.getCanonicalPath)
+ checkAnswer(spark.read.schema(sparkSchema).format("avro").load(dir.getCanonicalPath), df)
+
+ val msg = intercept[SparkException] {
+ spark.read.option("avroSchema", avroTypeStruct).format("avro")
+ .load(dir.getCanonicalPath)
+ .collect()
+ }.getCause.getMessage
+ assert(msg.contains("Invalid default for field id: null not a \"long\""))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org