Posted to issues@spark.apache.org by "Dan Meany (JIRA)" <ji...@apache.org> on 2018/02/01 22:53:00 UTC

[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors

    [ https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349440#comment-16349440 ] 

Dan Meany commented on SPARK-19371:
-----------------------------------

We have hit this issue on many occasions, and nothing I tried with repartitioning would redistribute the work evenly.  The job would spend most of its time using one executor while the rest sat idle.  This happened on both Spark 1 and 2.  The workaround was to create a few threads (say 3 or 4) on the driver and split the work into multiple independent units to run on those threads.  This kept the executors fully saturated with work and reduced runtimes dramatically.  It works for streaming or batch. There are two requirements for this to work, plus one practice I follow:

1) The work units done in the threads must be independent of each other.

2) If any RDDs, DataFrames, or Datasets created in the main thread (such as dimension data for joins) are used by the worker threads, it is a good idea to persist them (or cacheTable them) and run an action such as count on them in the main thread, to be sure they are fully computed before launching the workers.  Remember to unpersist/uncacheTable afterward if using streaming so storage doesn't accumulate.

3) I save any exceptions raised in the thread objects and, after the Thread.join calls in the main thread, check whether any of the workers threw an exception and report them.
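
A minimal sketch of the threading pattern described above, in plain Scala so it can be read without a cluster. The object name DriverThreads and the method runInThreads are hypothetical names chosen for illustration; they are not part of Spark. Each work unit would normally be a closure that runs one or more Spark actions, and the commented usage at the bottom assumes a hypothetical dims DataFrame and processUnit function.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.util.control.NonFatal

object DriverThreads {
  // Runs each independent work unit on its own driver-side thread,
  // waits for all of them, and returns the exceptions they threw.
  def runInThreads(units: Seq[() => Unit]): Seq[Throwable] = {
    // Thread-safe collection, since several workers may fail at once.
    val failures = new ConcurrentLinkedQueue[Throwable]()
    val threads = units.map { work =>
      new Thread(new Runnable {
        override def run(): Unit =
          try work()
          catch { case NonFatal(e) => failures.add(e) } // save, report later
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // main thread blocks until every worker is done
    failures.toArray(new Array[Throwable](0)).toSeq
  }
}

// Hypothetical usage inside a Spark driver (dims and processUnit are
// illustrative names, not from the original comment):
//
//   dims.persist()
//   dims.count()                       // materialize shared data first
//   val failures = DriverThreads.runInThreads(Seq(
//     () => processUnit("unitA", dims),
//     () => processUnit("unitB", dims),
//     () => processUnit("unitC", dims)
//   ))
//   dims.unpersist()                   // avoid accumulating storage
//   failures.foreach(e => println("worker failed: " + e))
```

One thread per unit matches the "3 or 4 threads" case; with many more units than that, a fixed-size thread pool would probably be a better fit.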

 

 

> Cannot spread cached partitions evenly across executors
> -------------------------------------------------------
>
>                 Key: SPARK-19371
>                 URL: https://issues.apache.org/jira/browse/SPARK-19371
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.1
>            Reporter: Thunder Stumpges
>            Priority: Major
>         Attachments: RDD Block Distribution on two executors.png, Unbalanced RDD Blocks, and resulting task imbalance.png, Unbalanced RDD Blocks, and resulting task imbalance.png, execution timeline.png
>
>
> Before running an intensive iterative job (in this case a distributed topic model training), we need to load a dataset and persist it across executors. 
> After loading from HDFS and persisting, the partitions are spread unevenly across executors (based on the initial scheduling of the reads, which are not data-locality sensitive). The partition sizes are even, just not their distribution over executors. We currently have no way to force the partitions to spread evenly, and as the iterative algorithm begins, tasks are distributed to executors based on this initial load, forcing some very unbalanced work.
> This has been mentioned a [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059] of [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html] in [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html] user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here are examples of things I have tried. All resulted in partitions in memory that were NOT evenly distributed to executors, causing future tasks to be imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
> val numPartitions = 48*16
> val df = sqlContext.read.
>     parquet("/data/folder_to_load").
>     repartition(numPartitions).
>     persist
> df.count
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to desired partitions*
> {code}
> val numPartitions = 48*16
> val df2 = sqlContext.read.
>     parquet("/data/folder_to_load").
>     repartition(numPartitions*2)
> val df = df2.repartition(numPartitions).
>     persist
> df.count
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request that those partitions be stored evenly across executors in preparation for future tasks. 
> I'm not sure if this is a more general issue (i.e. not just involving persisting RDDs), but for the persisted in-memory case, it can make a HUGE difference in the overall running time of the remaining work.


