Posted to commits@spark.apache.org by jo...@apache.org on 2014/12/19 01:29:46 UTC

spark git commit: SPARK-4743 - Use SparkEnv.serializer instead of closureSerializer in aggregateByKey and foldByKey

Repository: spark
Updated Branches:
  refs/heads/master d5a596d41 -> f9f58b9a0


SPARK-4743 - Use SparkEnv.serializer instead of closureSerializer in aggregateByKey and foldByKey

Author: Ivan Vergiliev <iv...@leanplum.com>

Closes #3605 from IvanVergiliev/change-serializer and squashes the following commits:

a49b7cf [Ivan Vergiliev] Use serializer instead of closureSerializer in aggregate/foldByKey.
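
For background, SparkEnv's closureSerializer defaults to Java serialization, while SparkEnv.serializer is the data serializer configured via spark.serializer (for example Kryo). The zero value is user data rather than a closure, so cloning it through the data serializer keeps it consistent with how the records themselves are handled. Below is a minimal, self-contained sketch of the serialize-once / clone-per-key pattern the patch touches; plain Java serialization stands in for Spark's SerializerInstance, and all names are illustrative:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

object ZeroValueClone {
  // Serialize the zero value once, up front, as the patched code does.
  def toBytes(value: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    buffer.toByteArray
  }

  // Deserialize a fresh copy on demand, mirroring createZero() in the patch.
  def fromBytes[T](bytes: Array[Byte]): T =
    new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[T]

  def main(args: Array[String]): Unit = {
    // A mutable zero value: handing the same instance to every key would let
    // per-key updates corrupt each other, hence the clone-per-key trick.
    val zeroArray = toBytes(ArrayBuffer.empty[Int])
    val createZero = () => fromBytes[ArrayBuffer[Int]](zeroArray)

    val forKeyA = createZero()
    val forKeyB = createZero()
    forKeyA += 1
    println(forKeyA) // ArrayBuffer(1)
    println(forKeyB) // ArrayBuffer() -- a genuinely independent copy
  }
}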


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9f58b9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9f58b9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9f58b9a

Branch: refs/heads/master
Commit: f9f58b9a01c4c7eaf0ce5055d6870e69a22297e3
Parents: d5a596d
Author: Ivan Vergiliev <iv...@leanplum.com>
Authored: Thu Dec 18 16:29:36 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Dec 18 16:29:36 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f9f58b9a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index b0434c9..fe3129b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -123,11 +123,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
       combOp: (U, U) => U): RDD[(K, U)] = {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
-    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
     val zeroArray = new Array[Byte](zeroBuffer.limit)
     zeroBuffer.get(zeroArray)
 
-    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
 
     combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
@@ -168,12 +168,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
-    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
     val zeroArray = new Array[Byte](zeroBuffer.limit)
     zeroBuffer.get(zeroArray)
 
     // When deserializing, use a lazy val to create just one instance of the serializer per task
-    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
 
     combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
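
For illustration, here is a hedged driver-side sketch (not part of the commit) that exercises both patched methods; with spark.serializer set to Kryo, the zero values below are now cloned by that configured serializer rather than by the closure serializer:

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

object Spark4743Example {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SPARK-4743-example")
      .setMaster("local[2]")
      // After this fix, the zero values below round-trip through the
      // serializer configured here instead of the closure serializer.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // aggregateByKey: each key starts from its own clone of the zero buffer.
    val buffered = pairs.aggregateByKey(ArrayBuffer.empty[Int])(
      (buf, v) => buf += v,   // seqOp: add one value to the per-key buffer
      (b1, b2) => b1 ++= b2   // combOp: merge buffers across partitions
    )
    buffered.collect().foreach(println)

    // foldByKey: the zero value gets the same clone-per-key treatment.
    pairs.foldByKey(0)(_ + _).collect().foreach(println)

    sc.stop()
  }
}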

