You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:26 UTC
[09/50] git commit: Provide for cases when mergeCombiners is not
specified in ExternalAppendOnlyMap
Provide for cases when mergeCombiners is not specified in ExternalAppendOnlyMap
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/28685a48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/28685a48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/28685a48
Branch: refs/heads/master
Commit: 28685a482032f4b42cb46d1b24de1cc1dd1180c1
Parents: 17def8c
Author: Andrew Or <an...@gmail.com>
Authored: Wed Dec 25 15:16:57 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800
----------------------------------------------------------------------
.../org/apache/spark/rdd/CoGroupedRDD.scala | 2 +-
.../spark/util/ExternalAppendOnlyMap.scala | 187 ++++++++++++-------
2 files changed, 121 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/28685a48/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index b93c60c..a2a3de7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -126,7 +126,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
new InterruptibleIterator(context, combiners.iterator)
}
- def createExternalMap(numRdds:Int): ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+ private def createExternalMap(numRdds:Int): ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
def createCombiner(v: CoGroupValue): CoGroupCombiner = {
val newCombiner = Array.fill(numRdds)(new CoGroup)
mergeValue(newCombiner, v)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/28685a48/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index 1a57536..e2205c6 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -18,11 +18,16 @@
package org.apache.spark.util
import java.io._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
/**
- * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
+ * A wrapper for SpillableAppendOnlyMap that handles two cases:
+ *
+ * (1) If a mergeCombiners function is specified, merge values into combiners before
+ * disk spill, as it is possible to merge the resulting combiners later
+ *
+ * (2) Otherwise, group values of the same key together before disk spill, and merge
+ * them into combiners only after reading them back from disk
*/
class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
mergeValue: (C, V) => C,
@@ -30,78 +35,103 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
memoryThresholdMB: Int = 1024)
extends Iterable[(K, C)] with Serializable {
- var currentMap = new AppendOnlyMap[K, C]
- var oldMaps = new ArrayBuffer[DiskKCIterator]
+ private val mergeBeforeSpill: Boolean = mergeCombiners != null
+
+ private val map: SpillableAppendOnlyMap[K, V, _, C] = {
+ if (mergeBeforeSpill) {
+ println("* Merge before spill *")
+ new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
+ mergeValue, mergeCombiners, combinerIdentity, memoryThresholdMB)
+ } else {
+ println("* Merge after spill *")
+ new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
+ mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
+ }
+ }
+
+ def insert(key: K, value: V): Unit = map.insert(key, value)
+
+ override def iterator: Iterator[(K, C)] = map.iterator
+
+ private def combinerIdentity(combiner: C): C = combiner
+ private def createGroup(value: V): ArrayBuffer[V] = ArrayBuffer[V](value)
+ private def mergeValueIntoGroup(group: ArrayBuffer[V], value: V): ArrayBuffer[V] = {
+ group += value
+ group
+ }
+ private def mergeGroups(group1: ArrayBuffer[V], group2: ArrayBuffer[V]): ArrayBuffer[V] = {
+ group1 ++= group2
+ group1
+ }
+ private def combineGroup(group: ArrayBuffer[V]): C = {
+ var combiner : Option[C] = None
+ group.foreach { v =>
+ combiner match {
+ case None => combiner = Some(createCombiner(v))
+ case Some(c) => combiner = Some(mergeValue(c, v))
+ }
+ }
+ combiner.get
+ }
+}
+
+/**
+ * An append-only map that spills sorted content to disk when the memory threshold
+ * is exceeded. A group with type M is an intermediate combiner, and shares the same
+ * type as either C or ArrayBuffer[V].
+ */
+class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
+ mergeValue: (M, V) => M,
+ mergeGroups: (M, M) => M,
+ createCombiner: M => C,
+ memoryThresholdMB: Int = 1024)
+ extends Iterable[(K, C)] with Serializable {
+
+ var currentMap = new AppendOnlyMap[K, M]
+ var oldMaps = new ArrayBuffer[DiskIterator]
def insert(key: K, value: V): Unit = {
- currentMap.changeValue(key, updateFunction(value))
+ def update(hadVal: Boolean, oldVal: M): M = {
+ if (hadVal) mergeValue(oldVal, value) else createGroup(value)
+ }
+ currentMap.changeValue(key, update)
val mapSize = SizeEstimator.estimate(currentMap)
- if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
+ //if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
+ if (mapSize > 1024 * 10) {
spill()
}
}
- def updateFunction(value: V) : (Boolean, C) => C = {
- (hadVal: Boolean, oldVal: C) =>
- if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
- }
-
def spill(): Unit = {
+ println("> SPILL <")
val file = File.createTempFile("external_append_only_map", "") // Add spill location
val out = new ObjectOutputStream(new FileOutputStream(file))
val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
sortedMap.foreach { out.writeObject( _ ) }
out.close()
- currentMap = new AppendOnlyMap[K, C]
- oldMaps.append(new DiskKCIterator(file))
+ currentMap = new AppendOnlyMap[K, M]
+ oldMaps.append(new DiskIterator(file))
}
override def iterator: Iterator[(K, C)] = new ExternalIterator()
- /**
- * An iterator that merges KV pairs from memory and disk in sorted order
- */
+ // An iterator that sort-merges (K, M) pairs from memory and disk into (K, C) pairs
class ExternalIterator extends Iterator[(K, C)] {
// Order by increasing key hash value
- implicit object KVOrdering extends Ordering[KCITuple] {
- def compare(a:KCITuple, b:KCITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
+ implicit object KVOrdering extends Ordering[KMITuple] {
+ def compare(a:KMITuple, b:KMITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
}
- val pq = mutable.PriorityQueue[KCITuple]()
- val inputStreams = Seq(new MemoryKCIterator(currentMap)) ++ oldMaps
+ val pq = PriorityQueue[KMITuple]()
+ val inputStreams = Seq(new MemoryIterator(currentMap)) ++ oldMaps
inputStreams.foreach { readFromIterator( _ ) }
- override def hasNext: Boolean = !pq.isEmpty
-
- // Combine all values from all input streams corresponding to the same key
- override def next(): (K, C) = {
- val minKCI = pq.dequeue()
- var (minKey, minCombiner) = (minKCI.key, minKCI.combiner)
- val minHash = minKey.hashCode()
- readFromIterator(minKCI.iter)
-
- var collidedKCI = ArrayBuffer[KCITuple]()
- while (!pq.isEmpty && pq.head.key.hashCode() == minHash) {
- val newKCI: KCITuple = pq.dequeue()
- if (newKCI.key == minKey){
- minCombiner = mergeCombiners(minCombiner, newKCI.combiner)
- readFromIterator(newKCI.iter)
- } else {
- // Collision
- collidedKCI += newKCI
- }
- }
- collidedKCI.foreach { pq.enqueue( _ ) }
- (minKey, minCombiner)
- }
-
- // Read from the given iterator until a key of different hash is retrieved,
- // Add each KC pair read from this iterator to the heap
- def readFromIterator(iter: Iterator[(K, C)]): Unit = {
+ // Read from the given iterator until a key of different hash is retrieved
+ def readFromIterator(iter: Iterator[(K, M)]): Unit = {
var minHash : Option[Int] = None
while (iter.hasNext) {
- val (k, c) = iter.next()
- pq.enqueue(KCITuple(k, c, iter))
+ val (k, m) = iter.next()
+ pq.enqueue(KMITuple(k, m, iter))
minHash match {
case None => minHash = Some(k.hashCode())
case Some(expectedHash) =>
@@ -112,40 +142,63 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
}
}
- case class KCITuple(key:K, combiner:C, iter:Iterator[(K, C)])
+ override def hasNext: Boolean = !pq.isEmpty
+
+ override def next(): (K, C) = {
+ val minKMI = pq.dequeue()
+ var (minKey, minGroup) = (minKMI.key, minKMI.group)
+ val minHash = minKey.hashCode()
+ readFromIterator(minKMI.iterator)
+
+ // Merge groups with the same key into minGroup
+ var collidedKMI = ArrayBuffer[KMITuple]()
+ while (!pq.isEmpty && pq.head.key.hashCode() == minHash) {
+ val newKMI = pq.dequeue()
+ if (newKMI.key == minKey) {
+ minGroup = mergeGroups(minGroup, newKMI.group)
+ readFromIterator(newKMI.iterator)
+ } else {
+ // Collision
+ collidedKMI += newKMI
+ }
+ }
+ collidedKMI.foreach { pq.enqueue( _ ) }
+ (minKey, createCombiner(minGroup))
+ }
+
+ case class KMITuple(key:K, group:M, iterator:Iterator[(K, M)])
}
- class MemoryKCIterator(map: AppendOnlyMap[K, C]) extends Iterator[(K, C)] {
+ // Iterate through (K, M) pairs in sorted order from the in-memory map
+ class MemoryIterator(map: AppendOnlyMap[K, M]) extends Iterator[(K, M)] {
val sortedMap = currentMap.iterator.toList.sortBy(kc => kc._1.hashCode())
val it = sortedMap.iterator
override def hasNext: Boolean = it.hasNext
- override def next(): (K, C) = it.next()
+ override def next(): (K, M) = it.next()
}
- class DiskKCIterator(file: File) extends Iterator[(K, C)] {
+ // Iterate through (K, M) pairs in sorted order from an on-disk map
+ class DiskIterator(file: File) extends Iterator[(K, M)] {
val in = new ObjectInputStream(new FileInputStream(file))
- var nextItem:(K, C) = _
- var eof = false
+ var nextItem: Option[(K, M)] = None
override def hasNext: Boolean = {
- if (eof) {
- return false
- }
try {
- nextItem = in.readObject().asInstanceOf[(K, C)]
+ nextItem = Some(in.readObject().asInstanceOf[(K, M)])
+ true
} catch {
case e: EOFException =>
- eof = true
- return false
+ nextItem = None
+ false
}
- true
}
- override def next(): (K, C) = {
- if (eof) {
- throw new NoSuchElementException
+ override def next(): (K, M) = {
+ nextItem match {
+ case Some(item) => item
+ case None =>
+ throw new NoSuchElementException
}
- nextItem
}
}
}