You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/20 21:58:02 UTC

spark git commit: [SPARK-24876][SQL] Avro: simplify schema serialization

Repository: spark
Updated Branches:
  refs/heads/master 2333a34d3 -> 00b864aa7


[SPARK-24876][SQL] Avro: simplify schema serialization

## What changes were proposed in this pull request?

Previously, in the refactoring of the Avro serializer and deserializer, a new class `SerializableSchema` was created for serializing the Avro schema:
https://github.com/apache/spark/pull/21762/files#diff-01fea32e6ec6bcf6f34d06282e08705aR37

On second thought, we can use the schema's `toString` method for serialization and then parse the resulting JSON-format schema on the executor. This makes the code much simpler.

## How was this patch tested?

Unit test

Author: Gengliang Wang <ge...@databricks.com>

Closes #21829 from gengliangwang/removeSerializableSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00b864aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00b864aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00b864aa

Branch: refs/heads/master
Commit: 00b864aa7054a34f3d7a118d92eae0b3c28b86e5
Parents: 2333a34
Author: Gengliang Wang <ge...@databricks.com>
Authored: Fri Jul 20 14:57:59 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Fri Jul 20 14:57:59 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/avro/AvroFileFormat.scala  |  2 +-
 .../sql/avro/AvroOutputWriterFactory.scala      | 14 +++-
 .../spark/sql/avro/SerializableSchema.scala     | 69 --------------------
 .../sql/avro/SerializableSchemaSuite.scala      | 56 ----------------
 4 files changed, 12 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 1d0f40e..780e457 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -146,7 +146,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
         log.error(s"unsupported compression codec $unknown")
     }
 
-    new AvroOutputWriterFactory(dataSchema, new SerializableSchema(outputAvroSchema))
+    new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
   }
 
   override def buildReader(

http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
index 18a6d93..116020e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
@@ -17,14 +17,22 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.avro.Schema
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.types.StructType
 
+/**
+ * A factory that produces [[AvroOutputWriter]].
+ * @param catalystSchema Catalyst schema of input data.
+ * @param avroSchemaAsJsonString Avro schema of output result, in JSON string format.
+ */
 private[avro] class AvroOutputWriterFactory(
-    schema: StructType,
-    avroSchema: SerializableSchema) extends OutputWriterFactory {
+    catalystSchema: StructType,
+    avroSchemaAsJsonString: String) extends OutputWriterFactory {
+
+  private lazy val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
 
   override def getFileExtension(context: TaskAttemptContext): String = ".avro"
 
@@ -32,6 +40,6 @@ private[avro] class AvroOutputWriterFactory(
       path: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
-    new AvroOutputWriter(path, context, schema, avroSchema.value)
+    new AvroOutputWriter(path, context, catalystSchema, avroSchema)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala
deleted file mode 100644
index ec0ddc7..0000000
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import java.io._
-
-import scala.util.control.NonFatal
-
-import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
-import com.esotericsoftware.kryo.io.{Input, Output}
-import org.apache.avro.Schema
-import org.slf4j.LoggerFactory
-
-class SerializableSchema(@transient var value: Schema)
-  extends Serializable with KryoSerializable {
-
-  @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
-
-    private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
-      out.defaultWriteObject()
-      out.writeUTF(value.toString())
-      out.flush()
-    }
-
-    private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
-      val json = in.readUTF()
-      value = new Schema.Parser().parse(json)
-    }
-
-    private def tryOrIOException[T](block: => T): T = {
-      try {
-        block
-      } catch {
-        case e: IOException =>
-          log.error("Exception encountered", e)
-          throw e
-        case NonFatal(e) =>
-          log.error("Exception encountered", e)
-          throw new IOException(e)
-      }
-    }
-
-    def write(kryo: Kryo, out: Output): Unit = {
-      val dos = new DataOutputStream(out)
-      dos.writeUTF(value.toString())
-      dos.flush()
-    }
-
-    def read(kryo: Kryo, in: Input): Unit = {
-      val dis = new DataInputStream(in)
-      val json = dis.readUTF()
-      value = new Schema.Parser().parse(json)
-    }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala
deleted file mode 100644
index 510bcbd..0000000
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import org.apache.avro.Schema
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance}
-
-class SerializableSchemaSuite extends SparkFunSuite {
-
-  private def testSerialization(serializer: SerializerInstance): Unit = {
-    val avroTypeJson =
-      s"""
-         |{
-         |  "type": "string",
-         |  "name": "my_string"
-         |}
-       """.stripMargin
-    val avroSchema = new Schema.Parser().parse(avroTypeJson)
-    val serializableSchema = new SerializableSchema(avroSchema)
-    val serialized = serializer.serialize(serializableSchema)
-
-    serializer.deserialize[Any](serialized) match {
-      case c: SerializableSchema =>
-        assert(c.log != null, "log was null")
-        assert(c.value != null, "value was null")
-        assert(c.value == avroSchema)
-      case other => fail(
-        s"Expecting ${classOf[SerializableSchema]}, but got ${other.getClass}.")
-    }
-  }
-
-  test("serialization with JavaSerializer") {
-    testSerialization(new JavaSerializer(new SparkConf()).newInstance())
-  }
-
-  test("serialization with KryoSerializer") {
-    testSerialization(new KryoSerializer(new SparkConf()).newInstance())
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org