You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/03/26 08:09:47 UTC
git commit: SPARK-1321 Use Guava's top k implementation rather than
our BoundedPriorityQueue based implementation
Repository: spark
Updated Branches:
refs/heads/master 4f7d547b8 -> b859853ba
SPARK-1321 Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation
Also updated the documentation for top and takeOrdered.
On my simple test of sorting 100 million (Int, Int) tuples using Spark, Guava's top k implementation (in Ordering) is much faster than the BoundedPriorityQueue implementation for roughly sorted input (10 - 20X faster), and still faster for purely random input (2 - 5X).
Author: Reynold Xin <rx...@apache.org>
Closes #229 from rxin/takeOrdered and squashes the following commits:
0d11844 [Reynold Xin] Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation. Also updated the documentation for top and takeOrdered.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b859853b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b859853b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b859853b
Branch: refs/heads/master
Commit: b859853ba47b6323af0e31a4e2099e943221e1b1
Parents: 4f7d547
Author: Reynold Xin <rx...@apache.org>
Authored: Wed Mar 26 00:09:44 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Mar 26 00:09:44 2014 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/RDD.scala | 49 +++++++++++++-------
.../apache/spark/util/collection/Utils.scala | 39 ++++++++++++++++
2 files changed, 72 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b859853b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 4f9d39f..6af4224 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -927,32 +927,49 @@ abstract class RDD[T: ClassTag](
}
/**
- * Returns the top K elements from this RDD as defined by
- * the specified implicit Ordering[T].
+ * Returns the top K (largest) elements from this RDD as defined by the specified
+ * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
+ * {{{
+ * sc.parallelize([10, 4, 2, 12, 3]).top(1)
+ * // returns [12]
+ *
+ * sc.parallelize([2, 3, 4, 5, 6]).top(2)
+ * // returns [6, 5]
+ * }}}
+ *
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
- def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
- mapPartitions { items =>
- val queue = new BoundedPriorityQueue[T](num)
- queue ++= items
- Iterator.single(queue)
- }.reduce { (queue1, queue2) =>
- queue1 ++= queue2
- queue1
- }.toArray.sorted(ord.reverse)
- }
+ def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
/**
- * Returns the first K elements from this RDD as defined by
- * the specified implicit Ordering[T] and maintains the
- * ordering.
+ * Returns the first K (smallest) elements from this RDD as defined by the specified
+ * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
+ * For example:
+ * {{{
+ * sc.parallelize([10, 4, 2, 12, 3]).takeOrdered(1)
+ * // returns [2]
+ *
+ * sc.parallelize([2, 3, 4, 5, 6]).takeOrdered(2)
+ * // returns [2, 3]
+ * }}}
+ *
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
- def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
+ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+ mapPartitions { items =>
+ // BoundedPriorityQueue keeps the largest elements, so let's reverse the ordering.
+ val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+ queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+ Iterator.single(queue)
+ }.reduce { (queue1, queue2) =>
+ queue1 ++= queue2
+ queue1
+ }.toArray.sorted(ord)
+ }
/**
* Returns the max of this RDD as defined by the implicit Ordering[T].
http://git-wip-us.apache.org/repos/asf/spark/blob/b859853b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
new file mode 100644
index 0000000..c5268c0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}
+
+import com.google.common.collect.{Ordering => GuavaOrdering}
+
+/**
+ * Utility functions for collections.
+ */
+private[spark] object Utils {
+
+ /**
+ * Returns the first K elements from the input as defined by the specified implicit Ordering[T]
+ * and maintains the ordering.
+ */
+ def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
+ val ordering = new GuavaOrdering[T] {
+ override def compare(l: T, r: T) = ord.compare(l, r)
+ }
+ collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
+ }
+}