Posted to issues@spark.apache.org by "Josh Rosen (Jira)" <ji...@apache.org> on 2022/07/14 00:31:00 UTC

[jira] [Updated] (SPARK-39771) If spark.default.parallelism is unset, RDD defaultPartitioner may pick a value that is too large to successfully run

     [ https://issues.apache.org/jira/browse/SPARK-39771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-39771:
-------------------------------
    Description: 
[According to its docs|https://github.com/apache/spark/blob/899f6c90eb2de5b46a36710a131d7417010ce4b3/core/src/main/scala/org/apache/spark/Partitioner.scala#L45-L65], {{Partitioner.defaultPartitioner}} will use the maximum number of upstream RDD partitions as its partition count when {{spark.default.parallelism}} is not set. If that number of upstream partitions is very large, this can result in shuffles where {{numMappers * numReducers = numMappers^2}}, which can cause various problems that prevent the job from running successfully.
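
For illustration, here is a minimal sketch of that fallback behavior (this is not the actual Spark source; the real {{defaultPartitioner}} also considers any existing partitioners on the parent RDDs):
{code:scala}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Simplified illustration of the fallback described above, NOT the actual
// Spark implementation (eligibility checks for existing partitioners omitted).
def illustrateDefaultNumPartitions(sc: SparkContext, rdds: Seq[RDD[_]]): Int = {
  if (sc.getConf.contains("spark.default.parallelism")) {
    sc.defaultParallelism                // explicit config wins
  } else {
    rdds.map(_.getNumPartitions).max     // falls back to the largest upstream partition count
  }
}
{code}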

To help users identify when they have run into this problem, I think we should add warning logs to Spark.

As an example of the problem, let's say that I have an RDD with 100,000 partitions and then do a {{reduceByKey}} on it without specifying an explicit partitioner or partition count. In this case, Spark will plan a reduce stage with 100,000 partitions:
{code:java}
scala>  sc.parallelize(1 to 100000, 100000).map(x => (x, x)).reduceByKey(_ + _).toDebugString
res7: String =
(100000) ShuffledRDD[21] at reduceByKey at <console>:25 []
   +-(100000) MapPartitionsRDD[20] at map at <console>:25 []
        |     ParallelCollectionRDD[19] at parallelize at <console>:25 []
{code}
This results in the creation of 10 billion shuffle blocks, so if this job _does_ run it is likely to be extremely slow. However, it's more likely that the driver will crash when serializing map output statuses: even if we were able to use one bit per mapper / reducer pair (which is probably overly optimistic in terms of compressibility), the map statuses would be ~1.25 gigabytes (and the actual size is probably much larger)!
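
For reference, the back-of-the-envelope arithmetic behind those figures:
{code:scala}
// 100,000 mappers x 100,000 reducers:
val numMappers  = 100000L
val numReducers = 100000L
val shuffleBlocks = numMappers * numReducers   // 10,000,000,000 shuffle blocks
val bitmapBytes   = shuffleBlocks / 8          // 1,250,000,000 bytes at one bit per block
val bitmapGB      = bitmapBytes / 1e9          // ~1.25 GB of map output statuses, as a lower bound
{code}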

I don't think that users are likely to intentionally wind up in this scenario: it's more likely that either (a) their job depends on {{spark.default.parallelism}} being set but it was run in an environment lacking a value for that config, or (b) their input data grew significantly in size. These scenarios may be rare, but they can be frustrating to debug (especially if a failure occurs midway through a long-running job).
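
(For completeness, the usual ways to avoid the implicit fallback are to pass an explicit partition count or partitioner to the shuffle operation, or to set {{spark.default.parallelism}}; the partition counts below are arbitrary examples.)
{code:scala}
val pairs = sc.parallelize(1 to 100000, 100000).map(x => (x, x))

// 1. Pass an explicit reducer count to the shuffle operation:
val r1 = pairs.reduceByKey(_ + _, 2000)

// 2. Or supply an explicit partitioner:
val r2 = pairs.reduceByKey(new org.apache.spark.HashPartitioner(2000), _ + _)

// 3. Or set the config when submitting the job:
//    spark-submit --conf spark.default.parallelism=2000 ...
{code}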

I think we should do something to handle this scenario.

A good starting point might be for {{Partitioner.defaultPartitioner}} to log a warning when the default partition count exceeds some threshold.
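
Purely as a sketch of what such a warning might look like (the threshold, names, and message wording below are invented for illustration, not a concrete proposal for the actual code):
{code:scala}
import org.slf4j.LoggerFactory

object DefaultPartitionerWarning {
  private val log = LoggerFactory.getLogger(getClass)
  private val WarnThreshold = 10000  // hypothetical cutoff

  // Hypothetical helper: called with the partition count that defaultPartitioner
  // just picked and whether spark.default.parallelism was explicitly configured.
  def maybeWarn(chosenPartitions: Int, parallelismConfigured: Boolean): Unit = {
    if (!parallelismConfigured && chosenPartitions > WarnThreshold) {
      log.warn(s"Partitioner.defaultPartitioner fell back to $chosenPartitions partitions " +
        "because spark.default.parallelism is unset; the resulting shuffle may create " +
        "numMappers * numReducers shuffle blocks. Consider setting spark.default.parallelism " +
        "or passing an explicit partition count.")
    }
  }
}
{code}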

In addition, I think it might be a good idea to log a similar warning in {{MapOutputTrackerMaster}} right before we start trying to serialize map statuses: in a real-world situation where this problem cropped up, the map stage ran successfully but the driver crashed when serializing map statuses. Putting a warning about partition counts here makes it more likely that users will spot that error in the logs and be able to identify the source of the problem (compared to a warning that appears much earlier in the job and therefore much farther from the likely site of a crash).
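
A rough sketch of that second check, again with invented names and an arbitrary threshold, just to make the idea concrete:
{code:scala}
import org.slf4j.LoggerFactory

object MapStatusSizeWarning {
  private val log = LoggerFactory.getLogger(getClass)

  // Hypothetical pre-serialization check: one bit per (mapper, reducer) pair is a
  // hard lower bound on the serialized size of the map output statuses.
  def warnIfHuge(numMappers: Long, numReducers: Long): Unit = {
    val lowerBoundBytes = numMappers * numReducers / 8
    if (lowerBoundBytes > (1L << 30)) {  // arbitrary ~1 GiB threshold
      log.warn(s"Serializing map output statuses for $numMappers mappers and $numReducers " +
        s"reducers requires at least ${lowerBoundBytes / 1e9} GB; if spark.default.parallelism " +
        "is unset, the reducer count may have been chosen implicitly from the mapper count.")
    }
  }
}
{code}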

  was:
[According to its docs|https://github.com/apache/spark/blob/899f6c90eb2de5b46a36710a131d7417010ce4b3/core/src/main/scala/org/apache/spark/Partitioner.scala#L45-L65], {{Partitioner.defaultPartitioner}} will use the maximum number of upstream RDD partitions as its partition count when {{spark.default.parallelism}} is not set. If that number of upstream partitions is very large, this can result in shuffles where {{numMappers * numReducers = numMappers^2}}, which can cause various problems that prevent the job from running successfully.

To help users identify when they have run into this problem, I think we should add warning logs to Spark.

As an example of the problem, let's say that I have an RDD with 100,000 partitions and then do a {{reduceByKey}} on it without specifying an explicit partitioner or partition count. In this case, Spark will plan a reduce stage with 100,000 partitions:
{code:java}
scala>  sc.parallelize(1 to 100000, 100000).map(x => (x, x)).reduceByKey(_ + _).toDebugString
res7: String =
(100000) ShuffledRDD[21] at reduceByKey at <console>:25 []
   +-(100000) MapPartitionsRDD[20] at map at <console>:25 []
        |     ParallelCollectionRDD[19] at parallelize at <console>:25 []
{code}
This results in the creation of 10 billion shuffle blocks, so if this job _does_ run it is likely to be extremely slow. However, it's more likely that the driver will crash when serializing map output statuses: even if we were able to use one bit per mapper / reducer pair (which is probably overly optimistic in terms of compressibility), the map statuses would be ~1.25 gigabytes!

I don't think that users are likely to intentionally wind up in this scenario: it's more likely that either (a) their job depends on {{spark.default.parallelism}} being set but it was run in an environment lacking a value for that config, or (b) their input data grew significantly in size. These scenarios may be rare, but they can be frustrating to debug (especially if a failure occurs midway through a long-running job).

I think we should do something to handle this scenario.

A good starting point might be for {{Partitioner.defaultPartitioner}} to log a warning when the default partition count exceeds some threshold.

In addition, I think it might be a good idea to log a similar warning in {{MapOutputTrackerMaster}} right before we start trying to serialize map statuses: in a real-world situation where this problem cropped up, the map stage ran successfully but the driver crashed when serializing map statuses. Putting a warning about partition counts here makes it more likely that users will spot that error in the logs and be able to identify the source of the problem (compared to a warning that appears much earlier in the job and therefore much farther from the likely site of a crash).


> If spark.default.parallelism is unset, RDD defaultPartitioner may pick a value that is too large to successfully run
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39771
>                 URL: https://issues.apache.org/jira/browse/SPARK-39771
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.0.0
>            Reporter: Josh Rosen
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org