You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:42 UTC
[25/50] git commit: Simplify merge logic based on the invariant that all spills contain unique keys
Simplify merge logic based on the invariant that all spills contain unique keys
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d6e7910d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d6e7910d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d6e7910d
Branch: refs/heads/master
Commit: d6e7910d925039d9b57d82e7ca17e775c52fbee5
Parents: 2b71ab9
Author: Andrew Or <an...@gmail.com>
Authored: Mon Dec 30 13:01:00 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Mon Dec 30 13:01:00 2013 -0800
----------------------------------------------------------------------
.../util/collection/ExternalAppendOnlyMap.scala | 59 ++++++++------------
1 file changed, 22 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d6e7910d/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 4349e8d..0e8f46c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -185,29 +185,15 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
kgPairs
}
- // Drop and return all (K, G) pairs with K = the given key from the given KGITuple
- def dropKey(kgi: KGITuple, key: K): ArrayBuffer[(K, G)] = {
- val dropped = new ArrayBuffer[(K, G)]
- var i = 0
- while (i < kgi.pairs.length) {
- if (kgi.pairs(i)._1 == key) {
- dropped += kgi.pairs.remove(i)
- } else {
- i += 1
- }
- }
- dropped
- }
-
- // Merge all (K, G) pairs with K = the given key into baseGroup
- def mergeIntoGroup(key: K, baseGroup: G, kgPairs: ArrayBuffer[(K, G)]): G = {
- var mergedGroup = baseGroup
- kgPairs.foreach { case (k, g) =>
- if (k == key){
- mergedGroup = mergeGroups(mergedGroup, g)
+ // From the given KGITuple, remove the (K, G) pair with K = key and merge it into baseGroup
+ def mergeIntoGroup(key: K, baseGroup: G, kgi: KGITuple): G = {
+ kgi.pairs.zipWithIndex.foreach { case ((k, g), i) =>
+ if (k == key) {
+ kgi.pairs.remove(i)
+ return mergeGroups(baseGroup, g)
}
}
- mergedGroup
+ baseGroup
}
override def hasNext: Boolean = {
@@ -226,28 +212,27 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
// Should only happen when hasNext is false
throw new NoSuchElementException
}
- var (minKey, minGroup) = minPairs(0)
- assert(minKey.hashCode() == minHash)
- // Merge the rest of minPairs into minGroup
- val minPairsWithKey = dropKey(minKGI, minKey).tail
- minGroup = mergeIntoGroup(minKey, minGroup, minPairsWithKey)
- if (minPairs.length == 0) {
- minPairs ++= readFromIterator(minKGI.iterator)
- }
+ // Select a return key with the minimum hash
+ var (minKey, minGroup) = minPairs.remove(0)
+ assert(minKey.hashCode() == minHash)
- // Do the same for all other KGITuples with the same minHash
- val tuplesToAddBack = ArrayBuffer[KGITuple](minKGI)
+ // Merge all other KGITuple's with the same minHash
+ val dequeuedKGI = ArrayBuffer[KGITuple](minKGI)
while (!mergeHeap.isEmpty && mergeHeap.head.minHash == minHash) {
val newKGI = mergeHeap.dequeue()
- val pairsWithKey = dropKey(newKGI, minKey)
- minGroup = mergeIntoGroup(minKey, minGroup, pairsWithKey)
- if (newKGI.pairs.length == 0) {
- newKGI.pairs ++= readFromIterator(newKGI.iterator)
+ minGroup = mergeIntoGroup(minKey, minGroup, newKGI)
+ dequeuedKGI += newKGI
+ }
+
+ // Repopulate and add back all dequeued KGI to mergeHeap
+ dequeuedKGI.foreach { kgi =>
+ if (kgi.pairs.length == 0) {
+ kgi.pairs ++= readFromIterator(kgi.iterator)
}
- tuplesToAddBack += newKGI
+ mergeHeap.enqueue(kgi)
}
- tuplesToAddBack.foreach(mergeHeap.enqueue(_))
+
(minKey, createCombiner(minGroup))
}