You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/07/30 09:26:36 UTC

git commit: SPARK-2543: Allow user to set maximum Kryo buffer size

Repository: spark
Updated Branches:
  refs/heads/master 7003c163d -> 7c5fc28af


SPARK-2543: Allow user to set maximum Kryo buffer size

Author: Koert Kuipers <ko...@tresata.com>

Closes #735 from koertkuipers/feat-kryo-max-buffersize and squashes the following commits:

15f6d81 [Koert Kuipers] change default for spark.kryoserializer.buffer.max.mb to 64mb and add some documentation
1bcc22c [Koert Kuipers] Merge branch 'master' into feat-kryo-max-buffersize
0c9f8eb [Koert Kuipers] make default for kryo max buffer size 16MB
143ec4d [Koert Kuipers] test resizable buffer in kryo Output
0732445 [Koert Kuipers] support setting maxCapacity to something different than capacity in kryo Output


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c5fc28a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c5fc28a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c5fc28a

Branch: refs/heads/master
Commit: 7c5fc28af42daaa6725af083d78c2372f3d0a338
Parents: 7003c16
Author: Koert Kuipers <ko...@tresata.com>
Authored: Wed Jul 30 00:18:59 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Jul 30 00:26:14 2014 -0700

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       |  3 +-
 .../spark/serializer/KryoSerializerSuite.scala  | 30 ++++++++++++++++++++
 docs/configuration.md                           | 16 ++++++++---
 3 files changed, 44 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c5fc28a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index fa79b25..e60b802 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -48,11 +48,12 @@ class KryoSerializer(conf: SparkConf)
   with Serializable {
 
   private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
+  private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
   private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
   private val registrator = conf.getOption("spark.kryo.registrator")
 
-  def newKryoOutput() = new KryoOutput(bufferSize)
+  def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 
   def newKryo(): Kryo = {
     val instantiator = new EmptyScalaKryoInstantiator

http://git-wip-us.apache.org/repos/asf/spark/blob/7c5fc28a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 79280d1..789b773 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -209,6 +209,36 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 }
 
+class KryoSerializerResizableOutputSuite extends FunSuite {
+  import org.apache.spark.SparkConf
+  import org.apache.spark.SparkContext
+  import org.apache.spark.LocalSparkContext
+  import org.apache.spark.SparkException
+
+  // trial and error showed this will not serialize with 1mb buffer
+  val x = (1 to 400000).toArray
+
+  // Runs body on a local Kryo-backed context with the given buffer sizes (MB); always stops it.
+  private def withKryo(initialMb: Int, maxMb: Int)(body: SparkContext => Unit): Unit = {
+    val conf = new SparkConf(false)
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryoserializer.buffer.mb", initialMb.toString)
+      .set("spark.kryoserializer.buffer.max.mb", maxMb.toString)
+    val sc = new SparkContext("local", "test", conf)
+    try body(sc) finally LocalSparkContext.stop(sc)
+  }
+
+  // 1mb initial and max: the buffer cannot grow, so serializing x must fail.
+  test("kryo without resizable output buffer should fail on large array") {
+    withKryo(1, 1) { sc => intercept[SparkException](sc.parallelize(x).collect) }
+  }
+
+  // 1mb initial, 2mb max: the buffer may grow, so x must round-trip intact.
+  test("kryo with resizable output buffer should succeed on large array") {
+    withKryo(1, 2) { sc => assert(sc.parallelize(x).collect === x) }
+  }
+}
+
 object KryoTest {
   case class CaseClass(i: Int, s: String) {}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7c5fc28a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 2e6c85c..ea69057 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -414,10 +414,18 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.kryoserializer.buffer.mb</code></td>
   <td>2</td>
   <td>
-    Maximum object size to allow within Kryo (the library needs to create a buffer at least as
-    large as the largest single object you'll serialize). Increase this if you get a "buffer limit
-    exceeded" exception inside Kryo. Note that there will be one buffer <i>per core</i> on each
-    worker.
+    Initial size of Kryo's serialization buffer, in megabytes. Note that there will be one buffer
+    <i>per core</i> on each worker. This buffer will grow up to
+    <code>spark.kryoserializer.buffer.max.mb</code> if needed.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kryoserializer.buffer.max.mb</code></td>
+  <td>64</td>
+  <td>
+    Maximum allowable size of the Kryo serialization buffer, in megabytes. This must be larger than
+    any object you attempt to serialize and less than 2048. Increase this if you get a
+    "buffer limit exceeded" exception inside Kryo.
   </td>
 </tr>
 </table>