Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:21 UTC
[04/50] git commit: Working ExternalAppendOnlyMap for Aggregator, but not for CoGroupedRDD
Working ExternalAppendOnlyMap for Aggregator, but not for CoGroupedRDD
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/97fbb3ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/97fbb3ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/97fbb3ec
Branch: refs/heads/master
Commit: 97fbb3ec52785883a0eee8644f9f4603c4c9df21
Parents: 5e69fc5
Author: Andrew Or <an...@gmail.com>
Authored: Mon Dec 23 22:50:15 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Aggregator.scala | 13 +-
.../org/apache/spark/rdd/CoGroupedRDD.scala | 43 ++++--
.../org/apache/spark/rdd/PairRDDFunctions.scala | 11 +-
.../spark/util/ExternalAppendOnlyMap.scala | 136 +++++++++++++++++++
4 files changed, 182 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 1a2ec55..ae16242 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import org.apache.spark.util.AppendOnlyMap
+import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
/**
* A set of functions used to aggregate data.
@@ -32,7 +32,9 @@ case class Aggregator[K, V, C] (
mergeCombiners: (C, C) => C) {
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
- val combiners = new AppendOnlyMap[K, C]
+ println("Combining values by key!!")
+ //val combiners = new AppendOnlyMap[K, C]
+ val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
@@ -45,7 +47,9 @@ case class Aggregator[K, V, C] (
}
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
- val combiners = new AppendOnlyMap[K, C]
+ println("Combining combiners by key!!")
+ //val combiners = new AppendOnlyMap[K, C]
+ val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
var kc: (K, C) = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
@@ -56,5 +60,4 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
}
-}
-
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 911a002..6283686 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
-import org.apache.spark.util.AppendOnlyMap
+import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -101,36 +101,49 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
+ println("Computing in CoGroupedRDD!")
+ // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
- // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
- val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
-
- val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
- if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
- }
-
- val getSeq = (k: K) => {
- map.changeValue(k, update)
- }
+ val combineFunction: (Seq[ArrayBuffer[Any]], Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
+ (x, y) => { x ++ y }
+ //val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
+ val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combineFunction)
val ser = SparkEnv.get.serializerManager.get(serializerClass)
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
- rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
- getSeq(kv._1)(depNum) += kv._2
+ rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach {
+ kv => addToMap(kv._1, kv._2, depNum)
}
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
- kv => getSeq(kv._1)(depNum) += kv._2
+ kv => addToMap(kv._1, kv._2, depNum)
}
}
}
- new InterruptibleIterator(context, map.iterator)
+
+ def addToMap(key: K, value: Any, depNum: Int) {
+ val updateFunction: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
+ (hadVal, oldVal) => {
+ var newVal = oldVal
+ if (!hadVal){
+ newVal = Array.fill(numRdds)(new ArrayBuffer[Any])
+ }
+ newVal(depNum) += value
+ newVal
+ }
+ map.changeValue(key, updateFunction)
+ }
+
+ println("About to construct CoGroupedRDD iterator!")
+ val theIterator = map.iterator
+ println("Returning CoGroupedRDD iterator!")
+ new InterruptibleIterator(context, theIterator)
}
override def clearDependencies() {
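[Note on the CoGroupedRDD path, added for context and not part of the commit: the combineFunction above concatenates the two Seqs with (x, y) => x ++ y, so merging a spilled segment with the in-memory entry for the same key yields 2 * numRdds buffers and breaks the indexing by depNum in addToMap. That is consistent with the commit title saying this does not yet work for CoGroupedRDD. A hedged sketch of a per-dependency merge that would keep the K -> Seq(one ArrayBuffer per RDD) shape:

    // Illustrative alternative, not from this commit: merge the i-th buffer of y
    // into the i-th buffer of x, so the result still has exactly numRdds buffers.
    val combineFunction: (Seq[ArrayBuffer[Any]], Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
      (x, y) => x.zip(y).map { case (a, b) => a ++= b }
]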
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 48168e1..6849703 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -85,10 +85,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
+ println("Partitioner is some partitioner! In fact, it is " + self.partitioner.toString())
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
+ println("Otherwise, combining on map side.")
val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
@@ -96,6 +98,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
}, preservesPartitioning = true)
} else {
+ println("Else. No combining on map side!")
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
@@ -647,6 +650,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* MapReduce job.
*/
def saveAsHadoopDataset(conf: JobConf) {
+ println("SAVE AS HADOOP DATASET")
val outputFormatClass = conf.getOutputFormat
val keyClass = conf.getOutputKeyClass
val valueClass = conf.getOutputValueClass
@@ -666,6 +670,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
writer.preSetup()
def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+ println("WRITE TO FILE")
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -673,13 +678,17 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
+ println("START LOOP\n\n\n")
var count = 0
while(iter.hasNext) {
+ println("Before next()")
val record = iter.next()
count += 1
+ println("Before write. Record = "+record)
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+ println("After write. Record = "+record)
}
-
+ println("ALL DONE! Woohoo.")
writer.close()
writer.commit()
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
new file mode 100644
index 0000000..28a3b7e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+/**
+ * A simple map that spills sorted content to disk when the memory threshold is exceeded. A combiner
+ * function must be specified to merge values back into memory during read.
+ */
+class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
+ memoryThresholdMB: Int = 1024)
+ extends Iterable[(K,V)] with Serializable {
+
+ var currentMap = new AppendOnlyMap[K,V]
+ var oldMaps = new ArrayBuffer[DiskKVIterator]
+
+ def changeValue(key: K, updateFunc: (Boolean, V) => V): Unit = {
+ currentMap.changeValue(key, updateFunc)
+ val mapSize = SizeEstimator.estimate(currentMap)
+ //if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
+ if (mapSize > 1024 * 10) {
+ spill()
+ }
+ }
+
+ def spill(): Unit = {
+ println("SPILL")
+ val file = File.createTempFile("external_append_only_map", "") // Add spill location
+ val out = new ObjectOutputStream(new FileOutputStream(file))
+ val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
+ sortedMap foreach {
+ out.writeObject( _ )
+ }
+ out.close()
+ currentMap = new AppendOnlyMap[K,V]
+ oldMaps.append(new DiskKVIterator(file))
+ }
+
+ override def iterator: Iterator[(K,V)] = new ExternalIterator()
+
+ /**
+ * An iterator that merges KV pairs from memory and disk in sorted order
+ */
+ class ExternalIterator extends Iterator[(K, V)] {
+
+ // Order by increasing key hash value
+ implicit object KVOrdering extends Ordering[KVITuple] {
+ def compare(a:KVITuple, b:KVITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
+ }
+ val pq = mutable.PriorityQueue[KVITuple]()
+ val inputStreams = Seq(new MemoryKVIterator(currentMap)) ++ oldMaps
+ inputStreams foreach { readFromIterator }
+
+ override def hasNext: Boolean = !pq.isEmpty
+
+ override def next(): (K,V) = {
+ println("ExternalIterator.next - How many left? "+pq.length)
+ val minKVI = pq.dequeue()
+ var (minKey, minValue, minIter) = (minKVI.key, minKVI.value, minKVI.iter)
+// println("Min key = "+minKey)
+ readFromIterator(minIter)
+ while (!pq.isEmpty && pq.head.key == minKey) {
+ val newKVI = pq.dequeue()
+ val (newValue, newIter) = (newKVI.value, newKVI.iter)
+// println("\tfound new value to merge! "+newValue)
+// println("\tcombinerFunction("+minValue+" <====> "+newValue+")")
+ minValue = combinerFunction(minValue, newValue)
+// println("\tCombine complete! New value = "+minValue)
+ readFromIterator(newIter)
+ }
+ println("Returning minKey = "+minKey+", minValue = "+minValue)
+ (minKey, minValue)
+ }
+
+ def readFromIterator(iter: Iterator[(K,V)]): Unit = {
+ if (iter.hasNext) {
+ val (k, v) = iter.next()
+ pq.enqueue(KVITuple(k, v, iter))
+ }
+ }
+
+ case class KVITuple(key:K, value:V, iter:Iterator[(K,V)])
+ }
+
+ class MemoryKVIterator(map: AppendOnlyMap[K,V]) extends Iterator[(K,V)] {
+ val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
+ val it = sortedMap.iterator
+ override def hasNext: Boolean = it.hasNext
+ override def next(): (K,V) = it.next()
+ }
+
+ class DiskKVIterator(file: File) extends Iterator[(K,V)] {
+ val in = new ObjectInputStream(new FileInputStream(file))
+ var nextItem:(K,V) = _
+ var eof = false
+
+ override def hasNext: Boolean = {
+ if (eof) {
+ return false
+ }
+ try {
+ nextItem = in.readObject().asInstanceOf[(K,V)]
+ } catch {
+ case e: EOFException =>
+ eof = true
+ return false
+ }
+ true
+ }
+
+ override def next(): (K,V) = {
+ if (eof) {
+ throw new NoSuchElementException
+ }
+ nextItem
+ }
+ }
+}
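[For context, a minimal usage sketch of the new ExternalAppendOnlyMap as the Aggregator path drives it; the word-count key/value types and sample data below are illustrative and not part of this commit. Note that in this commit the spill check is hard-coded to roughly 10 KB (the memoryThresholdMB constructor argument is effectively ignored), so even a small map will spill to disk.

    import org.apache.spark.util.ExternalAppendOnlyMap

    // Word-count style aggregation: values and combiners are both Ints, merged by summing.
    val counts = new ExternalAppendOnlyMap[String, Int](_ + _)

    val input = Iterator(("a", 1), ("b", 1), ("a", 1))
    for ((k, v) <- input) {
      // The update closure receives (hadValue, oldValue) and returns the new combiner,
      // mirroring the update closures in Aggregator.combineValuesByKey.
      counts.changeValue(k, (hadValue, oldValue) => if (hadValue) oldValue + v else v)
    }

    // iterator merges the in-memory map with any spilled segments on disk,
    // combining duplicate keys with the supplied combiner function.
    counts.iterator.foreach { case (k, c) => println(k + " -> " + c) }
]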