You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:24 UTC

[07/50] git commit: New minor edits

New minor edits


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7ad44082
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7ad44082
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7ad44082

Branch: refs/heads/master
Commit: 7ad4408255e37f95e545d9c21a4460cbf98c05dd
Parents: fcc443b
Author: Andrew Or <an...@gmail.com>
Authored: Wed Dec 25 23:10:53 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |  5 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     | 37 ++++++------
 .../spark/util/ExternalAppendOnlyMap.scala      | 61 ++++++++++----------
 3 files changed, 49 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7ad44082/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 5826255..8863c31 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -45,8 +45,8 @@ case class Aggregator[K, V, C] (
       }
       combiners.iterator
     } else {
-      // Spilling
-      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      val combiners =
+        new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
       iter.foreach { case(k, v) => combiners.insert(k, v) }
       combiners.iterator
     }
@@ -66,7 +66,6 @@ case class Aggregator[K, V, C] (
       }
       combiners.iterator
     } else {
-      // Spilling
       val combiners =
         new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
       iter.foreach { case(k, c) => combiners.insert(k, c) }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7ad44082/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 3af0376..113a912 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -50,7 +50,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
   override def hashCode(): Int = idx
 }
 
-
 /**
  * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
  * tuple with the list of values for that key.
@@ -108,7 +107,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
-    val ser = SparkEnv.get.serializerManager.get(serializerClass)
 
     // A list of (rdd iterator, dependency number) pairs
     val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
@@ -121,6 +119,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
+        val ser = SparkEnv.get.serializerManager.get(serializerClass)
         val v = (fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser), depNum)
         rddIterators += v
       }
@@ -131,39 +130,39 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
         if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
       }
-      val getSeq = (k: K) => map.changeValue(k, update)
-      rddIterators.foreach { case(iter, depNum) =>
-        iter.foreach {
-          case(k, v) => getSeq(k)(depNum) += v
+      rddIterators.foreach { case(it, depNum) =>
+        it.foreach { case(k, v) =>
+          map.changeValue(k, update)(depNum) += v
         }
       }
       new InterruptibleIterator(context, map.iterator)
     } else {
-      // Spilling
       val map = createExternalMap(numRdds)
-      rddIterators.foreach { case(iter, depNum) =>
-        iter.foreach {
-          case(k, v) => map.insert(k, new CoGroupValue(v, depNum))
+      rddIterators.foreach { case(it, depNum) =>
+        it.foreach { case(k, v) =>
+          map.insert(k, new CoGroupValue(v, depNum))
         }
       }
       new InterruptibleIterator(context, map.iterator)
     }
   }
 
-  private def createExternalMap(numRdds: Int)
-    : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+  private def createExternalMap(numRdds: Int):
+    ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
 
-    val createCombiner: (CoGroupValue) => CoGroupCombiner = v => {
+    val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
-      v match { case (value, depNum) => newCombiner(depNum) += value }
+      value match { case(v, depNum) => newCombiner(depNum) += v }
       newCombiner
     }
-    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (c, v) => {
-      v match { case (value, depNum) => c(depNum) += value }
-      c
+    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
+      (combiner, value) => {
+      value match { case(v, depNum) => combiner(depNum) += v }
+      combiner
     }
-    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (c1, c2) => {
-      c1.zipAll(c2, new CoGroup, new CoGroup).map {
+    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
+      (combiner1, combiner2) => {
+      combiner1.zipAll(combiner2, new CoGroup, new CoGroup).map {
         case (v1, v2) => v1 ++ v2
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7ad44082/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index c8c0534..413f838 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -28,11 +28,11 @@ import scala.util.Random
 /**
  * A wrapper for SpillableAppendOnlyMap that handles two cases:
  *
- * (1)  If a mergeCombiners function is specified, merge values into combiners before
- *      disk spill, as it is possible to merge the resulting combiners later.
+ * (1)  If a mergeCombiners function is specified, merge values into combiners before disk
+ *      spill, as it is possible to merge the resulting combiners later.
  *
- * (2)  Otherwise, group values of the same key together before disk spill, and merge
- *      them into combiners only after reading them back from disk.
+ * (2)  Otherwise, group values of the same key together before disk spill, and merge them
+ *      into combiners only after reading them back from disk.
  */
 class ExternalAppendOnlyMap[K, V, C](
     createCombiner: V => C,
@@ -48,8 +48,25 @@ class ExternalAppendOnlyMap[K, V, C](
       new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
         mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
     } else {
+      // Use ArrayBuffer[V] as the intermediate combiner
       val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
-      new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
+      val mergeValueIntoGroup: (ArrayBuffer[V], V) => ArrayBuffer[V] = (group, value) => {
+        group += value
+      }
+      val mergeGroups: (ArrayBuffer[V], ArrayBuffer[V]) => ArrayBuffer[V] = (group1, group2) => {
+        group1 ++= group2
+      }
+      val combineGroup: (ArrayBuffer[V] => C) = group => {
+        var combiner : Option[C] = None
+        group.foreach { v =>
+          combiner match {
+            case None => combiner = Some(createCombiner(v))
+            case Some(c) => combiner = Some(mergeValue(c, v))
+          }
+        }
+        combiner.getOrElse(null.asInstanceOf[C])
+      }
+      new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup,
         mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
     }
   }
