Posted to commits@spark.apache.org by rx...@apache.org on 2014/10/13 22:11:59 UTC

git commit: [Spark] RDD take() method: overestimates too much

Repository: spark
Updated Branches:
  refs/heads/master 39ccabacf -> 49bbdcb66


[Spark] RDD take() method: overestimates too much

The comment at line 1083 says: "Otherwise, interpolate the number of partitions we need to try, but overestimate it by 50%."

`(1.5 * num * partsScanned / buf.size).toInt` is an estimate of the total number of partitions needed, so in each iteration the increment should be `(1.5 * num * partsScanned / buf.size).toInt - partsScanned`.
The existing implementation instead uses the estimate itself as the increment, which grows `partsScanned` roughly exponentially: `x_{n+1} >= (1.5 + 1) * x_n`.

This could be a performance problem (unless this is the intended behavior).
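
For illustration only (the numbers below are hypothetical, not from the patch): suppose `take()` wants `num = 100` rows and the first `partsScanned = 10` partitions yielded `buf.size = 50` rows. A minimal Scala sketch of the two increment rules:

    object NumPartsToTrySketch {
      def main(args: Array[String]): Unit = {
        val num = 100          // rows requested by take()
        val partsScanned = 10  // partitions already scanned
        val bufSize = 50       // rows collected so far

        // Interpolated total number of partitions needed, overestimated by 50%.
        val estimatedTotal = (1.5 * num * partsScanned / bufSize).toInt       // 30

        // Old rule: the estimated total is used directly as the next increment.
        val oldIncrement = estimatedTotal                                     // 30

        // New rule: scan only the remaining difference, capped at 4x partsScanned.
        val newIncrement = math.min(math.max(1, estimatedTotal - partsScanned),
          partsScanned * 4)                                                   // 20

        println(s"estimated total = $estimatedTotal, " +
          s"old increment = $oldIncrement, new increment = $newIncrement")
      }
    }

With the old rule the next pass would scan 30 more partitions (40 in total), overshooting the already 1.5x-padded estimate of 30; the new rule scans only the remaining 20, and additionally caps any single jump at `4 * partsScanned`.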

Author: yingjieMiao <yi...@42go.com>

Closes #2648 from yingjieMiao/rdd_take and squashes the following commits:

d758218 [yingjieMiao] scala style fix
a8e74bb [yingjieMiao] python style fix
4b6e777 [yingjieMiao] infix operator style fix
4391d3b [yingjieMiao] typo fix.
692f4e6 [yingjieMiao] cap numPartsToTry
c4483dc [yingjieMiao] style fix
1d2c410 [yingjieMiao] also change in rdd.py and AsyncRDD
d31ff7e [yingjieMiao] handle the edge case after 1 iteration
a2aa36b [yingjieMiao] RDD take method: overestimate too much


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49bbdcb6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49bbdcb6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49bbdcb6

Branch: refs/heads/master
Commit: 49bbdcb660edff7522430b329a300765164ccc44
Parents: 39ccaba
Author: yingjieMiao <yi...@42go.com>
Authored: Mon Oct 13 13:11:55 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Oct 13 13:11:55 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/AsyncRDDActions.scala    | 12 +++++++-----
 core/src/main/scala/org/apache/spark/rdd/RDD.scala      |  8 +++++---
 python/pyspark/rdd.py                                   |  5 ++++-
 3 files changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/49bbdcb6/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index b62f3fb..ede5568 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -78,16 +78,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         // greater than totalParts because we actually cap it at totalParts in runJob.
         var numPartsToTry = 1
         if (partsScanned > 0) {
-          // If we didn't find any rows after the first iteration, just try all partitions next.
+          // If we didn't find any rows after the previous iteration, quadruple and retry.
           // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-          // by 50%.
+          // by 50%. We also cap the estimation in the end.
           if (results.size == 0) {
-            numPartsToTry = totalParts - 1
+            numPartsToTry = partsScanned * 4
           } else {
-            numPartsToTry = (1.5 * num * partsScanned / results.size).toInt
+            // the left side of max is >=1 whenever partsScanned >= 2
+            numPartsToTry = Math.max(1, 
+              (1.5 * num * partsScanned / results.size).toInt - partsScanned)
+            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4) 
           }
         }
-        numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions
 
         val left = num - results.size
         val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)

http://git-wip-us.apache.org/repos/asf/spark/blob/49bbdcb6/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 2aba40d..71cabf6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1079,15 +1079,17 @@ abstract class RDD[T: ClassTag](
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1
       if (partsScanned > 0) {
-        // If we didn't find any rows after the previous iteration, quadruple and retry.  Otherwise,
+        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
         // interpolate the number of partitions we need to try, but overestimate it by 50%.
+        // We also cap the estimation in the end.
         if (buf.size == 0) {
           numPartsToTry = partsScanned * 4
         } else {
-          numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
+          // the left side of max is >=1 whenever partsScanned >= 2
+          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
+          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4) 
         }
       }
-      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions
 
       val left = num - buf.size
       val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)

http://git-wip-us.apache.org/repos/asf/spark/blob/49bbdcb6/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index e13bab9..15be4bf 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1070,10 +1070,13 @@ class RDD(object):
                 # If we didn't find any rows after the previous iteration,
                 # quadruple and retry.  Otherwise, interpolate the number of
                 # partitions we need to try, but overestimate it by 50%.
+                # We also cap the estimation in the end.
                 if len(items) == 0:
                     numPartsToTry = partsScanned * 4
                 else:
-                    numPartsToTry = int(1.5 * num * partsScanned / len(items))
+                    # the first parameter of max is >=1 whenever partsScanned >= 2
+                    numPartsToTry = int(1.5 * num * partsScanned / len(items)) - partsScanned
+                    numPartsToTry = min(max(numPartsToTry, 1), partsScanned * 4)
 
             left = num - len(items)
 

