You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/05 03:27:02 UTC

[spark] branch branch-3.2 updated: [SPARK-39775][CORE][AVRO] Disable validate default values when parsing Avro schemas

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 0e5812c49d2 [SPARK-39775][CORE][AVRO] Disable validate default values when parsing Avro schemas
0e5812c49d2 is described below

commit 0e5812c49d2552d8779f94fbaad2fc1b69d8a9e8
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Fri Aug 5 11:25:51 2022 +0800

    [SPARK-39775][CORE][AVRO] Disable validate default values when parsing Avro schemas
    
    ### What changes were proposed in this pull request?
    
    This PR disables validation of default values when parsing Avro schemas.
    
    ### Why are the changes needed?
    
    Spark will throw an exception when upgrading to Spark 3.2. We have fixed the Hive serde tables before: SPARK-34512.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #37191 from wangyum/SPARK-39775.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 5c1b99f441ec5e178290637a9a9e7902aaa116e1)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/serializer/GenericAvroSerializer.scala   |  4 +--
 .../serializer/GenericAvroSerializerSuite.scala    | 16 +++++++++++
 .../apache/spark/sql/avro/AvroDataToCatalyst.scala |  3 +-
 .../org/apache/spark/sql/avro/AvroOptions.scala    |  4 +--
 .../apache/spark/sql/avro/CatalystDataToAvro.scala |  2 +-
 .../apache/spark/sql/avro/AvroFunctionsSuite.scala | 32 ++++++++++++++++++++++
 6 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index c1ef3ee769a..7d2923fdf37 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -97,7 +97,7 @@ private[serializer] class GenericAvroSerializer[D <: GenericContainer]
     } {
       in.close()
     }
-    new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8))
+    new Schema.Parser().setValidateDefaults(false).parse(new String(bytes, StandardCharsets.UTF_8))
   })
 
   /**
@@ -137,7 +137,7 @@ private[serializer] class GenericAvroSerializer[D <: GenericContainer]
         val fingerprint = input.readLong()
         schemaCache.getOrElseUpdate(fingerprint, {
           schemas.get(fingerprint) match {
-            case Some(s) => new Schema.Parser().parse(s)
+            case Some(s) => new Schema.Parser().setValidateDefaults(false).parse(s)
             case None =>
               throw new SparkException(
                 "Error reading attempting to read avro data -- encountered an unknown " +
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
index 54e4aebe544..98493c12f59 100644
--- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -110,4 +110,20 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
       assert(rdd.collect() sameElements Array.fill(10)(datum))
     }
   }
+
+  test("SPARK-39775: Disable validate default values when parsing Avro schemas") {
+    val avroTypeStruct = s"""
+      |{
+      |  "type": "record",
+      |  "name": "struct",
+      |  "fields": [
+      |    {"name": "id", "type": "long", "default": null}
+      |  ]
+      |}
+    """.stripMargin
+    val schema = new Schema.Parser().setValidateDefaults(false).parse(avroTypeStruct)
+
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
+  }
 }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index b4965003ba3..c4a4b16b052 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -53,7 +53,8 @@ private[avro] case class AvroDataToCatalyst(
 
   private lazy val avroOptions = AvroOptions(options)
 
-  @transient private lazy val actualSchema = new Schema.Parser().parse(jsonFormatSchema)
+  @transient private lazy val actualSchema =
+    new Schema.Parser().setValidateDefaults(false).parse(jsonFormatSchema)
 
   @transient private lazy val expectedSchema = avroOptions.schema.getOrElse(actualSchema)
 
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 9fe50079b24..a505a1656fc 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -52,13 +52,13 @@ private[sql] class AvroOptions(
    * instead of "string" type in the default converted schema.
    */
   val schema: Option[Schema] = {
-    parameters.get("avroSchema").map(new Schema.Parser().parse).orElse({
+    parameters.get("avroSchema").map(new Schema.Parser().setValidateDefaults(false).parse).orElse({
       val avroUrlSchema = parameters.get("avroSchemaUrl").map(url => {
         log.debug("loading avro schema from url: " + url)
         val fs = FileSystem.get(new URI(url), conf)
         val in = fs.open(new Path(url))
         try {
-          new Schema.Parser().parse(in)
+          new Schema.Parser().setValidateDefaults(false).parse(in)
         } finally {
           in.close()
         }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
index 5d79c44ad42..1e7e8600977 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
@@ -35,7 +35,7 @@ private[avro] case class CatalystDataToAvro(
 
   @transient private lazy val avroType =
     jsonFormatSchema
-      .map(new Schema.Parser().parse)
+      .map(new Schema.Parser().setValidateDefaults(false).parse)
       .getOrElse(SchemaConverters.toAvroType(child.dataType, child.nullable))
 
   @transient private lazy val serializer =
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index c9e0d434469..69cda3efb52 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.functions.{col, lit, struct}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
 
 class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
@@ -238,4 +239,35 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
       assert(message.contains("Only UNION of a null type and a non-null type is supported"))
     }
   }
+
+  test("SPARK-39775: Disable validate default values when parsing Avro schemas") {
+    val avroTypeStruct = s"""
+      |{
+      |  "type": "record",
+      |  "name": "struct",
+      |  "fields": [
+      |    {"name": "id", "type": "long", "default": null}
+      |  ]
+      |}
+    """.stripMargin
+    val avroSchema = AvroOptions(Map("avroSchema" -> avroTypeStruct)).schema.get
+    val sparkSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+
+    val df = spark.range(5).select($"id")
+    val structDf = df.select(struct($"id").as("struct"))
+    val avroStructDF = structDf.select(functions.to_avro('struct, avroTypeStruct).as("avro"))
+    checkAnswer(avroStructDF.select(functions.from_avro('avro, avroTypeStruct)), structDf)
+
+    withTempPath { dir =>
+      df.write.format("avro").save(dir.getCanonicalPath)
+      checkAnswer(spark.read.schema(sparkSchema).format("avro").load(dir.getCanonicalPath), df)
+
+      val msg = intercept[SparkException] {
+        spark.read.option("avroSchema", avroTypeStruct).format("avro")
+          .load(dir.getCanonicalPath)
+          .collect()
+      }.getCause.getMessage
+      assert(msg.contains("Invalid default for field id: null not a \"long\""))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org