You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/12/19 01:29:46 UTC
spark git commit: SPARK-4743 - Use SparkEnv.serializer instead of closureSerializer in aggregateByKey and foldByKey
Repository: spark
Updated Branches:
refs/heads/master d5a596d41 -> f9f58b9a0
SPARK-4743 - Use SparkEnv.serializer instead of closureSerializer in aggregateByKey and foldByKey
Author: Ivan Vergiliev <iv...@leanplum.com>
Closes #3605 from IvanVergiliev/change-serializer and squashes the following commits:
a49b7cf [Ivan Vergiliev] Use serializer instead of closureSerializer in aggregate/foldByKey.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9f58b9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9f58b9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9f58b9a
Branch: refs/heads/master
Commit: f9f58b9a01c4c7eaf0ce5055d6870e69a22297e3
Parents: d5a596d
Author: Ivan Vergiliev <iv...@leanplum.com>
Authored: Thu Dec 18 16:29:36 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Dec 18 16:29:36 2014 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f9f58b9a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index b0434c9..fe3129b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -123,11 +123,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
- val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+ val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)
- lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+ lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
@@ -168,12 +168,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
- val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+ val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)
// When deserializing, use a lazy val to create just one instance of the serializer per task
- lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+ lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org