Posted to commits@spark.apache.org by ir...@apache.org on 2015/07/29 21:02:39 UTC

spark git commit: [SPARK-746] [CORE] Added Avro Serialization to Kryo

Repository: spark
Updated Branches:
  refs/heads/master 97906944e -> 069a4c414


[SPARK-746] [CORE] Added Avro Serialization to Kryo

Added a custom Kryo serializer for generic Avro records to reduce the network IO
involved during a shuffle. This compresses the schema and allows users to
register their schemas ahead of time to further reduce traffic.

Currently Kryo uses its default serializer for generic Records, which writes a lot of
unneeded data (including the full schema) with each record.
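
A minimal usage sketch of the new API (the schema and application names below are
illustrative, not part of this patch):

  import org.apache.avro.SchemaBuilder
  import org.apache.spark.{SparkConf, SparkContext}

  // Illustrative schema; any generic Avro schema works the same way.
  val userSchema = SchemaBuilder
    .record("User").fields()
    .requiredString("name")
    .endRecord()

  val conf = new SparkConf()
    .setAppName("avro-kryo-example")
    // Route shuffle data through Kryo so the new generic-record serializer is used.
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registering the schema up front means only its 64-bit fingerprint travels
    // with each record instead of the full (compressed) schema.
    .registerAvroSchemas(userSchema)

  val sc = new SparkContext(conf)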

Author: Joseph Batchik <jo...@cloudera.com>
Author: Joseph Batchik <jo...@gmail.com>

Closes #7004 from JDrit/Avro_serialization and squashes the following commits:

8158d51 [Joseph Batchik] updated per feedback
c0cf329 [Joseph Batchik] implemented @squito suggestion for SparkEnv
dd71efe [Joseph Batchik] fixed bug with serializing
1183a48 [Joseph Batchik] updated codec settings
fa9298b [Joseph Batchik] forgot a couple of fixes
c5fe794 [Joseph Batchik] implemented @squito suggestion
0f5471a [Joseph Batchik] implemented @squito suggestion to use a codec that is already in spark
6d1925c [Joseph Batchik] fixed to changes suggested by @squito
d421bf5 [Joseph Batchik] updated pom to removed versions
ab46d10 [Joseph Batchik] Changed Avro dependency to be similar to parent
f4ae251 [Joseph Batchik] fixed serialization error in that SparkConf cannot be serialized
2b545cc [Joseph Batchik] started working on fixes for pr
97fba62 [Joseph Batchik] Added a custom Kryo serializer for generic Avro records to reduce the network IO involved during a shuffle. This compresses the schema and allows for users to register their schemas ahead of time to further reduce traffic.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/069a4c41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/069a4c41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/069a4c41

Branch: refs/heads/master
Commit: 069a4c414db4612d7bdb6f5615c1ba36998e5a49
Parents: 9790694
Author: Joseph Batchik <jo...@cloudera.com>
Authored: Wed Jul 29 14:02:32 2015 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Wed Jul 29 14:02:32 2015 -0500

----------------------------------------------------------------------
 core/pom.xml                                    |   5 +
 .../main/scala/org/apache/spark/SparkConf.scala |  23 ++-
 .../serializer/GenericAvroSerializer.scala      | 150 +++++++++++++++++++
 .../spark/serializer/KryoSerializer.scala       |   6 +
 .../serializer/GenericAvroSerializerSuite.scala |  84 +++++++++++
 5 files changed, 267 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/069a4c41/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 95f36eb..6fa87ec 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -35,6 +35,11 @@
   <url>http://spark.apache.org/</url>
   <dependencies>
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>${avro.mapred.classifier}</classifier>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/069a4c41/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 6cf36fb..4161792 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,11 +18,12 @@
 package org.apache.spark
 
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
+import org.apache.avro.{SchemaNormalization, Schema}
+
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -161,6 +162,26 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     this
   }
 
+  private final val avroNamespace = "avro.schema."
+
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+    for (schema <- schemas) {
+      set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
+    }
+    this
+  }
+
+  /** Gets all the Avro schemas in the configuration used by the generic Avro record serializer */
+  def getAvroSchema: Map[Long, String] = {
+    getAll.filter { case (k, v) => k.startsWith(avroNamespace) }
+      .map { case (k, v) => (k.substring(avroNamespace.length).toLong, v) }
+      .toMap
+  }
+
   /** Remove a parameter from the configuration */
   def remove(key: String): SparkConf = {
     settings.remove(key)
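
For illustration, registering a schema via the new API above is roughly equivalent to the
following (conf and schema are assumed to be an existing SparkConf and Avro Schema):

  import org.apache.avro.SchemaNormalization

  // registerAvroSchemas stores the schema text under a key derived from its
  // 64-bit parsing fingerprint, inside the "avro.schema." namespace:
  val fingerprint = SchemaNormalization.parsingFingerprint64(schema)
  conf.set("avro.schema." + fingerprint, schema.toString)

  // getAvroSchema later reads every "avro.schema.*" entry back into a
  // Map[Long, String] keyed by that fingerprint.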

http://git-wip-us.apache.org/repos/asf/spark/blob/069a4c41/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
new file mode 100644
index 0000000..62f8aae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.{SparkException, SparkEnv}
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the
+ * actual schema, to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the serializer
+ * caches all previously seen values to reduce the amount of work that needs to be done.
+ * @param schemas a map where the keys are unique IDs for Avro schemas and the values are the
+ *                string representation of the Avro schema, used to decrease the amount of data
+ *                that needs to be serialized.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  // GenericAvroSerializer can't take a SparkConf in the constructor b/c then it would become
+  // a member of KryoSerializer, which would make KryoSerializer not Serializable.  We make
+  // the codec lazy here just b/c in some unit tests, we use a KryoSerializer w/out having
+  // the SparkEnv set (note those tests would fail if they tried to serialize avro data).
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
+    val bos = new ByteArrayOutputStream()
+    val out = codec.compressedOutputStream(bos)
+    out.write(schema.toString.getBytes("UTF-8"))
+    out.close()
+    bos.toByteArray
+  })
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
+   * seen values to limit the number of times that decompression has to be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
+    val bis = new ByteArrayInputStream(schemaBytes.array())
+    val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
+    new Schema.Parser().parse(new String(bytes, "UTF-8"))
+  })
+
+  /**
+   * Serializes a record to the given output stream. It caches a lot of the internal data
+   * to avoid redoing work.
+   */
+  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
+    val encoder = EncoderFactory.get.binaryEncoder(output, null)
+    val schema = datum.getSchema
+    val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
+      SchemaNormalization.parsingFingerprint64(schema)
+    })
+    schemas.get(fingerprint) match {
+      case Some(_) =>
+        output.writeBoolean(true)
+        output.writeLong(fingerprint)
+      case None =>
+        output.writeBoolean(false)
+        val compressedSchema = compress(schema)
+        output.writeInt(compressedSchema.length)
+        output.writeBytes(compressedSchema)
+    }
+
+    writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
+      .asInstanceOf[DatumWriter[R]]
+      .write(datum, encoder)
+    encoder.flush()
+  }
+
+  /**
+   * Deserializes generic records into their in-memory form. There is internal
+   * state to keep a cache of already seen schemas and datum readers.
+   */
+  def deserializeDatum(input: KryoInput): GenericRecord = {
+    val schema = {
+      if (input.readBoolean()) {
+        val fingerprint = input.readLong()
+        schemaCache.getOrElseUpdate(fingerprint, {
+          schemas.get(fingerprint) match {
+            case Some(s) => new Schema.Parser().parse(s)
+            case None =>
+              throw new SparkException(
+                "Error reading attempting to read avro data -- encountered an unknown " +
+                  s"fingerprint: $fingerprint, not sure what schema to use.  This could happen " +
+                  "if you registered additional schemas after starting your spark context.")
+          }
+        })
+      } else {
+        val length = input.readInt()
+        decompress(ByteBuffer.wrap(input.readBytes(length)))
+      }
+    }
+    val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
+    readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
+      .asInstanceOf[DatumReader[GenericRecord]]
+      .read(null, decoder)
+  }
+
+  override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
+    serializeDatum(datum, output)
+
+  override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
+    deserializeDatum(input)
+}
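
A rough sketch of how this serializer plugs into a raw Kryo instance, mirroring the test
suite below (GenericAvroSerializer is private[serializer], so this only compiles inside
that package; conf and record are assumed, and an active SparkEnv is needed whenever a
schema that was not registered has to be compressed):

  import java.io.ByteArrayOutputStream
  import com.esotericsoftware.kryo.Kryo
  import com.esotericsoftware.kryo.io.{Input, Output}
  import org.apache.avro.generic.GenericData

  val kryo = new Kryo()
  // conf.getAvroSchema is the fingerprint -> schema-text map built from registered schemas.
  kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(conf.getAvroSchema))

  val bytes = new ByteArrayOutputStream()
  val output = new Output(bytes)
  kryo.writeObject(output, record)   // delegates to serializeDatum above
  output.close()

  val roundTripped =
    kryo.readObject(new Input(bytes.toByteArray), classOf[GenericData.Record])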

http://git-wip-us.apache.org/repos/asf/spark/blob/069a4c41/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 7cb6e08..0ff7562 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -27,6 +27,7 @@ import com.esotericsoftware.kryo.{Kryo, KryoException}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
+import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
 
 import org.apache.spark._
@@ -73,6 +74,8 @@ class KryoSerializer(conf: SparkConf)
     .split(',')
     .filter(!_.isEmpty)
 
+  private val avroSchemas = conf.getAvroSchema
+
   def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 
   def newKryo(): Kryo = {
@@ -101,6 +104,9 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
+    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
+    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+
     try {
       // scalastyle:off classforname
       // Use the default classloader when calling the user registrator.
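
Inside an application the registered schemas flow from the SparkConf into every Kryo
instance created here; a sketch of the end-to-end path (conf and record are assumed, and
SparkEnv must be initialized, e.g. by a running SparkContext, if an unregistered schema
ever needs to be compressed):

  import org.apache.avro.generic.GenericRecord
  import org.apache.spark.serializer.KryoSerializer

  val serInstance = new KryoSerializer(conf).newInstance()
  val payload = serInstance.serialize(record)                 // java.nio.ByteBuffer
  val back = serInstance.deserialize[GenericRecord](payload)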

http://git-wip-us.apache.org/repos/asf/spark/blob/069a4c41/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
new file mode 100644
index 0000000..bc9f370
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import com.esotericsoftware.kryo.io.{Output, Input}
+import org.apache.avro.{SchemaBuilder, Schema}
+import org.apache.avro.generic.GenericData.Record
+
+import org.apache.spark.{SparkFunSuite, SharedSparkContext}
+
+class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
+  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+
+  val schema: Schema = SchemaBuilder
+    .record("testRecord").fields()
+    .requiredString("data")
+    .endRecord()
+  val record = new Record(schema)
+  record.put("data", "test data")
+
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
+  }
+
+  test("record serialization and deserialization") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val outputStream = new ByteArrayOutputStream()
+    val output = new Output(outputStream)
+    genericSer.serializeDatum(record, output)
+    output.flush()
+    output.close()
+
+    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+    assert(genericSer.deserializeDatum(input) === record)
+  }
+
+  test("uses schema fingerprint to decrease message size") {
+    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val output = new Output(new ByteArrayOutputStream())
+
+    val beginningNormalPosition = output.total()
+    genericSerFull.serializeDatum(record, output)
+    output.flush()
+    val normalLength = output.total - beginningNormalPosition
+
+    conf.registerAvroSchemas(schema)
+    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    val beginningFingerprintPosition = output.total()
+    genericSerFinger.serializeDatum(record, output)
+    val fingerprintLength = output.total - beginningFingerprintPosition
+
+    assert(fingerprintLength < normalLength)
+  }
+
+  test("caches previously seen schemas") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    val compressedSchema = genericSer.compress(schema)
+    val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
+
+    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org