Posted to issues@spark.apache.org by "sam (Jira)" <ji...@apache.org> on 2022/08/13 09:31:00 UTC

[jira] [Comment Edited] (SPARK-40048) Partitions are traversed multiple times invalidating Accumulator consistency

    [ https://issues.apache.org/jira/browse/SPARK-40048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579229#comment-17579229 ] 

sam edited comment on SPARK-40048 at 8/13/22 9:30 AM:
------------------------------------------------------

We tried `3.2.1` and I'm now looking at https://github.com/typelevel/cats/issues/3628.

Will try shading cats.
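
For reference, a minimal sketch of what shading cats could look like. It assumes the job is built with sbt and the sbt-assembly plugin, which this thread does not confirm. The `shaded.cats` target prefix is a hypothetical name chosen for illustration.

```scala
// build.sbt -- assumes the sbt-assembly plugin is enabled in project/plugins.sbt.
// Relocates every class under `cats` (including `cats.kernel`) in the fat jar,
// so the job's cats classes cannot clash with a different cats version on the
// Spark classpath. The "shaded.cats" prefix is a hypothetical choice.
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("cats.**" -> "shaded.cats.@1").inAll
)
```

With a rule like this the application jar would be produced by `sbt assembly` and submitted as usual; whether it resolves the problem discussed in the linked cats issue is untested here.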


was (Author: sams):
We tried `3.2.1` and I'm now looking at https://github.com/typelevel/cats/issues/3628, but from the thread I don't see what the best workaround is.

> Partitions are traversed multiple times invalidating Accumulator consistency
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-40048
>                 URL: https://issues.apache.org/jira/browse/SPARK-40048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: sam
>            Priority: Major
>
> We are trying to use Accumulators to count the elements of RDDs without forcing a `.count()` on them, for efficiency reasons. We are aware that tasks can fail and re-run, which invalidates the accumulator's value, so we also count the number of times each partition is traversed in order to detect this.
> The problem is that partitions are being traversed multiple times even though:
>  - We cache the RDD in memory _after we have applied the logic below_
>  - No tasks are failing, no executors are dying.
>  - There is plenty of memory (no RDD eviction)
> The code we use:
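> ```
> import org.apache.spark.util.LongAccumulator
>
> // Total element count, plus one traverse counter per partition.
> val count: LongAccumulator
> val partitionTraverseCounts: List[LongAccumulator]
>
> def increment(): Unit = count.add(1)
>
> def incrementTimesCalled(partitionIndex: Int): Unit =
>   partitionTraverseCounts(partitionIndex).add(1)
>
> // Called once each time a partition is computed. The element count is
> // incremented lazily, as the returned iterator is consumed.
> def incrementForPartition[T](index: Int, it: Iterator[T]): Iterator[T] = {
>   incrementTimesCalled(index)
>   it.map { x =>
>     increment()
>     x
>   }
> }
> ```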
> How we use the above:
> ```
> rdd.mapPartitionsWithIndex(safeCounter.incrementForPartition)
> ```
> We have a 50-partition RDD, and we frequently see unexpected traverse counts:
> ```
> traverseCounts: List(2, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 1, 2)
> ```
> As you can see, some partitions are traversed twice, while others are traversed only once.
> To confirm no task failures:
> ```
> grep -i task job.log | grep -i fail
> ```
> To confirm no memory issues:
> ```
> grep -i memory job.log
> ```
> Every matching log line shows multiple GB of memory free.
> We also don't see any errors or exceptions.
> Questions:
> 1. Why is Spark traversing a cached RDD multiple times?
> 2. Is there any way to disable this?
> Many thanks,
> Sam
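
The check implied by the traverse counts quoted above can be sketched in plain Scala, without a Spark dependency. This is an illustration only: the object `TraverseCheck` and its method names are hypothetical and do not come from the reporter's code. It assumes the accumulator values have already been read on the driver into a sequence of longs.

```scala
object TraverseCheck {
  // Indices of partitions that were not traversed exactly once.
  def suspectPartitions(traverseCounts: Seq[Long]): Seq[Int] =
    traverseCounts.zipWithIndex.collect { case (n, i) if n != 1L => i }

  // The element count is only trusted when every partition was traversed exactly once.
  def countIsTrustworthy(traverseCounts: Seq[Long]): Boolean =
    suspectPartitions(traverseCounts).isEmpty

  def main(args: Array[String]): Unit = {
    // First five values of the traverseCounts list quoted in the issue.
    val observed = Seq[Long](2, 1, 2, 2, 1)
    println(suspectPartitions(observed))  // prints List(0, 2, 3)
    println(countIsTrustworthy(observed)) // prints false
  }
}
```

A check like this only detects the inconsistency; it does not explain why the partitions were recomputed or recover the correct count.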



