You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:42 UTC

[25/50] git commit: Simplify merge logic based on the invariant that all spills contain unique keys

Simplify merge logic based on the invariant that all spills contain unique keys


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d6e7910d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d6e7910d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d6e7910d

Branch: refs/heads/master
Commit: d6e7910d925039d9b57d82e7ca17e775c52fbee5
Parents: 2b71ab9
Author: Andrew Or <an...@gmail.com>
Authored: Mon Dec 30 13:01:00 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Mon Dec 30 13:01:00 2013 -0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 59 ++++++++------------
 1 file changed, 22 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d6e7910d/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 4349e8d..0e8f46c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -185,29 +185,15 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
       kgPairs
     }
 
-    // Drop and return all (K, G) pairs with K = the given key from the given KGITuple
-    def dropKey(kgi: KGITuple, key: K): ArrayBuffer[(K, G)] = {
-      val dropped = new ArrayBuffer[(K, G)]
-      var i = 0
-      while (i < kgi.pairs.length) {
-        if (kgi.pairs(i)._1 == key) {
-          dropped += kgi.pairs.remove(i)
-        } else {
-          i += 1
-        }
-      }
-      dropped
-    }
-
-    // Merge all (K, G) pairs with K = the given key into baseGroup
-    def mergeIntoGroup(key: K, baseGroup: G, kgPairs: ArrayBuffer[(K, G)]): G = {
-      var mergedGroup = baseGroup
-      kgPairs.foreach { case (k, g) =>
-        if (k == key){
-          mergedGroup = mergeGroups(mergedGroup, g)
+    // From the given KGITuple, remove the (K, G) pair with K = key and merge it into baseGroup
+    def mergeIntoGroup(key: K, baseGroup: G, kgi: KGITuple): G = {
+      kgi.pairs.zipWithIndex.foreach { case ((k, g), i) =>
+        if (k == key) {
+          kgi.pairs.remove(i)
+          return mergeGroups(baseGroup, g)
         }
       }
-      mergedGroup
+      baseGroup
     }
 
     override def hasNext: Boolean = {
@@ -226,28 +212,27 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
         // Should only happen when hasNext is false
         throw new NoSuchElementException
       }
-      var (minKey, minGroup) = minPairs(0)
-      assert(minKey.hashCode() == minHash)
 
-      // Merge the rest of minPairs into minGroup
-      val minPairsWithKey = dropKey(minKGI, minKey).tail
-      minGroup = mergeIntoGroup(minKey, minGroup, minPairsWithKey)
-      if (minPairs.length == 0) {
-        minPairs ++= readFromIterator(minKGI.iterator)
-      }
+      // Select a return key with the minimum hash
+      var (minKey, minGroup) = minPairs.remove(0)
+      assert(minKey.hashCode() == minHash)
 
-      // Do the same for all other KGITuples with the same minHash
-      val tuplesToAddBack = ArrayBuffer[KGITuple](minKGI)
+      // Merge all other KGITuple's with the same minHash
+      val dequeuedKGI = ArrayBuffer[KGITuple](minKGI)
       while (!mergeHeap.isEmpty && mergeHeap.head.minHash == minHash) {
         val newKGI = mergeHeap.dequeue()
-        val pairsWithKey = dropKey(newKGI, minKey)
-        minGroup = mergeIntoGroup(minKey, minGroup, pairsWithKey)
-        if (newKGI.pairs.length == 0) {
-          newKGI.pairs ++= readFromIterator(newKGI.iterator)
+        minGroup = mergeIntoGroup(minKey, minGroup, newKGI)
+        dequeuedKGI += newKGI
+      }
+
+      // Repopulate and add back all dequeued KGI to mergeHeap
+      dequeuedKGI.foreach { kgi =>
+        if (kgi.pairs.length == 0) {
+          kgi.pairs ++= readFromIterator(kgi.iterator)
         }
-        tuplesToAddBack += newKGI
+        mergeHeap.enqueue(kgi)
       }
-      tuplesToAddBack.foreach(mergeHeap.enqueue(_))
+
       (minKey, createCombiner(minGroup))
     }