Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2016/04/06 04:04:25 UTC

[jira] [Assigned] (SPARK-14252) Executors do not try to download remote cached blocks

     [ https://issues.apache.org/jira/browse/SPARK-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-14252:
------------------------------------

    Assignee:     (was: Apache Spark)

> Executors do not try to download remote cached blocks
> -----------------------------------------------------
>
>                 Key: SPARK-14252
>                 URL: https://issues.apache.org/jira/browse/SPARK-14252
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Marcelo Vanzin
>
> I noticed this when taking a look at the root cause of SPARK-14209. 2.0.0 includes SPARK-12817, which changed the caching code a bit to remove duplication. But it seems to have removed the part where executors check whether other executors contain the needed cached block.
> In 1.6, that was done by the call to {{BlockManager.get}} in {{CacheManager.getOrCompute}}. But in the new version, {{RDD.iterator}} calls {{BlockManager.getOrElseUpdate}}, which never calls {{BlockManager.get}}, so the executor never gets blocks that are cached by other executors, causing those blocks to be recomputed locally instead.
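> To make the difference concrete, here is a minimal, self-contained sketch of the two lookup strategies (the names {{CacheLookupSketch}}, {{localCache}} and {{remoteFetch}} are illustrative stand-ins, not Spark's actual {{BlockManager}} internals):
> {code}
>     // Illustrative stand-ins only -- not Spark's BlockManager API.
>     object CacheLookupSketch {
>       val localCache = scala.collection.mutable.Map[String, Int]()
> 
>       // Stand-in for asking another executor's block manager over the network.
>       def remoteFetch(key: String): Option[Int] = None
> 
>       // 1.6-style path: check local storage, then remote executors, then recompute.
>       def getOrCompute(key: String)(compute: => Int): Int =
>         localCache.get(key)
>           .orElse(remoteFetch(key))
>           .getOrElse { val v = compute; localCache(key) = v; v }
> 
>       // 2.0-style path (the bug): check local storage only, otherwise recompute --
>       // the remote lookup step is skipped entirely.
>       def getOrElseUpdate(key: String)(compute: => Int): Int =
>         localCache.get(key)
>           .getOrElse { val v = compute; localCache(key) = v; v }
>     }
> {code}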
> I wrote a small program that shows this. In 1.6, running with {{--num-executors 2}}, I get 5 blocks cached on each executor, and messages like these in the logs:
> {noformat}
> 16/03/29 13:18:01 DEBUG spark.CacheManager: Looking for partition rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting local block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Block rdd_0_7 not registered locally
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7 from BlockManagerId(1, blah, 58831)
> {noformat}
> On 2.0, I get (almost) all 10 partitions cached on *both* executors, because once the second executor fails to find a block locally it just recomputes and caches it; it never tries to download the block from the other executor. The log messages above, which still exist in the code, don't show up anywhere.
> Here's the code I used for the above (trimmed of some other stuff from my little test harness; the imports and a minimal conf are filled in below so it compiles):
> {code}
>     import java.util.concurrent.TimeUnit
>     import org.apache.spark.{SparkConf, SparkContext}
>     // Minimal conf so the snippet compiles; the original harness set this up elsewhere.
>     val conf = new SparkConf().setAppName("SPARK-14252 repro")
>     val sc = new SparkContext(conf)
>     try {
>       val rdd = sc.parallelize(1 to 10000000, 10)
>       rdd.cache()
>       rdd.count()
>       // Create a single task that will sleep and block, so that a particular executor is busy.
>       // This should force future tasks to download cached data from that executor.
>       println("Running sleep job..")
>       val thread = new Thread(new Runnable() {
>         override def run(): Unit = {
>           rdd.mapPartitionsWithIndex { (i, iter) =>
>             if (i == 0) {
>               Thread.sleep(TimeUnit.MINUTES.toMillis(10))
>             }
>             iter
>           }.count()
>         }
>       })
>       thread.setDaemon(true)
>       thread.start()
>       // Wait a few seconds to make sure the task is running (too lazy for listeners)
>       println("Waiting for tasks to start...")
>       TimeUnit.SECONDS.sleep(10)
>       // Now run a job that will touch everything and should use the cached data.
>       val cnt = rdd.map(_*2).count()
>       println(s"Counted $cnt elements.")
>       println("Killing sleep job.")
>       thread.interrupt()
>       thread.join()
>     } finally {
>       sc.stop()
>     }
> {code}
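> For reference, a {{spark-submit}} invocation along these lines matches the {{--num-executors 2}} setup described above (the class and jar names are just placeholders):
> {noformat}
> spark-submit --master yarn --num-executors 2 --class ReproMain repro-test.jar
> {noformat}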



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org