Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/26 06:26:44 UTC

[spark] branch master updated: [SPARK-40225][PYTHON] PySpark rdd.takeOrdered should check num and numPartitions

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 52cd037f321 [SPARK-40225][PYTHON] PySpark rdd.takeOrdered should check num and numPartitions
52cd037f321 is described below

commit 52cd037f321c1e8e1e4d6c1b76b1b4c50fbe3ccd
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Fri Aug 26 15:26:29 2022 +0900

    [SPARK-40225][PYTHON] PySpark rdd.takeOrdered should check num and numPartitions
    
    ### What changes were proposed in this pull request?
    Add validation of `num` and `numPartitions`.
    
    ### Why are the changes needed?
    This PR keeps PySpark in line with the Scala side:
    1. the Scala side checks whether `num == 0`;
    2. the Scala side checks whether `numPartitions == 0`, so it accepts an `emptyRDD`:
    
    ```scala
    scala> sc.emptyRDD[Int].takeOrdered(3)
    res2: Array[Int] = Array()
    ```
    
    while in PySpark the same call raises an error:
    ```
    In [1]: sc.emptyRDD().takeOrdered(3)
    ---------------------------------------------------------------------------
    ValueError                                Traceback (most recent call last)
    Input In [1], in <cell line: 1>()
    ----> 1 sc.emptyRDD().takeOrdered(3)
    
    File ~/.dev/spark-3.3.0-bin-hadoop3/python/pyspark/rdd.py:1823, in RDD.takeOrdered(self, num, key)
       1820 def merge(a: List[T], b: List[T]) -> List[T]:
       1821     return heapq.nsmallest(num, a + b, key)
    -> 1823 return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
    
    File ~/.dev/spark-3.3.0-bin-hadoop3/python/pyspark/rdd.py:1253, in RDD.reduce(self, f)
       1251 if vals:
       1252     return reduce(f, vals)
    -> 1253 raise ValueError("Can not reduce() empty RDD")
    
    ValueError: Can not reduce() empty RDD
    ```
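
    The failure mode is easy to reproduce outside Spark: `takeOrdered` folds the
    per-partition `heapq.nsmallest` results with a pairwise merge, and with zero
    partitions there is nothing to fold. A minimal pure-Python sketch (the
    `take_ordered` helper and its list-of-lists "partitions" are illustrative
    stand-ins, not PySpark API):

    ```python
    import heapq
    from functools import reduce

    def take_ordered(partitions, num, key=None):
        def merge(a, b):
            return heapq.nsmallest(num, a + b, key)
        # One partial result per partition, then a fold over them -- with zero
        # partitions there are no partial results and nothing to fold.
        partial = [heapq.nsmallest(num, part, key) for part in partitions]
        return reduce(merge, partial)

    print(take_ordered([[10, 1, 2], [9, 3, 4, 5, 6, 7]], 6))  # [1, 2, 3, 4, 5, 6]

    try:
        take_ordered([], 3)  # no partitions, mirroring sc.emptyRDD()
    except TypeError as e:
        print(e)  # reduce() of empty iterable with no initial value
    ```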
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a doctest.
    
    Closes #37669 from zhengruifeng/py_rdd_take_ordered.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/rdd.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 181935aa8d7..b631f141a89 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2732,12 +2732,20 @@ class RDD(Generic[T_co]):
         [1, 2, 3, 4, 5, 6]
         >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
         [10, 9, 7, 6, 5, 4]
+        >>> sc.emptyRDD().takeOrdered(3)
+        []
         """
+        if num < 0:
+            raise ValueError("top N cannot be negative.")
 
-        def merge(a: List[T], b: List[T]) -> List[T]:
-            return heapq.nsmallest(num, a + b, key)
+        if num == 0 or self.getNumPartitions() == 0:
+            return []
+        else:
+
+            def merge(a: List[T], b: List[T]) -> List[T]:
+                return heapq.nsmallest(num, a + b, key)
 
-        return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
+            return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
 
     def take(self: "RDD[T]", num: int) -> List[T]:
         """

