You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Ryan Williams (JIRA)" <ji...@apache.org> on 2017/07/16 12:26:00 UTC

[jira] [Comment Edited] (SPARK-21425) LongAccumulator, DoubleAccumulator not threadsafe

    [ https://issues.apache.org/jira/browse/SPARK-21425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088909#comment-16088909 ] 

Ryan Williams edited comment on SPARK-21425 at 7/16/17 12:25 PM:
-----------------------------------------------------------------

[~sowen] interesting, I didn't think my example was using accumulators un-idiomatically but maybe that's the catch here. 

To be clear, the [Main|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala] application in my repro is the interesting one; [LongRace|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/LongRace.scala] and [LongAccumulatorRace|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/LongAccumulatorRace.scala] just confirm that {{Long}}'s and {{LongAccumulator}}'s are not concurrent-write-safe, respectively.

[The accumulator usage in that {{Main}}|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala#L56-L70] is straightforward, I thought:

{code}
sc
  .parallelize(
    1 to numElems,
    numSlices = partitions
  )
  .mapPartitions {
    it ⇒
      otherAccum.add(1)
      lazyOtherAccum.add(1)
      localAccum.add(1)
      lazyLocalAccum.add(1)

      it.map(_.toString)
  }
  .collect()
{code}

It also "works" with {{RDD.map}}, incrementing the accumulators for every element.

I thought the way to use accumulators was:

- declare them on the driver,
- then when they are referenced in task closures, a copy is made, either per-task or per-executor (the latter mirroring how {{Broadcast}}'s work).

If they are per-executor, and multiple tasks can find themselves writing to the same accumulator instance, then this bug would seem pretty severe.

If they are per-task, then as you said everything should be fine as long as a task doesn't spawn multiple threads that both write to the accumulator, which seems un-idiomatic enough from a Spark-usage perspective that this would be much less worrisome.

In either case, I'm wondering if the behavior I'm seeing is specific to the way {{local\[*\]}}-mode lays out executors/threads under the hood. [My example uses {{local\[4\]}}|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala#L109], which I thought meant that my driver JVM spawns 4 threads to act as executors, in which case I'd expect the task-closures sent to those executors to have replaced references to accumulators with references to copies of the accumulators, as would presumably happen in "cluster mode".

So the important outstanding questions, to me, are:

- are accumulator instances shared across tasks on an executor, like {{Broadcast}} values, or does each task instantiate and write to a fresh copy of each accumulator?
- does {{local\[…\]}}-mode isolate accumulator instances in the same way as cluster-mode; if not, this bug could just be interpreted as a bug in how accumulator-references in task-closures are handled in local-mode.

There's also a nagging question of why I only see the race in the accumulators that are declared out in the {{Spark}} singleton; maybe the closure serializer just picks up direct references to them and doesn't transform them into per-task accumulator-instances like it would normally?

Thanks for having a look!


was (Author: rdub):
[~sowen] interesting, I didn't think my example was using accumulators un-idiomatically but maybe that's the catch here. 

To be clear, the [Main|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala] application in my repro is the interesting one; [LongRace|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/LongRace.scala] and [LongAccumulatorRace|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/LongAccumulatorRace.scala] just show that {{Long}}'s and {{LongAccumulator}}'s are not threadsafe, respectively.

[The accumulator usage in that Main|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala#L56-L70] is straightforward, I thought:

{code}
sc
  .parallelize(
    1 to numElems,
    numSlices = partitions
  )
  .mapPartitions {
    it ⇒
      otherAccum.add(1)
      lazyOtherAccum.add(1)
      localAccum.add(1)
      lazyLocalAccum.add(1)

      it.map(_.toString)
  }
  .collect()
{code}

It also "works" with {{RDD.map}}, incrementing the accumulators for every element.

I thought the way to use accumulators was declare them on the driver, then when they are referenced in task closures, a copy is made either per-task or per-executor (the latter mirroring how {{Broadcast}}'s work).

If they are per-executor, and multiple tasks can find themselves writing to the same accumulator instance, then this bug would seem pretty severe.

If they are per-task, then as you said everything should be fine as long as a task doesn't spawn multiple threads that both write to the accumulator, which seems un-idiomatic enough from a Spark-usage perspective that this would be much less worrisome.

In either case, I'm wondering if the behavior I'm seeing is specific to the way {{local\[*\]}}-mode lays out executors/threads under the hood. [My example uses {{local\[4\]}}|https://github.com/ryan-williams/spark-bugs/blob/accum/src/main/scala/com.foo/Main.scala#L109], which I thought meant that my driver JVM spawns 4 threads to act as executors, in which case I'd expect the task-closures sent to those executors to have replaced references to accumulators with references to copies of the accumulators, as would presumably happen in "cluster mode".

So the important outstanding questions, to me, are:

- are accumulator instances shared across tasks on an executor, like {{Broadcast}} values, or does each task instantiate and write to a fresh copy of each accumulator?
- does {{local\[…\]}}-mode isolate accumulator instances in the same way as cluster-mode; if not, this bug could just be interpreted as a bug in how accumulator-references in task-closures are handled in local-mode.

There's also a nagging question of why I only see the race in the accumulators that are declared out in the {{Spark}} singleton; maybe the closure serializer just picks up direct references to them and doesn't transform them into per-task accumulator-instances like it would normally?

Thanks for having a look!

> LongAccumulator, DoubleAccumulator not threadsafe
> -------------------------------------------------
>
>                 Key: SPARK-21425
>                 URL: https://issues.apache.org/jira/browse/SPARK-21425
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Ryan Williams
>
> [AccumulatorV2 docs|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L42-L43] acknowledge that accumulators must be concurrent-read-safe, but afaict they must also be concurrent-write-safe.
> The same docs imply that {{Int}} and {{Long}} meet either/both of these criteria, when afaict they do not.
> Relatedly, the provided [LongAccumulator|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L291] and [DoubleAccumulator|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L370] are not thread-safe, and should be expected to behave undefinedly when multiple concurrent tasks on the same executor write to them.
> [Here is a repro repo|https://github.com/ryan-williams/spark-bugs/tree/accum] with some simple applications that demonstrate incorrect results from {{LongAccumulator}}'s.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org