You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/20 21:58:02 UTC
spark git commit: [SPARK-24876][SQL] Avro: simplify schema serialization
Repository: spark
Updated Branches:
refs/heads/master 2333a34d3 -> 00b864aa7
[SPARK-24876][SQL] Avro: simplify schema serialization
## What changes were proposed in this pull request?
Previously, in the refactoring of the Avro Serializer and Deserializer, a new class `SerializableSchema` was created for serializing the Avro schema:
https://github.com/apache/spark/pull/21762/files#diff-01fea32e6ec6bcf6f34d06282e08705aR37
On second thought, we can use the schema's `toString` method for serialization, which renders the schema as a JSON string, and then parse that JSON schema on the executor. This makes the code much simpler.
## How was this patch tested?
Unit tests.
Author: Gengliang Wang <ge...@databricks.com>
Closes #21829 from gengliangwang/removeSerializableSchema.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00b864aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00b864aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00b864aa
Branch: refs/heads/master
Commit: 00b864aa7054a34f3d7a118d92eae0b3c28b86e5
Parents: 2333a34
Author: Gengliang Wang <ge...@databricks.com>
Authored: Fri Jul 20 14:57:59 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Fri Jul 20 14:57:59 2018 -0700
----------------------------------------------------------------------
.../apache/spark/sql/avro/AvroFileFormat.scala | 2 +-
.../sql/avro/AvroOutputWriterFactory.scala | 14 +++-
.../spark/sql/avro/SerializableSchema.scala | 69 --------------------
.../sql/avro/SerializableSchemaSuite.scala | 56 ----------------
4 files changed, 12 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 1d0f40e..780e457 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -146,7 +146,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
log.error(s"unsupported compression codec $unknown")
}
- new AvroOutputWriterFactory(dataSchema, new SerializableSchema(outputAvroSchema))
+ new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
}
override def buildReader(
http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
index 18a6d93..116020e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
@@ -17,14 +17,22 @@
package org.apache.spark.sql.avro
+import org.apache.avro.Schema
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType
+/**
+ * A factory that produces [[AvroOutputWriter]].
+ * @param catalystSchema Catalyst schema of input data.
+ * @param avroSchemaAsJsonString Avro schema of output result, in JSON string format.
+ */
private[avro] class AvroOutputWriterFactory(
- schema: StructType,
- avroSchema: SerializableSchema) extends OutputWriterFactory {
+ catalystSchema: StructType,
+ avroSchemaAsJsonString: String) extends OutputWriterFactory {
+
+ private lazy val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
override def getFileExtension(context: TaskAttemptContext): String = ".avro"
@@ -32,6 +40,6 @@ private[avro] class AvroOutputWriterFactory(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new AvroOutputWriter(path, context, schema, avroSchema.value)
+ new AvroOutputWriter(path, context, catalystSchema, avroSchema)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala
deleted file mode 100644
index ec0ddc7..0000000
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import java.io._
-
-import scala.util.control.NonFatal
-
-import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
-import com.esotericsoftware.kryo.io.{Input, Output}
-import org.apache.avro.Schema
-import org.slf4j.LoggerFactory
-
-class SerializableSchema(@transient var value: Schema)
- extends Serializable with KryoSerializable {
-
- @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
-
- private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
- out.defaultWriteObject()
- out.writeUTF(value.toString())
- out.flush()
- }
-
- private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
- val json = in.readUTF()
- value = new Schema.Parser().parse(json)
- }
-
- private def tryOrIOException[T](block: => T): T = {
- try {
- block
- } catch {
- case e: IOException =>
- log.error("Exception encountered", e)
- throw e
- case NonFatal(e) =>
- log.error("Exception encountered", e)
- throw new IOException(e)
- }
- }
-
- def write(kryo: Kryo, out: Output): Unit = {
- val dos = new DataOutputStream(out)
- dos.writeUTF(value.toString())
- dos.flush()
- }
-
- def read(kryo: Kryo, in: Input): Unit = {
- val dis = new DataInputStream(in)
- val json = dis.readUTF()
- value = new Schema.Parser().parse(json)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala
deleted file mode 100644
index 510bcbd..0000000
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import org.apache.avro.Schema
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance}
-
-class SerializableSchemaSuite extends SparkFunSuite {
-
- private def testSerialization(serializer: SerializerInstance): Unit = {
- val avroTypeJson =
- s"""
- |{
- | "type": "string",
- | "name": "my_string"
- |}
- """.stripMargin
- val avroSchema = new Schema.Parser().parse(avroTypeJson)
- val serializableSchema = new SerializableSchema(avroSchema)
- val serialized = serializer.serialize(serializableSchema)
-
- serializer.deserialize[Any](serialized) match {
- case c: SerializableSchema =>
- assert(c.log != null, "log was null")
- assert(c.value != null, "value was null")
- assert(c.value == avroSchema)
- case other => fail(
- s"Expecting ${classOf[SerializableSchema]}, but got ${other.getClass}.")
- }
- }
-
- test("serialization with JavaSerializer") {
- testSerialization(new JavaSerializer(new SparkConf()).newInstance())
- }
-
- test("serialization with KryoSerializer") {
- testSerialization(new KryoSerializer(new SparkConf()).newInstance())
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org