Posted to issues@spark.apache.org by "Xinghao Pan (JIRA)" <ji...@apache.org> on 2015/04/14 15:14:13 UTC

[jira] [Updated] (SPARK-6808) Checkpointing after zipPartitions results in NODE_LOCAL execution

     [ https://issues.apache.org/jira/browse/SPARK-6808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xinghao Pan updated SPARK-6808:
-------------------------------
    Description: 
I'm encountering a weird issue where a simple iterative zipPartitions is PROCESS_LOCAL before checkpointing, but becomes NODE_LOCAL for all iterations after checkpointing. More often than not, tasks end up fetching remote blocks over the network, leading to a 10x increase in runtime.

Here's an example snippet of code:
    import org.apache.spark.rdd.RDD

    // numPartitions, checkpointDir and checkpointIter are defined elsewhere.
    // Build a cached RDD of ten million (Long, Int) pairs per partition.
    var R: RDD[(Long,Int)] =
      sc.parallelize(0 until numPartitions, numPartitions)
        .mapPartitions(_ => Array.fill(10000000)((0L, 0)).iterator)
        .cache()

    sc.setCheckpointDir(checkpointDir)

    var iteration = 0
    while (iteration < 50) {
      // Self-zip is a no-op on the data but adds a new dependency each iteration.
      R = R.zipPartitions(R)((x, y) => x).cache()
      if ((iteration + 1) % checkpointIter == 0) R.checkpoint()
      R.foreachPartition(_ => {})  // materialize (and write the checkpoint, if marked)
      iteration += 1
    }

I've also tried unpersisting the old RDDs and increasing spark.locality.wait, but neither helps.
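For reference, this is roughly what I mean; the wait value and app name are illustrative, and oldR is just a name used here to hold on to the previous reference:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative: raise the locality wait at startup (milliseconds in 1.2/1.3).
    val conf = new SparkConf()
      .setAppName("zip-checkpoint-repro")
      .set("spark.locality.wait", "10000")
    val sc = new SparkContext(conf)

    // Inside the loop: keep a handle on the old RDD and unpersist it after
    // the new one has been materialized.
    val oldR = R
    R = R.zipPartitions(R)((x, y) => x).cache()
    if ((iteration + 1) % checkpointIter == 0) R.checkpoint()
    R.foreachPartition(_ => {})
    oldR.unpersist(blocking = false)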

Strangely, adding a simple identity map

    R = R.map(x => x).cache()

after the zipPartitions appears to partially mitigate the issue.
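In loop form, that looks something like this:

    while (iteration < 50) {
      R = R.zipPartitions(R)((x, y) => x).cache()
      R = R.map(x => x).cache()  // identity map added after the self-zip
      if ((iteration + 1) % checkpointIter == 0) R.checkpoint()
      R.foreachPartition(_ => {})
      iteration += 1
    }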

The problem was originally triggered when I attempted to checkpoint after doing joinVertices in GraphX, but the above example shows that the issue is in Spark core too.
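For completeness, the GraphX pattern that originally triggered this was along the following lines; the toy graph, attribute types, and names here are illustrative stand-ins rather than the actual code:

    import org.apache.spark.graphx._

    // Illustrative toy graph; the real one was much larger.
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
    var graph = Graph.fromEdges(edges, 0.0).cache()

    // Stand-in per-vertex values to join in.
    val msgs = graph.vertices.map { case (vid, attr) => (vid, 1.0) }

    // joinVertices followed by checkpointing, as in the original job.
    graph = graph.joinVertices(msgs)((vid, attr, msg) => attr + msg).cache()
    graph.vertices.checkpoint()
    graph.edges.checkpoint()
    graph.vertices.foreachPartition(_ => {})  // materialize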


  was:
I'm encountering a weird issue where a simple iterative zipPartitions is PROCESS_LOCAL before checkpointing, but becomes NODE_LOCAL for all iterations after checkpointing.

Here's an example snippet of code:
    var R : RDD[(Long,Int)]
    = sc.parallelize((0 until numPartitions), numPartitions)
      .mapPartitions(_ => new Array[(Long,Int)](10000000).map(i => (0L,0)).toSeq.iterator).cache()

    sc.setCheckpointDir(checkpointDir)

    var iteration = 0
    while (iteration < 50){
      R = R.zipPartitions(R)((x,y) => x).cache()
      if ((iteration+1) % checkpointIter == 0) R.checkpoint()
      R.foreachPartition(_ => {})
      iteration += 1
    }

I've also tried unpersisting the old RDDs and increasing spark.locality.wait, but neither helps.

Strangely, adding a simple identity map
R = R.map(x => x).cache()
after the zipPartitions appears to partially mitigate the issue.

The problem was originally triggered when I attempted to checkpoint after doing joinVertices in GraphX, but the above example shows that the issue is in Spark core too.



> Checkpointing after zipPartitions results in NODE_LOCAL execution
> -----------------------------------------------------------------
>
>                 Key: SPARK-6808
>                 URL: https://issues.apache.org/jira/browse/SPARK-6808
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX, Spark Core
>    Affects Versions: 1.2.1, 1.3.0
>         Environment: EC2 Ubuntu r3.8xlarge machines
>            Reporter: Xinghao Pan
>


