Posted to issues@spark.apache.org by "Thomas Graves (JIRA)" <ji...@apache.org> on 2016/04/29 17:38:12 UTC

[jira] [Commented] (SPARK-11316) coalesce doesn't handle UnionRDD with partial locality properly

    [ https://issues.apache.org/jira/browse/SPARK-11316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264205#comment-15264205 ] 

Thomas Graves commented on SPARK-11316:
---------------------------------------

Simple steps to reproduce an RDD with partial preferred locations; any text file will do here:

val textFile = sc.textFile("randomtext2.txt")
val textFile2 = sc.textFile("README.md")
val wordCounts4 = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
val wordCounts5 = textFile2.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
sc.setCheckpointDir("hdfs:///user/tgraves/test10")
wordCounts5.checkpoint()
wordCounts5.take(1)   // materializes the checkpoint, so wordCounts5's partitions get HDFS preferred locations
val urdd = wordCounts4.union(wordCounts5)   // wordCounts4 has no preferred locations -> partial locality
urdd.coalesce(10).count()
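To confirm that the union really has partial locality, the preferred locations of each partition can be inspected from the shell. This is a sketch to be run in spark-shell right after the steps above, using only the public RDD API (partitions and preferredLocations); it needs a live SparkContext, so it is not runnable standalone:

```scala
// Sketch: run in spark-shell after the repro steps above. Partitions coming
// from the checkpointed wordCounts5 should report HDFS hosts, while the
// partitions from wordCounts4 should report no preferred locations at all.
val locs = urdd.partitions.map(p => urdd.preferredLocations(p))
locs.zipWithIndex.foreach { case (hosts, i) =>
  println(s"partition $i -> ${if (hosts.isEmpty) "(no preferred location)" else hosts.mkString(", ")}")
}
```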

> coalesce doesn't handle UnionRDD with partial locality properly
> ---------------------------------------------------------------
>
>                 Key: SPARK-11316
>                 URL: https://issues.apache.org/jira/browse/SPARK-11316
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.1
>            Reporter: Thomas Graves
>            Assignee: Thomas Graves
>            Priority: Critical
>
> So I haven't fully debugged this yet, but here is what I'm seeing and what I think might be going on.
> I have a graph processing job that is seeing a huge slowdown in setupGroups in the location iterator, where it's getting the preferred locations for the coalesce.  It is coalescing from 2400 partitions down to 1200, and the calculation had been running for 17+ hours when I killed it, so I don't know the total time.
> It appears that the job does an isEmpty call, a number of other transformations, then a coalesce (which is where it takes so long), more transformations, and finally a count to trigger execution.
> It appears that setupGroups finds only one node with preferred locations, and to get to that node it first has to go through the while loop:
>     while (numCreated < targetLen && tries < expectedCoupons2) {
> where expectedCoupons2 is around 19000.  It finds very few partition groups, or none at all, in this loop.
> Then it does the second loop:
> while (numCreated < targetLen) {  // if we don't have enough partition groups, create duplicates
>       var (nxt_replica, nxt_part) = rotIt.next()
>       val pgroup = PartitionGroup(nxt_replica)
>       groupArr += pgroup
>       groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
>       var tries = 0
>       while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
>         nxt_part = rotIt.next()._2
>         tries += 1
>       }
>       numCreated += 1
>     }
> This has an inner while loop, and both loops run up to 1200 times, giving 1200 * 1200 iterations.  This takes a very long time.
> The user can work around the issue by adding a count() call shortly after the isEmpty call and before the coalesce is called.  I also tried putting a take(10000) right before the isEmpty call and it seems to work around the issue, but it took 1 hour with the take vs. a few minutes with the count().
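For reference, the expectedCoupons2 bound in the first loop quoted above comes from a coupon-collector estimate of how many random draws are needed to hit every one of the targetLen groups. Here is a self-contained sketch of that estimate; the exact constant is my reading of the 1.5.x coalescer code, so treat the formula as an assumption, but it does reproduce the "around 19000" figure for targetLen = 1200:

```scala
// Coupon-collector estimate: the expected number of uniform random draws
// needed to see all n distinct items is roughly n * ln(n) + n, and the
// coalescer doubles a value of that shape to bound its first loop.
object ExpectedCoupons {
  def expectedCoupons2(targetLen: Int): Int =
    2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt

  def main(args: Array[String]): Unit =
    println(expectedCoupons2(1200)) // ~19400, consistent with "around 19000" above
}
```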
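To make the cost of the second loop concrete, here is a small self-contained model of its worst case (not the real Spark code; the name fallbackLoopIterations is illustrative). When only one preferred host is found, addPartToPGroup can keep failing until tries reaches targetLen for every one of the targetLen groups:

```scala
// Illustrative worst-case model of the fallback loop in setupGroups:
// for each of the targetLen groups created, the inner "ensure at least
// one part" loop retries up to targetLen times, so total work is
// targetLen^2 -- 1,440,000 inner iterations when coalescing to 1200.
object CoalesceLoopCost {
  def fallbackLoopIterations(targetLen: Int): Long = {
    var total = 0L
    var numCreated = 0
    while (numCreated < targetLen) {
      var tries = 0
      while (tries < targetLen) { // assume addPartToPGroup fails every time
        total += 1
        tries += 1
      }
      numCreated += 1
    }
    total
  }

  def main(args: Array[String]): Unit =
    println(fallbackLoopIterations(1200))
}
```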



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org