You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/16 20:07:20 UTC

git commit: SPARK-2519. Eliminate pattern-matching on Tuple2 in performance-critical...

Repository: spark
Updated Branches:
  refs/heads/master 1c5739f68 -> fc7edc9e7


SPARK-2519. Eliminate pattern-matching on Tuple2 in performance-critical aggregation code

Author: Sandy Ryza <sa...@cloudera.com>

Closes #1435 from sryza/sandy-spark-2519 and squashes the following commits:

640706a [Sandy Ryza] SPARK-2519. Eliminate pattern-matching on Tuple2 in performance-critical aggregation code


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc7edc9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc7edc9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc7edc9e

Branch: refs/heads/master
Commit: fc7edc9e76f97b25e456ae7b72ef8636656f4f1a
Parents: 1c5739f
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Wed Jul 16 11:07:16 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Jul 16 11:07:16 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/Aggregator.scala   |  8 ++++----
 .../spark/util/collection/ExternalAppendOnlyMap.scala   | 12 +++++++-----
 2 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fc7edc9e/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 59fdf65..1d64057 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -56,8 +56,8 @@ case class Aggregator[K, V, C] (
     } else {
       val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
       while (iter.hasNext) {
-        val (k, v) = iter.next()
-        combiners.insert(k, v)
+        val pair = iter.next()
+        combiners.insert(pair._1, pair._2)
       }
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
@@ -85,8 +85,8 @@ case class Aggregator[K, V, C] (
     } else {
       val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
       while (iter.hasNext) {
-        val (k, c) = iter.next()
-        combiners.insert(k, c)
+        val pair = iter.next()
+        combiners.insert(pair._1, pair._2)
       }
       // TODO: Make this non optional in a future release
       Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)

http://git-wip-us.apache.org/repos/asf/spark/blob/fc7edc9e/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 292d096..765254b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -268,10 +268,10 @@ class ExternalAppendOnlyMap[K, V, C](
     private def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
       var i = 0
       while (i < buffer.pairs.length) {
-        val (k, c) = buffer.pairs(i)
-        if (k == key) {
+        val pair = buffer.pairs(i)
+        if (pair._1 == key) {
           buffer.pairs.remove(i)
-          return mergeCombiners(baseCombiner, c)
+          return mergeCombiners(baseCombiner, pair._2)
         }
         i += 1
       }
@@ -293,9 +293,11 @@ class ExternalAppendOnlyMap[K, V, C](
       }
       // Select a key from the StreamBuffer that holds the lowest key hash
       val minBuffer = mergeHeap.dequeue()
-      val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
+      val minPairs = minBuffer.pairs
+      val minHash = minBuffer.minKeyHash
       val minPair = minPairs.remove(0)
-      var (minKey, minCombiner) = minPair
+      val minKey = minPair._1
+      var minCombiner = minPair._2
       assert(getKeyHashCode(minPair) == minHash)
 
       // For all other streams that may have this key (i.e. have the same minimum key hash),