Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/20 10:24:38 UTC

git commit: SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions

Repository: spark
Updated Branches:
  refs/heads/master 4da01e381 -> 98ab41122


SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions

This also removes an unnecessary tuple creation in cogroup.

Author: Sandy Ryza <sa...@cloudera.com>

Closes #1447 from sryza/sandy-spark-2519-2 and squashes the following commits:

b6d9699 [Sandy Ryza] Remove missed Tuple2 match in CoGroupedRDD
a109828 [Sandy Ryza] Remove another pattern matching in MappedValuesRDD and revert some changes in PairRDDFunctions
be10f8a [Sandy Ryza] SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions
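
For context on the change described above, here is a minimal standalone
sketch (illustrative names, not Spark code) of the two styles the commit
trades between: decomposing each Tuple2 with a pattern match versus
reading its fields with _1/_2. Both produce the same result; the accessor
form skips the per-record match expression, which matters in the
per-element loops this commit touches.

  object TupleAccessDemo {
    def sumByMatch(pairs: Iterator[(String, Int)]): Int = {
      var sum = 0
      pairs.foreach { case (_, v) => sum += v }   // match per element
      sum
    }

    def sumByAccessor(pairs: Iterator[(String, Int)]): Int = {
      var sum = 0
      pairs.foreach { pair => sum += pair._2 }    // plain accessor call
      sum
    }

    def main(args: Array[String]): Unit = {
      val data = Seq("a" -> 1, "b" -> 2, "c" -> 3)
      println(sumByMatch(data.iterator))     // 6
      println(sumByAccessor(data.iterator))  // 6
    }
  }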


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98ab4112
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98ab4112
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98ab4112

Branch: refs/heads/master
Commit: 98ab4112255d4e0fdb6e084bd3fe65807c5b209b
Parents: 4da01e3
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Sun Jul 20 01:24:32 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Jul 20 01:24:32 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  4 +-
 .../org/apache/spark/rdd/MappedValuesRDD.scala  |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 60 ++++++++++----------
 3 files changed, 33 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/98ab4112/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 5366c1a..aca235a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -170,12 +170,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
     val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
-      value match { case (v, depNum) => newCombiner(depNum) += v }
+      newCombiner(value._2) += value._1
       newCombiner
     }
     val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
       (combiner, value) => {
-      value match { case (v, depNum) => combiner(depNum) += v }
+      combiner(value._2) += value._1
       combiner
     }
     val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
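
The shape of the combiner logic in this hunk, as a self-contained toy
(types and names here are illustrative; Spark's CoGroupValue pairs a
record with the index of the parent RDD it came from):

  import scala.collection.mutable.ArrayBuffer

  object CoGroupCombinerDemo {
    type TaggedValue = (Any, Int)              // (record, parent RDD index)
    type Combiner    = Array[ArrayBuffer[Any]]

    def createCombiner(numRdds: Int)(value: TaggedValue): Combiner = {
      val combiner = Array.fill(numRdds)(new ArrayBuffer[Any])
      combiner(value._2) += value._1           // direct accessors, no match
      combiner
    }

    def mergeValue(combiner: Combiner, value: TaggedValue): Combiner = {
      combiner(value._2) += value._1
      combiner
    }

    def main(args: Array[String]): Unit = {
      val tagged = Seq[TaggedValue](("a", 0), ("b", 1), ("c", 0))
      val merged = tagged.tail.foldLeft(createCombiner(2)(tagged.head))(mergeValue)
      println(merged.map(_.mkString(",")).mkString(" | "))  // a,c | b
    }
  }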

http://git-wip-us.apache.org/repos/asf/spark/blob/98ab4112/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
index 2bc47eb..a60952e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
@@ -28,6 +28,6 @@ class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
   override val partitioner = firstParent[Product2[K, U]].partitioner
 
   override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
-    firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
+    firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
   }
 }
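
Worth noting for the removed line: in Scala 2 the Product2 extractor's
unapply wraps its argument in a Some on every call, so the
`case Product2(k, v)` form can allocate per record on top of the match
itself. A minimal standalone version of the new style (names illustrative):

  object MappedValuesDemo {
    def mapValues[K, V, U](iter: Iterator[(K, V)])(f: V => U): Iterator[(K, U)] =
      iter.map { pair => (pair._1, f(pair._2)) }

    def main(args: Array[String]): Unit = {
      println(mapValues(Iterator("a" -> 1, "b" -> 2))(_ * 10).toList)
      // List((a,10), (b,20))
    }
  }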

http://git-wip-us.apache.org/repos/asf/spark/blob/98ab4112/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 29038b0..a6b9204 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     val reducePartition = (iter: Iterator[(K, V)]) => {
       val map = new JHashMap[K, V]
-      iter.foreach { case (k, v) =>
-        val old = map.get(k)
-        map.put(k, if (old == null) v else func(old, v))
+      iter.foreach { pair =>
+        val old = map.get(pair._1)
+        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       Iterator(map)
     } : Iterator[JHashMap[K, V]]
 
     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
-      m2.foreach { case (k, v) =>
-        val old = m1.get(k)
-        m1.put(k, if (old == null) v else func(old, v))
+      m2.foreach { pair =>
+        val old = m1.get(pair._1)
+        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
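
The reducePartition/mergeMaps pair above reduces each partition into a
hash map, then merges those maps pairwise. A small standalone mirror of
the merge step (using Scala's mutable.HashMap in place of
java.util.HashMap, so get returns an Option rather than null):

  import scala.collection.mutable

  object ReduceLocallyDemo {
    def mergeMaps(m1: mutable.HashMap[String, Int],
                  m2: mutable.HashMap[String, Int],
                  func: (Int, Int) => Int): mutable.HashMap[String, Int] = {
      m2.foreach { pair =>
        val merged = m1.get(pair._1).map(func(_, pair._2)).getOrElse(pair._2)
        m1.put(pair._1, merged)
      }
      m1
    }

    def main(args: Array[String]): Unit = {
      val a = mutable.HashMap("x" -> 1, "y" -> 2)
      val b = mutable.HashMap("y" -> 3, "z" -> 4)
      println(mergeMaps(a, b, _ + _))  // y merges to 5; x and z carry over
    }
  }
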
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs; w <- ws) yield (v, w)
-    }
+    this.cogroup(other, partitioner).flatMapValues( pair =>
+      for (v <- pair._1; w <- pair._2) yield (v, w)
+    )
   }
 
   /**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * partition the output RDD.
    */
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (ws.isEmpty) {
-        vs.map(v => (v, None))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._2.isEmpty) {
+        pair._1.map(v => (v, None))
       } else {
-        for (v <- vs; w <- ws) yield (v, Some(w))
+        for (v <- pair._1; w <- pair._2) yield (v, Some(w))
       }
     }
   }
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Option[V], W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (vs.isEmpty) {
-        ws.map(w => (None, w))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._1.isEmpty) {
+        pair._2.map(w => (None, w))
       } else {
-        for (v <- vs; w <- ws) yield (Some(v), w)
+        for (v <- pair._1; w <- pair._2) yield (Some(v), w)
       }
     }
   }
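
Both outer joins above share one shape: cogroup the two RDDs, then expand
each key's pair of value sequences. A plain-collections illustration of
the left outer case (not Spark code; names illustrative):

  object OuterJoinDemo {
    def leftOuterJoin[K, V, W](left: Seq[(K, V)],
                               right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
      val rightByKey = right.groupBy(_._1)
      left.flatMap { pair =>
        rightByKey.get(pair._1) match {
          case None     => Seq((pair._1, (pair._2, None)))
          case Some(ws) => ws.map(w => (pair._1, (pair._2, Some(w._2))))
        }
      }
    }

    def main(args: Array[String]): Unit = {
      println(leftOuterJoin(Seq("a" -> 1, "b" -> 2), Seq("a" -> "x")))
      // List((a,(1,Some(x))), (b,(2,None)))
    }
  }
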
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val data = self.collect()
     val map = new mutable.HashMap[K, V]
     map.sizeHint(data.length)
-    data.foreach { case (k, v) => map.put(k, v) }
+    data.foreach { pair => map.put(pair._1, pair._2) }
     map
   }
 
@@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-      (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]],
-        w3s.asInstanceOf[Seq[W3]])
+       (vs.asInstanceOf[Seq[V]],
+         w1s.asInstanceOf[Seq[W1]],
+         w2s.asInstanceOf[Seq[W2]],
+         w3s.asInstanceOf[Seq[W3]])
     }
   }
 
@@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, ws) =>
-      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+    cg.mapValues { case Seq(vs, w1s) =>
+      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
     }
   }
 
@@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s) =>
       (vs.asInstanceOf[Seq[V]],
-       w1s.asInstanceOf[Seq[W1]],
-       w2s.asInstanceOf[Seq[W2]])
+        w1s.asInstanceOf[Seq[W1]],
+        w2s.asInstanceOf[Seq[W2]])
     }
   }
 
@@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         val index = p.getPartition(key)
         val process = (it: Iterator[(K, V)]) => {
           val buf = new ArrayBuffer[V]
-          for ((k, v) <- it if k == key) {
-            buf += v
+          for (pair <- it if pair._1 == key) {
+            buf += pair._2
           }
           buf
         } : Seq[V]
@@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
         while (iter.hasNext) {
-          val (k, v) = iter.next()
-          writer.write(k, v)
+          val pair = iter.next()
+          writer.write(pair._1, pair._2)
         }
       } finally {
         writer.close(hadoopContext)
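
A standalone mirror of the writing loop above: reading the tuple once and
using _1/_2 avoids the destructuring `val (k, v) = ...`, which desugars to
a pattern match per record. `write` here is a stand-in for the Hadoop
RecordWriter call, not Spark's API:

  object WriterLoopDemo {
    def write(k: String, v: Int): Unit = println(s"$k -> $v")

    def main(args: Array[String]): Unit = {
      val iter = Iterator("a" -> 1, "b" -> 2)
      while (iter.hasNext) {
        val pair = iter.next()
        write(pair._1, pair._2)
      }
    }
  }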