@@ -57,31 +74,11 @@ class ExternalAppendOnlyMap[K, V, C](
   def insert(key: K, value: V): Unit = map.insert(key, value)
 
   override def iterator: Iterator[(K, C)] = map.iterator
-
-  private def mergeValueIntoGroup(group: ArrayBuffer[V], value: V): ArrayBuffer[V] = {
-    group += value
-    group
-  }
-  private def mergeGroups(group1: ArrayBuffer[V], group2: ArrayBuffer[V]): ArrayBuffer[V] = {
-    group1 ++= group2
-    group1
-  }
-  private def combineGroup(group: ArrayBuffer[V]): C = {
-    var combiner : Option[C] = None
-    group.foreach { v =>
-      combiner match {
-        case None => combiner = Some(createCombiner(v))
-        case Some(c) => combiner = Some(mergeValue(c, v))
-      }
-    }
-    combiner.get
-  }
 }
 
 /**
- * An append-only map that spills sorted content to disk when the memory threshold
- * is exceeded. A group with type M is an intermediate combiner, and shares the same
- * type as either C or ArrayBuffer[V].
+ * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
+ * A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V].
  */
 class SpillableAppendOnlyMap[K, V, M, C](
     createGroup: V => M,
@@ -96,7 +93,7 @@ class SpillableAppendOnlyMap[K, V, M, C](
   var oldMaps = new ArrayBuffer[DiskIterator]
 
   def insert(key: K, value: V): Unit = {
-    def update(hadVal: Boolean, oldVal: M): M = {
+    val update: (Boolean, M) => M = (hadVal, oldVal) => {
       if (hadVal) mergeValue(oldVal, value) else createGroup(value)
     }
     currentMap.changeValue(key, update)
@@ -128,11 +125,11 @@ class SpillableAppendOnlyMap[K, V, M, C](
     inputStreams.foreach(readFromIterator)
 
     // Read from the given iterator until a key of different hash is retrieved
-    def readFromIterator(iter: Iterator[(K, M)]): Unit = {
+    def readFromIterator(it: Iterator[(K, M)]): Unit = {
       var minHash : Option[Int] = None
-      while (iter.hasNext) {
-        val (k, m) = iter.next()
-        pq.enqueue(KMITuple(k, m, iter))
+      while (it.hasNext) {
+        val (k, m) = it.next()
+        pq.enqueue(KMITuple(k, m, it))
         minHash match {
           case None => minHash = Some(k.hashCode())
           case Some(expectedHash) if k.hashCode() != expectedHash => return