Posted to issues@spark.apache.org by "Nan Zhu (JIRA)" <ji...@apache.org> on 2015/12/26 03:44:49 UTC

[jira] [Comment Edited] (SPARK-12469) Consistent Accumulators for Spark

    [ https://issues.apache.org/jira/browse/SPARK-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071799#comment-15071799 ] 

Nan Zhu edited comment on SPARK-12469 at 12/26/15 2:44 AM:
-----------------------------------------------------------

Just to bring in the previous discussion on this topic: https://github.com/apache/spark/pull/2524

I originally wanted to fix exactly the same issue in that patch, but we later had to narrow the scope to result tasks only.

There, I wanted to use StageId + partitionId to identify the accumulator uniquely, but [~matei] raised the counterexample that

"
A shuffle stage may be resubmitted once the old one is garbage-collected (if periodic cleanup is on)

If you use an accumulator in a pipelined transformation like a map(), and then you make a new RDD built on top of that (e.g. apply another map() to it), it won't count as the same stage so you'll still get the updates twice

"

I'm not sure whether the proposed solution fully resolves this issue.
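
For illustration only, a rough sketch of that counterexample (assuming a spark-shell style setup where `sc` already exists, as in the example below; all names here are made up): the second action re-runs the same pipelined map() as part of a different stage, so the two batches of updates carry different stage ids and a (stageId, partitionId) key would not deduplicate them.

{code}
val failures = sc.accumulator(0L)

val base = sc.parallelize(1 to 100).map { x =>
  failures += 1L   // accumulator updated inside a pipelined transformation
  x
}

base.count()              // first job/stage: 100 updates
base.map(_ * 2).count()   // second job recomputes base in a different stage: 100 more updates

// Deduplicating by (stageId, partitionId) would still count both batches,
// because the two stages have different ids, so failures ends up at 200 instead of 100.
{code}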



> Consistent Accumulators for Spark
> ---------------------------------
>
>                 Key: SPARK-12469
>                 URL: https://issues.apache.org/jira/browse/SPARK-12469
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: holdenk
>
> Tasks executed on Spark workers are unable to modify values from the driver; accumulators are the one exception to this. Accumulators in Spark are implemented in such a way that when a stage is recomputed (say, because of cache eviction) the accumulator is updated a second time. This makes accumulators inside transformations more difficult to use for things like counting invalid records (one of the primary potential use cases of collecting side information during a transformation). However, in some cases this counting during re-evaluation is exactly the behaviour we want (say, in tracking total execution time for a particular function). Spark would benefit from a version of accumulators which does not double count even if stages are re-executed.
> Motivating example:
> {code}
> import scala.util.Try
> val parseTime = sc.accumulator(0L)
> val parseFailures = sc.accumulator(0L)
> val parsedData = sc.textFile(...).flatMap { line =>
>   val start = System.currentTimeMillis()
>   val parsed = Try(parse(line))
>   if (parsed.isFailure) parseFailures += 1
>   parseTime += System.currentTimeMillis() - start
>   parsed.toOption
> }
> parsedData.cache()
> val resultA = parsedData.map(...).filter(...).count()
> // some intervening code.  Almost anything could happen here -- some of parsedData may
> // get kicked out of the cache, or an executor where data was cached might get lost
> val resultB = parsedData.filter(...).map(...).flatMap(...).count()
> // now we look at the accumulators
> {code}
> Here we would want parseFailures to have been added to only once for every line which failed to parse. Unfortunately, the current Spark accumulator API doesn't support this parseFailures use case, since if some of the cached data has been evicted it is possible that the failures will be double counted.
> See the full design document at https://docs.google.com/document/d/1lR_l1g3zMVctZXrcVjFusq2iQVpr4XvRK_UUDsDr6nk/edit?usp=sharing
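> As a point of comparison, here is a minimal, purely illustrative sketch of the deduplication idea; keying updates by (rddId, partitionId) is an assumption made for this sketch, not necessarily what the design document proposes.
> {code}
> // Driver-side sketch only: apply an update the first time a given
> // (rddId, partitionId) reports, and ignore later re-computations.
> final case class Update(rddId: Int, partitionId: Int, delta: Long)
>
> final class ConsistentLongAccumulator {
>   private var total = 0L
>   private val seen = scala.collection.mutable.Set.empty[(Int, Int)]
>
>   def merge(u: Update): Unit =
>     if (seen.add((u.rddId, u.partitionId))) total += u.delta
>
>   def value: Long = total
> }
>
> val acc = new ConsistentLongAccumulator
> acc.merge(Update(rddId = 7, partitionId = 0, delta = 2))  // first computation of partition 0
> acc.merge(Update(rddId = 7, partitionId = 0, delta = 2))  // re-computation after eviction: ignored
> acc.merge(Update(rddId = 7, partitionId = 1, delta = 1))
> println(acc.value)  // 3, not 5
> {code}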



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org