You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/09/08 02:58:05 UTC
git commit: [SPARK-3394] [SQL] Fix crash in TakeOrdered when limit is 0
Repository: spark
Updated Branches:
refs/heads/master 3fb57a0ab -> 6754570d8
[SPARK-3394] [SQL] Fix crash in TakeOrdered when limit is 0
This resolves https://issues.apache.org/jira/browse/SPARK-3394
Author: Eric Liang <ek...@google.com>
Closes #2264 from ericl/spark-3394 and squashes the following commits:
c87355b [Eric Liang] refactor
bfb6140 [Eric Liang] change RDD takeOrdered instead
7a51528 [Eric Liang] fix takeordered when limit = 0
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6754570d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6754570d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6754570d
Branch: refs/heads/master
Commit: 6754570d83044c4fbaf0d2ac2378a0e081a93629
Parents: 3fb57a0
Author: Eric Liang <ek...@google.com>
Authored: Sun Sep 7 17:57:59 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sun Sep 7 17:57:59 2014 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/RDD.scala | 22 ++++++++++++--------
.../scala/org/apache/spark/rdd/RDDSuite.scala | 7 +++++++
2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6754570d/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 1cf55e8..a9b905b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1127,15 +1127,19 @@ abstract class RDD[T: ClassTag](
* @return an array of top elements
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
- mapPartitions { items =>
- // Priority keeps the largest elements, so let's reverse the ordering.
- val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
- queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
- Iterator.single(queue)
- }.reduce { (queue1, queue2) =>
- queue1 ++= queue2
- queue1
- }.toArray.sorted(ord)
+ if (num == 0) {
+ Array.empty
+ } else {
+ mapPartitions { items =>
+ // Priority keeps the largest elements, so let's reverse the ordering.
+ val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+ queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+ Iterator.single(queue)
+ }.reduce { (queue1, queue2) =>
+ queue1 ++= queue2
+ queue1
+ }.toArray.sorted(ord)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/6754570d/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 926d4fe..499dcda 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -521,6 +521,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sortedLowerK === Array(1, 2, 3, 4, 5))
}
+ test("takeOrdered with limit 0") {
+ val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ val rdd = sc.makeRDD(nums, 2)
+ val sortedLowerK = rdd.takeOrdered(0)
+ assert(sortedLowerK.size === 0)
+ }
+
test("takeOrdered with custom ordering") {
val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
implicit val ord = implicitly[Ordering[Int]].reverse
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org