Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/14 23:52:48 UTC

git commit: Merge pull request #427 from pwendell/deprecate-aggregator

Updated Branches:
  refs/heads/branch-0.9 119b6c524 -> a14933dac


Merge pull request #427 from pwendell/deprecate-aggregator

Deprecate rather than remove old combineValuesByKey function

(cherry picked from commit d601a76d1fdd25b95020b2e32bacde583cf6aa50)
Signed-off-by: Reynold Xin <rx...@apache.org>
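Note for readers skimming the patch: the change keeps the old zero-context signatures as thin overloads that forward a null TaskContext, so existing call sites keep compiling (with a deprecation warning) until they migrate; see the note after the diff for why the null forward is safe. Below is a minimal sketch of what a caller sees, using the Aggregator signature from the diff. The combiner functions and input are hypothetical, and constructing an Aggregator requires a live SparkEnv, since its body reads spark.shuffle.spill:

    import org.apache.spark.Aggregator

    // Hypothetical sum-by-key combiner functions, for illustration only.
    // Note: Aggregator's body calls SparkEnv.get.conf, so this must run
    // inside a Spark task (or with SparkEnv otherwise initialized).
    val agg = Aggregator[String, Int, Int](
      createCombiner = (v: Int) => v,
      mergeValue = (c: Int, v: Int) => c + v,
      mergeCombiners = (c1: Int, c2: Int) => c1 + c2)

    val values = Iterator(("a", 1), ("b", 2), ("a", 3))

    // Old call site: still compiles, now with a deprecation warning; it
    // forwards context = null, so no spill metrics are recorded.
    val combined = agg.combineValuesByKey(values)

    // Preferred call site: pass the current TaskContext so spill metrics
    // are written (context here stands for whatever the task provides).
    // val combined = agg.combineValuesByKey(values, context)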


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a14933da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a14933da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a14933da

Branch: refs/heads/branch-0.9
Commit: a14933dac1e8b866d49a161854453b56a6e1dfcc
Parents: 119b6c5
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 14 14:52:24 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 14 14:52:42 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     | 22 +++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a14933da/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 6d439fd..edbea6e 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import scala.{Option, deprecated}
+
 import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
 
 /**
@@ -34,8 +36,12 @@ case class Aggregator[K, V, C] (
   private val sparkConf = SparkEnv.get.conf
   private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
 
+  @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
+  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
+    combineValuesByKey(iter, null)
+
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
-                         context: TaskContext) : Iterator[(K, C)] = {
+                         context: TaskContext): Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kv: Product2[K, V] = null
@@ -53,12 +59,17 @@ case class Aggregator[K, V, C] (
         val (k, v) = iter.next()
         combiners.insert(k, v)
       }
-      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
-      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+      // TODO: Make this non optional in a future release
+      Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
+      Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
       combiners.iterator
     }
   }
 
+  @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
+  def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
+    combineCombinersByKey(iter, null)
+
   def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
@@ -77,8 +88,9 @@ case class Aggregator[K, V, C] (
         val (k, c) = iter.next()
         combiners.insert(k, c)
       }
-      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
-      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+      // TODO: Make this non optional in a future release
+      Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
+      Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
       combiners.iterator
     }
   }
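
A side note on the Option(context) guard introduced above: Option(null) evaluates to None in Scala, so when the deprecated overloads forward a null TaskContext the metric assignments silently become no-ops instead of throwing a NullPointerException. A standalone sketch of the idiom, with stand-in classes that are not Spark types:

    // Illustrative stand-ins; Spark's TaskContext/taskMetrics differ.
    class Metrics { var memoryBytesSpilled: Long = 0L }
    class Context { val taskMetrics = new Metrics }

    def recordSpill(context: Context, bytes: Long): Unit = {
      // Option(null) == None, so a null context skips the assignment.
      Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = bytes)
    }

    recordSpill(null, 1024L)         // no-op: no NPE, metric not written
    recordSpill(new Context, 1024L)  // metric updated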