Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:21 UTC

[04/50] git commit: Working ExternalAppendOnlyMap for Aggregator, but not for CoGroupedRDD

Working ExternalAppendOnlyMap for Aggregator, but not for CoGroupedRDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/97fbb3ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/97fbb3ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/97fbb3ec

Branch: refs/heads/master
Commit: 97fbb3ec52785883a0eee8644f9f4603c4c9df21
Parents: 5e69fc5
Author: Andrew Or <an...@gmail.com>
Authored: Mon Dec 23 22:50:15 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |  13 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  43 ++++--
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  11 +-
 .../spark/util/ExternalAppendOnlyMap.scala      | 136 +++++++++++++++++++
 4 files changed, 182 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 1a2ec55..ae16242 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import org.apache.spark.util.AppendOnlyMap
+import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
 
 /**
  * A set of functions used to aggregate data.
@@ -32,7 +32,9 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
-    val combiners = new AppendOnlyMap[K, C]
+    println("Combining values by key!!")
+    //val combiners = new AppendOnlyMap[K, C]
+    val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
     var kv: Product2[K, V] = null
     val update = (hadValue: Boolean, oldValue: C) => {
       if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
@@ -45,7 +47,9 @@ case class Aggregator[K, V, C] (
   }
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
-    val combiners = new AppendOnlyMap[K, C]
+    println("Combining combiners by key!!")
+    //val combiners = new AppendOnlyMap[K, C]
+    val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
     var kc: (K, C) = null
     val update = (hadValue: Boolean, oldValue: C) => {
       if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
@@ -56,5 +60,4 @@ case class Aggregator[K, V, C] (
     }
     combiners.iterator
   }
-}
-
+}
\ No newline at end of file
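
The Aggregator hunks above keep the existing changeValue contract: the update closure receives (hadValue, oldValue) and returns the new combiner, so lookup, creation, and merging happen in a single call, with the external map deciding when to spill. Below is a rough, self-contained sketch of that pattern, using a plain mutable.HashMap as a stand-in for the (External)AppendOnlyMap so it runs on its own; the object name and sample data are made up for illustration.

    import scala.collection.mutable

    object CombineValuesByKeySketch {
      // Mirrors the shape of Aggregator.combineValuesByKey in the hunk above,
      // but backed by an ordinary in-memory HashMap.
      def combineValuesByKey[K, V, C](
          iter: Iterator[(K, V)],
          createCombiner: V => C,
          mergeValue: (C, V) => C): Iterator[(K, C)] = {
        val combiners = mutable.HashMap.empty[K, C]
        iter.foreach { case (k, v) =>
          // Equivalent of changeValue(k, update): merge into the existing
          // combiner if the key was seen before, otherwise create a fresh one.
          combiners(k) = combiners.get(k) match {
            case Some(c) => mergeValue(c, v)
            case None    => createCombiner(v)
          }
        }
        combiners.iterator
      }

      def main(args: Array[String]): Unit = {
        val pairs = Iterator(("a", 1), ("b", 2), ("a", 3))
        combineValuesByKey[String, Int, Int](pairs, v => v, _ + _).foreach(println)
        // Prints (a,4) and (b,2), in some order.
      }
    }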

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 911a002..6283686 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
-import org.apache.spark.util.AppendOnlyMap
+import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
 
 
 private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -101,36 +101,49 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
+    println("Computing in CoGroupedRDD!")
+    // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
-    // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
-    val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
-
-    val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
-      if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
-    }
-
-    val getSeq = (k: K) => {
-      map.changeValue(k, update)
-    }
+    val combineFunction: (Seq[ArrayBuffer[Any]], Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
+      (x, y) => { x ++ y }
+    //val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
+    val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combineFunction)
 
     val ser = SparkEnv.get.serializerManager.get(serializerClass)
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
-        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
-          getSeq(kv._1)(depNum) += kv._2
+        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach {
+          kv => addToMap(kv._1, kv._2, depNum)
         }
       }
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
         fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
-          kv => getSeq(kv._1)(depNum) += kv._2
+          kv => addToMap(kv._1, kv._2, depNum)
         }
       }
     }
-    new InterruptibleIterator(context, map.iterator)
+
+    def addToMap(key: K, value: Any, depNum: Int) {
+      val updateFunction: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
+        (hadVal, oldVal) => {
+          var newVal = oldVal
+          if (!hadVal){
+            newVal = Array.fill(numRdds)(new ArrayBuffer[Any])
+          }
+          newVal(depNum) += value
+          newVal
+        }
+      map.changeValue(key, updateFunction)
+    }
+
+    println("About to construct CoGroupedRDD iterator!")
+    val theIterator = map.iterator
+    println("Returning CoGroupedRDD iterator!")
+    new InterruptibleIterator(context, theIterator)
   }
 
   override def clearDependencies() {
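
Given the commit title ("but not for CoGroupedRDD"), one detail in the hunk above is worth flagging: the combiner handed to ExternalAppendOnlyMap, (x, y) => x ++ y, concatenates the two Seq[ArrayBuffer[Any]] values, so when the external map later combines two entries for the same key (say, an in-memory entry with a spilled one) the result has 2 * numRdds buckets rather than one bucket per parent RDD. An element-wise merge along the lines of the sketch below (hypothetical code, not part of this commit) is presumably closer to what the cogroup combiner needs.

    import scala.collection.mutable.ArrayBuffer

    object CoGroupMergeSketch {
      // Hypothetical element-wise combiner for cogroup values: bucket i of the
      // result holds the contents of bucket i from both inputs, so the number
      // of per-RDD buckets stays fixed instead of growing on every merge.
      def mergeCoGroupValues(
          x: Seq[ArrayBuffer[Any]],
          y: Seq[ArrayBuffer[Any]]): Seq[ArrayBuffer[Any]] =
        x.zip(y).map { case (xs, ys) => xs ++ ys }

      def main(args: Array[String]): Unit = {
        val left  = Seq(ArrayBuffer[Any](1), ArrayBuffer[Any]("a"))
        val right = Seq(ArrayBuffer[Any](2), ArrayBuffer[Any]("b"))
        println(mergeCoGroupValues(left, right))
        // Prints List(ArrayBuffer(1, 2), ArrayBuffer(a, b)): still two buckets.
      }
    }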

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 48168e1..6849703 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -85,10 +85,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     }
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     if (self.partitioner == Some(partitioner)) {
+      println("Partitioner is some partitioner! In fact, it is " + self.partitioner.toString())
       self.mapPartitionsWithContext((context, iter) => {
         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
       }, preservesPartitioning = true)
     } else if (mapSideCombine) {
+      println("Otherwise, combining on map side.")
       val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
       val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
         .setSerializer(serializerClass)
@@ -96,6 +98,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
         new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
       }, preservesPartitioning = true)
     } else {
+      println("Else. No combining on map side!")
       // Don't apply map-side combiner.
       // A sanity check to make sure mergeCombiners is not defined.
       assert(mergeCombiners == null)
@@ -647,6 +650,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * MapReduce job.
    */
   def saveAsHadoopDataset(conf: JobConf) {
+    println("SAVE AS HADOOP DATASET")
     val outputFormatClass = conf.getOutputFormat
     val keyClass = conf.getOutputKeyClass
     val valueClass = conf.getOutputValueClass
@@ -666,6 +670,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+      println("WRITE TO FILE")
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -673,13 +678,17 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       writer.setup(context.stageId, context.partitionId, attemptNumber)
       writer.open()
 
+      println("START LOOP\n\n\n")
       var count = 0
       while(iter.hasNext) {
+        println("Before next()")
         val record = iter.next()
         count += 1
+        println("Before write. Record = "+record)
         writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+        println("After write. Record = "+record)
       }
-
+      println("ALL DONE! Woohoo.")
       writer.close()
       writer.commit()
     }
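
The PairRDDFunctions hunks above only add tracing around the existing combineByKey branches and the Hadoop write loop. For reference, a small driver like the following (hypothetical, not part of the commit) should exercise both instrumented paths: reduceByKey goes through combineByKey with map-side combining enabled, so it hits Aggregator.combineValuesByKey before the shuffle and Aggregator.combineCombinersByKey after it, and saveAsTextFile ends up in the saveAsHadoopDataset/writeToFile loop shown above. The output path is just an example.

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object ExternalMapDriverSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "external-map-demo")
        val counts = sc.parallelize(Seq("a", "b", "a", "c"))
          .map(word => (word, 1))
          .reduceByKey(_ + _)                               // map-side combine, then shuffle
        counts.saveAsTextFile("/tmp/external-map-demo")     // routes through writeToFile above
        sc.stop()
      }
    }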

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
new file mode 100644
index 0000000..28a3b7e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+/**
+ * A simple map that spills sorted content to disk when the memory threshold is exceeded. A combiner
+ * function must be specified to merge values back into memory during read.
+ */
+class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
+                                 memoryThresholdMB: Int = 1024)
+  extends Iterable[(K,V)] with Serializable {
+
+  var currentMap = new AppendOnlyMap[K,V]
+  var oldMaps = new ArrayBuffer[DiskKVIterator]
+
+  def changeValue(key: K, updateFunc: (Boolean, V) => V): Unit = {
+    currentMap.changeValue(key, updateFunc)
+    val mapSize = SizeEstimator.estimate(currentMap)
+    //if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
+    if (mapSize > 1024 * 10) {
+      spill()
+    }
+  }
+
+  def spill(): Unit = {
+    println("SPILL")
+    val file = File.createTempFile("external_append_only_map", "")  // Add spill location
+    val out = new ObjectOutputStream(new FileOutputStream(file))
+    val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
+    sortedMap foreach {
+      out.writeObject( _ )
+    }
+    out.close()
+    currentMap = new AppendOnlyMap[K,V]
+    oldMaps.append(new DiskKVIterator(file))
+  }
+
+  override def iterator: Iterator[(K,V)] = new ExternalIterator()
+
+  /**
+   *  An iterator that merges KV pairs from memory and disk in sorted order
+   */
+  class ExternalIterator extends Iterator[(K, V)] {
+
+    // Order by increasing key hash value
+    implicit object KVOrdering extends Ordering[KVITuple] {
+      def compare(a:KVITuple, b:KVITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
+    }
+    val pq = mutable.PriorityQueue[KVITuple]()
+    val inputStreams = Seq(new MemoryKVIterator(currentMap)) ++ oldMaps
+    inputStreams foreach { readFromIterator }
+
+    override def hasNext: Boolean = !pq.isEmpty
+
+    override def next(): (K,V) = {
+      println("ExternalIterator.next - How many left? "+pq.length)
+      val minKVI = pq.dequeue()
+      var (minKey, minValue, minIter) = (minKVI.key, minKVI.value, minKVI.iter)
+//      println("Min key = "+minKey)
+      readFromIterator(minIter)
+      while (!pq.isEmpty && pq.head.key == minKey) {
+        val newKVI = pq.dequeue()
+        val (newValue, newIter) = (newKVI.value, newKVI.iter)
+//        println("\tfound new value to merge! "+newValue)
+//        println("\tcombinerFunction("+minValue+" <====> "+newValue+")")
+        minValue = combinerFunction(minValue, newValue)
+//        println("\tCombine complete! New value = "+minValue)
+        readFromIterator(newIter)
+      }
+      println("Returning minKey = "+minKey+", minValue = "+minValue)
+      (minKey, minValue)
+    }
+
+    def readFromIterator(iter: Iterator[(K,V)]): Unit = {
+      if (iter.hasNext) {
+        val (k, v) = iter.next()
+        pq.enqueue(KVITuple(k, v, iter))
+      }
+    }
+
+    case class KVITuple(key:K, value:V, iter:Iterator[(K,V)])
+  }
+
+  class MemoryKVIterator(map: AppendOnlyMap[K,V]) extends Iterator[(K,V)] {
+    val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
+    val it = sortedMap.iterator
+    override def hasNext: Boolean = it.hasNext
+    override def next(): (K,V) = it.next()
+  }
+
+  class DiskKVIterator(file: File) extends Iterator[(K,V)] {
+    val in = new ObjectInputStream(new FileInputStream(file))
+    var nextItem:(K,V) = _
+    var eof = false
+
+    override def hasNext: Boolean = {
+      if (eof) {
+        return false
+      }
+      try {
+        nextItem = in.readObject().asInstanceOf[(K,V)]
+      } catch {
+        case e: EOFException =>
+          eof = true
+          return false
+      }
+      true
+    }
+
+    override def next(): (K,V) = {
+      if (eof) {
+        throw new NoSuchElementException
+      }
+      nextItem
+    }
+  }
+}
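
Finally, a minimal usage sketch of the new class, based only on the API shown above (the constructor taking a combiner function, changeValue with a (hadValue, oldValue) update closure, and iterator). The word-count data is made up for illustration; with the hard-coded 10 KB spill check above, even a small map like this may spill to disk before iteration.

    import org.apache.spark.util.ExternalAppendOnlyMap

    object ExternalAppendOnlyMapSketch {
      def main(args: Array[String]): Unit = {
        // The combiner merges two partial counts for the same key when entries
        // are read back from the in-memory map and any spill files.
        val counts = new ExternalAppendOnlyMap[String, Int](_ + _)

        val words = Seq("spark", "map", "spark", "spill", "map", "spark")
        words.foreach { w =>
          // Same update-closure contract as AppendOnlyMap.changeValue:
          // hadValue says whether the key was already present.
          counts.changeValue(w, (hadValue, oldValue) => if (hadValue) oldValue + 1 else 1)
        }

        // The iterator merges in-memory and spilled entries, combining
        // values that share a key.
        counts.iterator.foreach { case (word, count) => println(word + " -> " + count) }
      }
    }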