Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/25 19:39:32 UTC

[GitHub] [spark] JoshRosen commented on a diff in pull request #37661: [SPARK-40211][CORE][SQL] allow customize initial partitions number in take() behavior

JoshRosen commented on code in PR #37661:
URL: https://github.com/apache/spark/pull/37661#discussion_r955330367


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -1956,6 +1956,12 @@ package object config {
       .intConf
       .createWithDefault(10)
 
+  private[spark] val RDD_INITIAL_NUM_PARTITIONS =

Review Comment:
   Can we change this to `RDD_LIMIT_INITIAL_NUM_PARTITIONS` in order to be consistent with the `RDD_LIMIT_SCALE_UP_FACTOR` configuration defined below?
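   
   For illustration, the renamed definition might look something like this (a sketch only — the key name, doc, check, and default are assumptions, since the rest of the definition is not visible in this hunk):
   
   ```scala
   private[spark] val RDD_LIMIT_INITIAL_NUM_PARTITIONS =
     ConfigBuilder("spark.rdd.limit.initialNumPartitions")
       .doc("Initial number of partitions to try when computing take() on an RDD.")
       .version("3.4.0")
       .intConf
       .checkValue(_ > 0, "The initial number of partitions must be positive.")
       .createWithDefault(1)
   ```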



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -4300,6 +4309,8 @@ class SQLConf extends Serializable with Logging {
 
   def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
+  def initialNumPartitions: Int = getConf(INITIAL_NUM_PARTITIONS)

Review Comment:
   Rename to `limitInitialNumPartitions`
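   
   i.e., something like this (assuming the config object is also renamed as suggested in the other comment):
   
   ```scala
   def limitInitialNumPartitions: Int = getConf(LIMIT_INITIAL_NUM_PARTITIONS)
   ```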



##########
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala:
##########
@@ -84,18 +87,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       } else {
         // The number of partitions to try in this iteration. It is ok for this number to be
         // greater than totalParts because we actually cap it at totalParts in runJob.
-        var numPartsToTry = 1L
+        var numPartsToTry = Math.max(self.conf.get(RDD_INITIAL_NUM_PARTITIONS), 1)
         if (partsScanned > 0) {
           // If we didn't find any rows after the previous iteration, quadruple and retry.
           // Otherwise, interpolate the number of partitions we need to try, but overestimate it
           // by 50%. We also cap the estimation in the end.
-          if (results.size == 0) {
-            numPartsToTry = partsScanned * 4L
+          if (results.isEmpty) {
+            numPartsToTry = partsScanned * scaleUpFactor

Review Comment:
   It looks like this is fixing a pre-existing bug where the `RDD_LIMIT_SCALE_UP_FACTOR` wasn't being applied to the AsyncRDDActions version of `take()`. Nice!
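   
   For context, `scaleUpFactor` here would presumably be read the same way `RDD.take()` reads it (a sketch, not necessarily the exact line from this PR):
   
   ```scala
   // In AsyncRDDActions.takeAsync(), mirroring RDD.take():
   val scaleUpFactor = Math.max(self.conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)
   ```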



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -526,6 +526,15 @@ object SQLConf {
     .checkValue(_ >= 1, "The shuffle hash join factor cannot be negative.")
     .createWithDefault(3)
 
+  val INITIAL_NUM_PARTITIONS = buildConf("spark.sql.limit.initialNumPartitions")

Review Comment:
   Rename to `LIMIT_INITIAL_NUM_PARTITIONS` for consistency with `LIMIT_SCALE_UP_FACTOR`



##########
core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala:
##########
@@ -1255,6 +1256,37 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
     assert(numPartsPerLocation(locations(1)) > 0.4 * numCoalescedPartitions)
   }
 
+  test("SPARK-40211: customize initialNumPartitions for take") {
+    val totalElements = 100
+    val numToTake = 50
+    val rdd = sc.parallelize(0 to totalElements, totalElements)
+    var jobCount = 0

Review Comment:
   I'm wondering whether this should be an `AtomicInteger`: the listener invocations happen on a different thread from the main test runner thread, and I'm not sure how visibility of writes is guaranteed (it might be happening implicitly via some synchronization somewhere else, but just to be safe we might want to rely on explicit synchronization or atomic operations).
   
   Alternatively, I think we could do something like
   
   ```scala
   val jobCountListener = new SparkListener {
     private var count = 0
     def getCount: Int = synchronized { count }
     def reset(): Unit = synchronized { count = 0 }
     override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
       count += 1
     }
   }
   sc.addSparkListener(jobCountListener)
   
   ...
   
   assert(jobCountListener.getCount > 1)
   jobCountListener.reset()
   ```
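   
   For reference, the `AtomicInteger` variant would look something like this (a sketch; `jobCount` is a hypothetical name for the counter in the test):
   
   ```scala
   import java.util.concurrent.atomic.AtomicInteger
   
   import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
   
   val jobCount = new AtomicInteger(0)
   sc.addSparkListener(new SparkListener {
     // onJobStart fires on the listener-bus thread, so increment atomically
     override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
       jobCount.incrementAndGet()
     }
   })
   
   // ... run the take() under test ...
   
   assert(jobCount.get() > 1)
   jobCount.set(0)
   ```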



##########
core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala:
##########
@@ -29,11 +29,12 @@ import com.esotericsoftware.kryo.KryoException
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
 import org.scalatest.concurrent.Eventually
-

Review Comment:
   It looks like the import grouping is broken here, so Scalastyle is going to fail. Can you fix this to be consistent with the style described at https://github.com/databricks/scala-style-guide#imports (blank line separating the third-party deps from the org.apache.spark deps, removal of the blank line within the spark deps section)? 
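   
   Concretely, the grouping would look something like this (the third-party imports are taken from the hunk above; the `org.apache.spark` imports below are placeholders, since they are not visible here):
   
   ```scala
   import com.esotericsoftware.kryo.KryoException
   import org.apache.hadoop.io.{LongWritable, Text}
   import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
   import org.scalatest.concurrent.Eventually
   
   import org.apache.spark.internal.config._
   import org.apache.spark.rdd._
   ```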



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala:
##########
@@ -478,12 +479,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     while (buf.length < n && partsScanned < totalParts) {
       // The number of partitions to try in this iteration. It is ok for this number to be
       // greater than totalParts because we actually cap it at totalParts in runJob.
-      var numPartsToTry = 1L
+      var numPartsToTry = Math.max(conf.initialNumPartitions, 1)
       if (partsScanned > 0) {
         // If we didn't find any rows after the previous iteration, quadruple and retry.

Review Comment:
   Just realized that this comment (and the ones in the other file) has been out of date ever since we originally introduced the `limitScaleUpFactor` configuration. Maybe we can change "quadruple and retry" to "multiply by limitScaleUpFactor and retry"?
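   
   i.e., something like:
   
   ```scala
   // If we didn't find any rows after the previous iteration, multiply by
   // limitScaleUpFactor and retry. Otherwise, interpolate the number of partitions
   // we need to try, but overestimate it by 50%. We also cap the estimation in the end.
   ```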



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -526,6 +526,15 @@ object SQLConf {
     .checkValue(_ >= 1, "The shuffle hash join factor cannot be negative.")
     .createWithDefault(3)
 
+  val INITIAL_NUM_PARTITIONS = buildConf("spark.sql.limit.initialNumPartitions")
+    .internal()
+    .doc("Initial number of partitions to try when executing a take on a query. Higher values " +
+      "can mitigate the 'multiple job run' overhead with fewer tries. Could also set it to " +
+      "higher-than-1-but-still-small values to achieve a middle-ground trade-off")
+    .version("3.4")

Review Comment:
   Should be "3.4.0" for consistency with the other configs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org