Posted to commits@spark.apache.org by jo...@apache.org on 2015/05/06 19:53:11 UTC

spark git commit: [SPARK-7311] Introduce internal Serializer API for determining if serializers support object relocation

Repository: spark
Updated Branches:
  refs/heads/master f2c47082c -> 002c12384


[SPARK-7311] Introduce internal Serializer API for determining if serializers support object relocation

This patch extends the `Serializer` interface with a new `Private` API which allows serializers to indicate whether they support relocation of serialized objects in serializer stream output.

This relocatability property is described in more detail in `Serializer.scala`, but in a nutshell a serializer supports relocation if reordering the bytes of serialized objects in serialization stream output is equivalent to having re-ordered those elements prior to serializing them.  The optimized shuffle paths introduced in #4450 and #5868 both rely on serializers having this property; this patch just centralizes the logic for determining whether a serializer has it.  I also added tests and comments clarifying when this works for KryoSerializer.

This change allows the optimizations in #4450 to be applied for shuffles that use `SqlSerializer2`.
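
For illustration, here is a minimal standalone sketch (not part of this patch) of what the relocation property means in practice. It follows the same byte-swapping round trip that the new SerializerPropertiesSuite performs, using KryoSerializer with its default auto-reset behavior:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer

    // Sketch: swapping the serialized byte ranges of two objects should be
    // equivalent to having swapped the objects before serializing them.
    object RelocationDemo {
      def main(args: Array[String]): Unit = {
        val ser = new KryoSerializer(new SparkConf()).newInstance()
        val baos = new ByteArrayOutputStream()
        val out = ser.serializeStream(baos)
        out.writeObject("obj1")
        out.flush()
        val cut = baos.size()  // end of obj1's bytes in the stream
        out.writeObject("obj2")
        out.close()
        val bytes = baos.toByteArray
        // Relocate: place obj2's bytes before obj1's bytes.
        val swapped = bytes.drop(cut) ++ bytes.take(cut)
        val in = ser.deserializeStream(new ByteArrayInputStream(swapped))
        println(in.asIterator.toList)  // List(obj2, obj1) when relocation holds
        in.close()
      }
    }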

Author: Josh Rosen <jo...@databricks.com>

Closes #5924 from JoshRosen/SPARK-7311 and squashes the following commits:

50a68ca [Josh Rosen] Address minor nits
0a7ebd7 [Josh Rosen] Clarify reason why SqlSerializer2 supports this property
123b992 [Josh Rosen] Cleanup for submitting as standalone patch.
4aa61b2 [Josh Rosen] Add missing newline
2c1233a [Josh Rosen] Small refactoring of SerializerPropertiesSuite to enable test re-use:
0ba75e6 [Josh Rosen] Add tests for serializer relocation property.
450fa21 [Josh Rosen] Back out accidental log4j.properties change
86d4dcd [Josh Rosen] Flag that SparkSqlSerializer2 supports relocation
b9624ee [Josh Rosen] Expand serializer API and use new function to help control when new UnsafeShuffle path is used.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/002c1238
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/002c1238
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/002c1238

Branch: refs/heads/master
Commit: 002c12384d6ecebbb3e7fc853dbdfbc5aaa3d6a6
Parents: f2c4708
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed May 6 10:52:55 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Wed May 6 10:52:55 2015 -0700

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       |   7 ++
 .../apache/spark/serializer/Serializer.scala    |  35 +++++-
 .../spark/util/collection/ExternalSorter.scala  |   3 +-
 .../serializer/SerializerPropertiesSuite.scala  | 119 +++++++++++++++++++
 .../sql/execution/SparkSqlSerializer2.scala     |   5 +
 5 files changed, 166 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/002c1238/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index b7bc087..f9f7885 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -125,6 +125,13 @@ class KryoSerializer(conf: SparkConf)
   override def newInstance(): SerializerInstance = {
     new KryoSerializerInstance(this)
   }
+
+  private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
+    // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects
+    // in the stream rather than writing those objects' serialized bytes, breaking relocation. See
+    // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
+    newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
+  }
 }
 
 private[spark]
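
As an aside, auto-reset is typically disabled through a custom Kryo registrator; the SerializerPropertiesSuite added below uses one (RegistratorWithoutAutoReset) for exactly this purpose. A minimal sketch of such a registrator, with a hypothetical class name:

    import com.esotericsoftware.kryo.Kryo

    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical registrator: with auto-reset disabled, Kryo writes
    // back-references to earlier occurrences of an object instead of its
    // bytes, so serialized objects can no longer be safely relocated.
    class NoAutoResetRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.setAutoReset(false)
      }
    }

With "spark.kryo.registrator" set to this class, the lazy val above evaluates to false and the optimized shuffle paths fall back to deserialized buffering.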

http://git-wip-us.apache.org/repos/asf/spark/blob/002c1238/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index c381672..6078c9d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Private}
 import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}
 
 /**
@@ -63,6 +63,39 @@ abstract class Serializer {
 
   /** Creates a new [[SerializerInstance]]. */
   def newInstance(): SerializerInstance
+
+  /**
+   * :: Private ::
+   * Returns true if this serializer supports relocation of its serialized objects and false
+   * otherwise. This should return true if and only if reordering the bytes of serialized objects
+   * in serialization stream output is equivalent to having re-ordered those elements prior to
+   * serializing them. More specifically, the following should hold if a serializer supports
+   * relocation:
+   *
+   * {{{
+   * serOut.open()
+   * position = 0
+   * serOut.write(obj1)
+   * serOut.flush()
+   * position = # of bytes written to stream so far
+   * obj1Bytes = output[0:position-1]
+   * serOut.write(obj2)
+   * serOut.flush()
+   * position2 = # of bytes written to stream so far
+   * obj2Bytes = output[position:position2-1]
+   * serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
+   * }}}
+   *
+   * In general, this property should hold for serializers that are stateless and that do not
+   * write special metadata at the beginning or end of the serialization stream.
+   *
+   * This API is private to Spark; this method should not be overridden in third-party subclasses
+   * or called in user code and is subject to removal in future Spark releases.
+   *
+   * See SPARK-7311 for more details.
+   */
+  @Private
+  private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/002c1238/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b7306cd..7d5cf7b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
   private val useSerializedPairBuffer =
     !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-    ser.isInstanceOf[KryoSerializer] &&
-    serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
+    ser.supportsRelocationOfSerializedObjects
 
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we

http://git-wip-us.apache.org/repos/asf/spark/blob/002c1238/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
new file mode 100644
index 0000000..bb34033
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.util.Random
+
+import org.scalatest.{Assertions, FunSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset
+
+/**
+ * Tests to ensure that [[Serializer]] implementations obey the API contracts for methods that
+ * describe properties of the serialized stream, such as
+ * [[Serializer.supportsRelocationOfSerializedObjects]].
+ */
+class SerializerPropertiesSuite extends FunSuite {
+
+  import SerializerPropertiesSuite._
+
+  test("JavaSerializer does not support relocation") {
+    // Per a comment on the SPARK-4550 JIRA ticket, Java serialization appears to write out the
+    // full class name the first time an object is written to an output stream, but subsequent
+    // references to the class write a more compact identifier; this prevents relocation.
+    val ser = new JavaSerializer(new SparkConf())
+    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+  }
+
+  test("KryoSerializer supports relocation when auto-reset is enabled") {
+    val ser = new KryoSerializer(new SparkConf)
+    assert(ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
+    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+  }
+
+  test("KryoSerializer does not support relocation when auto-reset is disabled") {
+    val conf = new SparkConf().set("spark.kryo.registrator",
+      classOf[RegistratorWithoutAutoReset].getName)
+    val ser = new KryoSerializer(conf)
+    assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
+    testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
+  }
+
+}
+
+object SerializerPropertiesSuite extends Assertions {
+
+  def generateRandomItem(rand: Random): Any = {
+    val randomFunctions: Seq[() => Any] = Seq(
+      () => rand.nextInt(),
+      () => rand.nextString(rand.nextInt(10)),
+      () => rand.nextDouble(),
+      () => rand.nextBoolean(),
+      () => (rand.nextInt(), rand.nextString(rand.nextInt(10))),
+      () => MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10))),
+      () => {
+        val x = MyCaseClass(rand.nextInt(), rand.nextString(rand.nextInt(10)))
+        (x, x)
+      }
+    )
+    randomFunctions(rand.nextInt(randomFunctions.size)).apply()
+  }
+
+  def testSupportsRelocationOfSerializedObjects(
+      serializer: Serializer,
+      generateRandomItem: Random => Any): Unit = {
+    if (!serializer.supportsRelocationOfSerializedObjects) {
+      return
+    }
+    val NUM_TRIALS = 5
+    val rand = new Random(42)
+    for (_ <- 1 to NUM_TRIALS) {
+      val items = {
+        // Make sure that we have duplicate occurrences of the same object in the stream:
+        val randomItems = Seq.fill(10)(generateRandomItem(rand))
+        randomItems ++ randomItems.take(5)
+      }
+      val baos = new ByteArrayOutputStream()
+      val serStream = serializer.newInstance().serializeStream(baos)
+      def serializeItem(item: Any): Array[Byte] = {
+        val itemStartOffset = baos.toByteArray.length
+        serStream.writeObject(item)
+        serStream.flush()
+        val itemEndOffset = baos.toByteArray.length
+        baos.toByteArray.slice(itemStartOffset, itemEndOffset).clone()
+      }
+      val itemsAndSerializedItems: Seq[(Any, Array[Byte])] = {
+        val serItems = items.map {
+          item => (item, serializeItem(item))
+        }
+        serStream.close()
+        rand.shuffle(serItems)
+      }
+      val reorderedSerializedData: Array[Byte] = itemsAndSerializedItems.flatMap(_._2).toArray
+      val deserializedItemsStream = serializer.newInstance().deserializeStream(
+        new ByteArrayInputStream(reorderedSerializedData))
+      assert(deserializedItemsStream.asIterator.toSeq === itemsAndSerializedItems.map(_._1))
+      deserializedItemsStream.close()
+    }
+  }
+}
+
+private case class MyCaseClass(foo: Int, bar: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/002c1238/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 9552f41..35ad987 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -154,6 +154,11 @@ private[sql] class SparkSqlSerializer2(keySchema: Array[DataType], valueSchema:
   with Serializable{
 
   def newInstance(): SerializerInstance = new ShuffleSerializerInstance(keySchema, valueSchema)
+
+  override def supportsRelocationOfSerializedObjects: Boolean = {
+    // SparkSqlSerializer2 is stateless and writes no stream headers
+    true
+  }
 }
 
 private[sql] object SparkSqlSerializer2 {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org