You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/03/26 08:09:47 UTC

git commit: SPARK-1321 Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation

Repository: spark
Updated Branches:
  refs/heads/master 4f7d547b8 -> b859853ba


SPARK-1321 Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation

Also updated the documentation for top and takeOrdered.

On my simple test of sorting 100 million (Int, Int) tuples using Spark, Guava's top k implementation (in Ordering) is much faster than the BoundedPriorityQueue implementation for roughly sorted input (10 - 20X faster), and still faster for purely random input (2 - 5X).

Author: Reynold Xin <rx...@apache.org>

Closes #229 from rxin/takeOrdered and squashes the following commits:

0d11844 [Reynold Xin] Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation. Also updated the documentation for top and takeOrdered.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b859853b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b859853b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b859853b

Branch: refs/heads/master
Commit: b859853ba47b6323af0e31a4e2099e943221e1b1
Parents: 4f7d547
Author: Reynold Xin <rx...@apache.org>
Authored: Wed Mar 26 00:09:44 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Mar 26 00:09:44 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 49 +++++++++++++-------
 .../apache/spark/util/collection/Utils.scala    | 39 ++++++++++++++++
 2 files changed, 72 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b859853b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 4f9d39f..6af4224 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -927,32 +927,49 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Returns the top K elements from this RDD as defined by
-   * the specified implicit Ordering[T].
+   * Returns the top K (largest) elements from this RDD as defined by the specified
+   * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
+   * {{{
+   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
+   *   // returns Array(12)
+   *
+   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
+   *   // returns Array(6, 5)
+   * }}}
+   *
    * @param num the number of top elements to return
    * @param ord the implicit ordering for T
    * @return an array of top elements
    */
-  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
-    mapPartitions { items =>
-      val queue = new BoundedPriorityQueue[T](num)
-      queue ++= items
-      Iterator.single(queue)
-    }.reduce { (queue1, queue2) =>
-      queue1 ++= queue2
-      queue1
-    }.toArray.sorted(ord.reverse)
-  }
+  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
 
   /**
-   * Returns the first K elements from this RDD as defined by
-   * the specified implicit Ordering[T] and maintains the
-   * ordering.
+   * Returns the first K (smallest) elements from this RDD as defined by the specified
+   * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
+   * For example:
+   * {{{
+   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
+   *   // returns Array(2)
+   *
+   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
+   *   // returns Array(2, 3)
+   * }}}
+   *
    * @param num the number of top elements to return
    * @param ord the implicit ordering for T
    * @return an array of top elements
    */
-  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
+  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+    mapPartitions { items =>
+      // BoundedPriorityQueue keeps the largest elements, so let's reverse the ordering.
+      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+      Iterator.single(queue)
+    }.reduce { (queue1, queue2) =>
+      queue1 ++= queue2
+      queue1
+    }.toArray.sorted(ord)
+  }
 
   /**
    * Returns the max of this RDD as defined by the implicit Ordering[T].

http://git-wip-us.apache.org/repos/asf/spark/blob/b859853b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
new file mode 100644
index 0000000..c5268c0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}
+
+import com.google.common.collect.{Ordering => GuavaOrdering}
+
+/**
+ * Utility functions for collections.
+ */
+private[spark] object Utils {
+
+  /**
+   * Returns the first K elements from the input as defined by the specified implicit Ordering[T]
+   * and maintains the ordering.
+   */
+  def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
+    val ordering = new GuavaOrdering[T] {
+      override def compare(l: T, r: T) = ord.compare(l, r)
+    }
+    collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
+  }
+}