Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:23 UTC

[06/50] git commit: Allow Product2 rather than just tuple kv pairs

Allow Product2 rather than just tuple kv pairs
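
For context on the change: a closure written as { case (k, v) => ... } pattern-matches
against Tuple2, so any other Product2 implementation hits a scala.MatchError at runtime,
whereas the _1/_2 accessors are defined on Product2 itself. A minimal sketch of the
difference (KVPair below is a hypothetical stand-in, not a Spark class):

  // Hypothetical key-value class that implements Product2 without being a Tuple2.
  class KVPair[K, V](val _1: K, val _2: V) extends Product2[K, V] {
    def canEqual(that: Any): Boolean = that.isInstanceOf[KVPair[_, _]]
  }

  val pairs: Iterator[Product2[String, Int]] =
    Iterator(new KVPair("a", 1), new KVPair("b", 2))

  // Old style: compiles, but `case (k, v)` only matches Tuple2, so a KVPair
  // throws scala.MatchError at runtime.
  //   pairs.foreach { case (k, v) => println(k + " -> " + v) }

  // New style: _1/_2 come from Product2 itself, so any implementation works.
  pairs.foreach { kv => println(kv._1 + " -> " + kv._2) }

This is why the closures in the diff below switch from pattern matching to accessor calls.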


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0289eb75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0289eb75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0289eb75

Branch: refs/heads/master
Commit: 0289eb752abfc71ac0cc6796b57f1d21603cfd90
Parents: 64b2d54
Author: Aaron Davidson <aa...@databricks.com>
Authored: Thu Dec 26 14:39:53 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/Aggregator.scala   |  4 ++--
 .../main/scala/org/apache/spark/rdd/CoGroupedRDD.scala  | 12 ++++++------
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0289eb75/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 59e5102..f977c03 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -47,7 +47,7 @@ case class Aggregator[K, V, C] (
     } else {
       val combiners =
         new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-      iter.foreach { case(k, v) => combiners.insert(k, v) }
+      iter.foreach { kv => combiners.insert(kv._1, kv._2) }
       combiners.iterator
     }
   }
@@ -68,7 +68,7 @@ case class Aggregator[K, V, C] (
     } else {
       val combiners =
         new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
-      iter.foreach { case(k, c) => combiners.insert(k, c) }
+      iter.foreach { kc => combiners.insert(kc._1, kc._2) }
       combiners.iterator
     }
   }

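A rough sketch of what the updated Aggregator loop does, with a plain mutable.Map standing
in for Spark's ExternalAppendOnlyMap (combineByKeyLike and its signature are illustrative
only, not Spark API):

  import scala.collection.mutable

  def combineByKeyLike[K, V, C](iter: Iterator[Product2[K, V]],
                                createCombiner: V => C,
                                mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = mutable.Map.empty[K, C]
    // Read keys and values through Product2's _1/_2 rather than a Tuple2 pattern,
    // so any Product2 implementation is accepted.
    iter.foreach { kv =>
      combiners(kv._1) = combiners.get(kv._1) match {
        case Some(c) => mergeValue(c, kv._2)
        case None    => createCombiner(kv._2)
      }
    }
    combiners.iterator
  }
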
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0289eb75/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 3a549b7..367dc3e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -130,17 +130,17 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
         if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
       }
-      rddIterators.foreach { case(it, depNum) =>
-        it.foreach { case(k, v) =>
-          map.changeValue(k, update)(depNum) += v
+      rddIterators.foreach { case (it, depNum) =>
+        it.foreach { kv =>
+          map.changeValue(kv._1, update)(depNum) += kv._2
         }
       }
       new InterruptibleIterator(context, map.iterator)
     } else {
       val map = createExternalMap(numRdds)
-      rddIterators.foreach { case(it, depNum) =>
-        it.foreach { case(k, v) =>
-          map.insert(k, new CoGroupValue(v, depNum))
+      rddIterators.foreach { case (it, depNum) =>
+        it.foreach { kv =>
+          map.insert(kv._1, new CoGroupValue(kv._2, depNum))
         }
       }
       new InterruptibleIterator(context, map.iterator)
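
Similarly, a rough stand-in for the co-group loop above, again with mutable.Map in place of
AppendOnlyMap/ExternalAppendOnlyMap (coGroupLike is a hypothetical helper, not Spark API):

  import scala.collection.mutable
  import scala.collection.mutable.ArrayBuffer

  def coGroupLike[K](rddIterators: Seq[(Iterator[Product2[K, Any]], Int)],
                     numRdds: Int): Iterator[(K, Array[ArrayBuffer[Any]])] = {
    val map = mutable.Map.empty[K, Array[ArrayBuffer[Any]]]
    rddIterators.foreach { case (it, depNum) =>
      it.foreach { kv =>
        // One ArrayBuffer per parent RDD; kv may be any Product2, not just a tuple.
        val combiner = map.getOrElseUpdate(kv._1, Array.fill(numRdds)(new ArrayBuffer[Any]))
        combiner(depNum) += kv._2
      }
    }
    map.iterator
  }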