Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/27 02:19:56 UTC

git commit: [SQL] Add a custom serializer for maps since they do not have a no-arg constructor.

Repository: spark
Updated Branches:
  refs/heads/master 32cbdfd28 -> e15e57413


[SQL] Add a custom serializer for maps since they do not have a no-arg constructor.

Author: Michael Armbrust <mi...@databricks.com>

Closes #243 from marmbrus/mapSer and squashes the following commits:

54045f7 [Michael Armbrust] Add a custom serializer for maps since they do not have a no-arg constructor.
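
For context on the failure the subject line describes: Kryo's default FieldSerializer instantiates objects reflectively through a no-arg constructor, which Scala's immutable map classes (Map1 through Map4, HashMap) do not provide. A minimal sketch of the problem, assuming a bare Kryo 2.x instance rather than Spark's configured serializer (the object name is illustrative):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}
    import java.io.ByteArrayOutputStream

    object MapRoundTripFails {
      def main(args: Array[String]): Unit = {
        val kryo = new Kryo()          // no custom serializer registered
        val bytes = new ByteArrayOutputStream()
        val out = new Output(bytes)
        kryo.writeClassAndObject(out, Map("a" -> 1)) // writing succeeds
        out.close()

        // Reading fails: Kryo cannot instantiate scala.collection.immutable.Map$Map1
        // reflectively, e.g. "Class cannot be created (missing no-arg constructor)".
        kryo.readClassAndObject(new Input(bytes.toByteArray))
      }
    }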


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e15e5741
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e15e5741
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e15e5741

Branch: refs/heads/master
Commit: e15e57413e07e5d4787514702f735bba0c30cae5
Parents: 32cbdfd
Author: Michael Armbrust <mi...@databricks.com>
Authored: Wed Mar 26 18:19:49 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Mar 26 18:19:49 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkSqlSerializer.scala  | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e15e5741/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 1c3196a..915f551 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -32,6 +32,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
     kryo.setRegistrationRequired(false)
     kryo.register(classOf[MutablePair[_, _]])
     kryo.register(classOf[Array[Any]])
+    kryo.register(classOf[scala.collection.immutable.Map$Map1], new MapSerializer)
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
     kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
@@ -70,3 +71,20 @@ class BigDecimalSerializer extends Serializer[BigDecimal] {
     BigDecimal(input.readString())
   }
 }
+
+/**
+ * Maps do not have a no-arg constructor and so cannot be deserialized by default. So, we
+ * serialize them as a flat `Array[Any]` of alternating keys and values.
+ */
+class MapSerializer extends Serializer[Map[_,_]] {
+  def write(kryo: Kryo, output: Output, map: Map[_,_]) {
+    kryo.writeObject(output, map.flatMap(e => Seq(e._1, e._2)).toArray)
+  }
+
+  def read(kryo: Kryo, input: Input, tpe: Class[Map[_,_]]): Map[_,_] = {
+    kryo.readObject(input, classOf[Array[Any]])
+      .sliding(2,2)
+      .map { case Array(k,v) => (k,v) }
+      .toMap
+  }
+}
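
For illustration, a minimal round-trip sketch (not part of the commit) exercising the MapSerializer above with a bare Kryo instance; it registers the runtime class of the sample map, which is the same Map$Map1 implementation the diff registers (the object name is illustrative):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}
    import java.io.ByteArrayOutputStream

    object MapRoundTrip {
      def main(args: Array[String]): Unit = {
        val kryo = new Kryo()
        val sample = Map("key" -> 1)
        // Register the one-entry immutable map implementation (Map$Map1), as the
        // diff above does, via the sample's runtime class.
        kryo.register(sample.getClass, new MapSerializer)

        val bytes = new ByteArrayOutputStream()
        val out = new Output(bytes)
        kryo.writeClassAndObject(out, sample)
        out.close()

        val restored = kryo.readClassAndObject(new Input(bytes.toByteArray))
        println(restored) // Map(key -> 1)
      }
    }

Note the encoding: write flattens the map into alternating keys and values, and read re-pairs them with sliding(2, 2) before rebuilding the map with toMap.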