You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by ankits <an...@gmail.com> on 2014/11/12 02:15:21 UTC

Imbalanced shuffle read

Im running a job that uses groupByKey(), so it generates a lot of shuffle
data. Then it processes this and writes files to HDFS in a forEachPartition
block. Looking at the forEachPartition stage details in the web console, all
but one executor is idle (SUCCESS in 50-60ms), and one is RUNNING with a
huge shuffle read and takes a long time to finish. 

Can someone explain why the read is all on one node and how to parallelize
this better? 



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Imbalanced-shuffle-read-tp18648.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Imbalanced shuffle read

Posted by ankits <an...@gmail.com>.
Adding a call to rdd.repartition() after randomizing the keys has no effect
either. code -

   //partitioning is done like partitionIdx = f(key) % numPartitions 
    //we use random keys to get even partitioning 
    val uniform = other_stream.transform(rdd => { 
      rdd.map({ kv => 
        val k = kv._1 
        val v = kv._2 

        (UUID.randomUUID().toString, v) 
      }) 
      rdd.repartition(20)
    }) 

    uniform.foreachRDD(rdd => { 
       rdd.forEachPartition(partition => { 




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Imbalanced-shuffle-read-tp18648p18791.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Imbalanced shuffle read

Posted by ankits <an...@gmail.com>.
I have made some progress - the partitioning is very uneven, and everything
goes to one partition. I see that spark partitions by key, so I tried this:

    //partitioning is done like partitionIdx = f(key) % numPartitions
    //we use random keys to get even partitioning
    val uniform = other_stream.transform(rdd => {
      rdd.map({ kv =>
        val k = kv._1
        val v = kv._2

        (UUID.randomUUID().toString, v)
      })
    })

    uniform.foreachRDD(rdd => {
       rdd.forEachPartition(partition => {
         ...

As you can see, I'm using random keys. Even in this case, when running with
2 nodes, i verified that one partition is completely empty, and the other
contains all the records.

What is going wrong with the partitioning here?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Imbalanced-shuffle-read-tp18648p18790.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Imbalanced shuffle read

Posted by ankits <an...@gmail.com>.
I tried that, but that did not resolve the problem. All the executors for
partitions except one have no shuffle reads and finish within 20-30 ms. one
executor has a complete shuffle read of the previous stage. Any other ideas
on debugging this?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Imbalanced-shuffle-read-tp18648p18768.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Imbalanced shuffle read

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
When you calls the groupByKey() try providing the number of partitions like
groupByKey(100) depending on your data/cluster size.

Thanks
Best Regards

On Wed, Nov 12, 2014 at 6:45 AM, ankits <an...@gmail.com> wrote:

> Im running a job that uses groupByKey(), so it generates a lot of shuffle
> data. Then it processes this and writes files to HDFS in a forEachPartition
> block. Looking at the forEachPartition stage details in the web console,
> all
> but one executor is idle (SUCCESS in 50-60ms), and one is RUNNING with a
> huge shuffle read and takes a long time to finish.
>
> Can someone explain why the read is all on one node and how to parallelize
> this better?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Imbalanced-shuffle-read-tp18648.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>