You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/14 23:52:31 UTC

[2/3] git commit: Adding fix covering combineCombinersByKey as well

Adding fix covering combineCombinersByKey as well


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8ea2cd56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8ea2cd56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8ea2cd56

Branch: refs/heads/master
Commit: 8ea2cd56e4a243a834214d04e29502a5fdb539df
Parents: b683608
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Jan 14 13:52:23 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jan 14 13:52:23 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/Aggregator.scala    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ea2cd56/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index d712927..edbea6e 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import scala.{Option, deprecated}
+
 import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
 
 /**
@@ -35,10 +37,11 @@ case class Aggregator[K, V, C] (
   private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
 
   @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
-  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) = combineValuesByKey(iter, null)
+  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
+    combineValuesByKey(iter, null)
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
-                         context: TaskContext) : Iterator[(K, C)] = {
+                         context: TaskContext): Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kv: Product2[K, V] = null
@@ -63,6 +66,10 @@ case class Aggregator[K, V, C] (
     }
   }
 
+  @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
+  def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
+    combineCombinersByKey(iter, null)
+
   def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
@@ -81,8 +88,9 @@ case class Aggregator[K, V, C] (
         val (k, c) = iter.next()
         combiners.insert(k, c)
       }
-      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
-      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+      // TODO: Make this non optional in a future release
+      Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
+      Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
       combiners.iterator
     }
   }