Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/24 01:31:48 UTC

git commit: [SPARK-2102][SQL][CORE] Add option for kryo registration required and use a resource pool in Spark SQL for Kryo instances.

Repository: spark
Updated Branches:
  refs/heads/master 1871574a2 -> efdaeb111


[SPARK-2102][SQL][CORE] Add option for kryo registration required and use a resource pool in Spark SQL for Kryo instances.

Author: Ian O Connell <io...@twitter.com>

Closes #1377 from ianoc/feature/SPARK-2102 and squashes the following commits:

5498566 [Ian O Connell] Docs update suggested by Patrick
20e8555 [Ian O Connell] Slight style change
f92c294 [Ian O Connell] Add docs for new KryoSerializer option
f3735c8 [Ian O Connell] Add using a kryo resource pool for the SqlSerializer
4e5c342 [Ian O Connell] Register the SparkConf for kryo, it gets swept into serialization
665805a [Ian O Connell] Add a spark.kryo.registrationRequired option for configuring the Kryo Serializer


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efdaeb11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efdaeb11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efdaeb11

Branch: refs/heads/master
Commit: efdaeb111917dd0314f1d00ee8524bed1e2e21ca
Parents: 1871574
Author: Ian O Connell <io...@twitter.com>
Authored: Wed Jul 23 16:30:06 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Jul 23 16:30:11 2014 -0700
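
For context, a minimal sketch of how the option added by this commit is meant to be used
from application code. This is not part of the commit; the app name and the registrator
class name are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("kryo-registration-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // New in this commit: fail fast if a class is serialized without being registered.
      .set("spark.kryo.registrationRequired", "true")
      // Hypothetical registrator; it should register every class the job serializes.
      .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
    val sc = new SparkContext(conf)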

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       |  5 ++-
 docs/configuration.md                           | 19 +++++++--
 .../sql/execution/SparkSqlSerializer.scala      | 43 ++++++++++++++------
 3 files changed, 50 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/efdaeb11/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 1ce4243..c3a3e90 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -48,6 +48,7 @@ class KryoSerializer(conf: SparkConf)
 
   private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
   private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
+  private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
   private val registrator = conf.getOption("spark.kryo.registrator")
 
   def newKryoOutput() = new KryoOutput(bufferSize)
@@ -55,6 +56,7 @@ class KryoSerializer(conf: SparkConf)
   def newKryo(): Kryo = {
     val instantiator = new EmptyScalaKryoInstantiator
     val kryo = instantiator.newKryo()
+    kryo.setRegistrationRequired(registrationRequired)
     val classLoader = Thread.currentThread.getContextClassLoader
 
     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
@@ -185,7 +187,8 @@ private[serializer] object KryoSerializer {
     classOf[MapStatus],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
-    classOf[BoundedPriorityQueue[_]]
+    classOf[BoundedPriorityQueue[_]],
+    classOf[SparkConf]
   )
 }
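
With registrationRequired enabled, every class that passes through the serializer must be
registered up front (this is also why SparkConf itself is registered above: it gets swept
into serialization). A minimal sketch of a user-side registrator; the Point class and the
registrator name are illustrative, not part of this commit:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical application class that ends up being serialized.
    case class Point(x: Double, y: Double)

    class MyKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        // Register each class (and common containers of it) the job will serialize.
        kryo.register(classOf[Point])
        kryo.register(classOf[Array[Point]])
      }
    }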
 

http://git-wip-us.apache.org/repos/asf/spark/blob/efdaeb11/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index a70007c..02af461 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -389,6 +389,17 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.kryo.registrationRequired</code></td>
+  <td>false</td>
+  <td>
+    Whether to require registration with Kryo. If set to 'true', Kryo will throw an exception
+    if an unregistered class is serialized. If set to 'false' (the default), Kryo will write
+    each unregistered class name along with every object of that class. Writing class names
+    can add significant performance overhead, so enabling this option strictly enforces that
+    no class has been omitted from registration.
+  </td>
+</tr>
+<tr>
   <td><code>spark.kryoserializer.buffer.mb</code></td>
   <td>2</td>
   <td>
@@ -497,9 +508,9 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
     <td>spark.hadoop.validateOutputSpecs</td>
     <td>true</td>
-    <td>If set to true, validates the output specification (e.g. checking if the output directory already exists) 
-    used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing 
-    output directories. We recommend that users do not disable this except if trying to achieve compatibility with 
+    <td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
+    used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
+    output directories. We recommend that users do not disable this except if trying to achieve compatibility with
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
 </tr>
 </table>
@@ -861,7 +872,7 @@ Apart from these, the following properties are also available, and may be useful
 </table>
 
 #### Cluster Managers
-Each cluster manager in Spark has additional configuration options. Configurations 
+Each cluster manager in Spark has additional configuration options. Configurations
 can be found on the pages for each mode:
 
  * [YARN](running-on-yarn.html#configuration)
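
The spark.hadoop.validateOutputSpecs entry above recommends deleting pre-existing output
directories by hand with Hadoop's FileSystem API rather than disabling validation. A minimal
sketch of doing that, assuming a hypothetical output path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical output location; in a Spark job you would typically pass
    // sc.hadoopConfiguration instead of a fresh Configuration.
    val outputPath = new Path("hdfs:///tmp/example-output")
    val fs = outputPath.getFileSystem(new Configuration())
    if (fs.exists(outputPath)) {
      fs.delete(outputPath, true)  // recursive delete of the old output directory
    }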

http://git-wip-us.apache.org/repos/asf/spark/blob/efdaeb11/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 34b355e..3465444 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -24,10 +24,10 @@ import scala.reflect.ClassTag
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Serializer, Kryo}
-import com.twitter.chill.AllScalaRegistrar
+import com.twitter.chill.{AllScalaRegistrar, ResourcePool}
 
 import org.apache.spark.{SparkEnv, SparkConf}
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{SerializerInstance, KryoSerializer}
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.Utils
 
@@ -48,22 +48,41 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
   }
 }
 
-private[sql] object SparkSqlSerializer {
-  // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
-  // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
-  // related error.
-  @transient lazy val ser: KryoSerializer = {
+private[execution] class KryoResourcePool(size: Int)
+    extends ResourcePool[SerializerInstance](size) {
+
+  val ser: KryoSerializer = {
     val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
+    // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
+    // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
+    // related error.
     new KryoSerializer(sparkConf)
   }
 
-  def serialize[T: ClassTag](o: T): Array[Byte] = {
-    ser.newInstance().serialize(o).array()
-  }
+  def newInstance() = ser.newInstance()
+}
 
-  def deserialize[T: ClassTag](bytes: Array[Byte]): T  = {
-    ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
+private[sql] object SparkSqlSerializer {
+  @transient lazy val resourcePool = new KryoResourcePool(30)
+
+  private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
+    val kryo = resourcePool.borrow
+    try {
+      fn(kryo)
+    } finally {
+      resourcePool.release(kryo)
+    }
   }
+
+  def serialize[T: ClassTag](o: T): Array[Byte] =
+    acquireRelease { k =>
+      k.serialize(o).array()
+    }
+
+  def deserialize[T: ClassTag](bytes: Array[Byte]): T =
+    acquireRelease { k =>
+      k.deserialize[T](ByteBuffer.wrap(bytes))
+    }
 }
 
 private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {
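
For reference, the external behaviour of serialize/deserialize is unchanged by the pooling;
what changes is the cost profile, since Kryo serializer instances are relatively expensive to
create and the pool (capped at 30 instances above) reuses them via borrow/release. A small
usage sketch; the payload is arbitrary, and since the object is private[sql] this only applies
to code inside Spark SQL itself:

    // Inside the org.apache.spark.sql packages (SparkSqlSerializer is private[sql]):
    // serialize/deserialize now borrow a pooled SerializerInstance, use it, and
    // release it back, instead of constructing a fresh Kryo instance per call.
    val bytes: Array[Byte] = SparkSqlSerializer.serialize(Map("a" -> 1, "b" -> 2))
    val restored: Map[String, Int] = SparkSqlSerializer.deserialize[Map[String, Int]](bytes)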