Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2018/10/15 15:45:00 UTC
[jira] [Resolved] (SPARK-24154) AccumulatorV2 loses type information during serialization
[ https://issues.apache.org/jira/browse/SPARK-24154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-24154.
-------------------------------
Resolution: Won't Fix
> AccumulatorV2 loses type information during serialization
> ---------------------------------------------------------
>
> Key: SPARK-24154
> URL: https://issues.apache.org/jira/browse/SPARK-24154
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0, 2.2.1, 2.3.0, 2.3.1
> Environment: Scala 2.11
> Spark 2.2.0
> Reporter: Sergey Zhemzhitsky
> Priority: Major
>
> AccumulatorV2 loses type information during serialization.
> It happens [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164] during *writeReplace* call
> {code:scala}
> final protected def writeReplace(): Any = {
>   if (atDriverSide) {
>     if (!isRegistered) {
>       throw new UnsupportedOperationException(
>         "Accumulator must be registered before send to executor")
>     }
>     val copyAcc = copyAndReset()
>     assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
>     val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
>     if (isInternalAcc) {
>       // Do not serialize the name of internal accumulator and send it to executor.
>       copyAcc.metadata = metadata.copy(name = None)
>     } else {
>       // For non-internal accumulators, we still need to send the name because users may need to
>       // access the accumulator name at executor side, or they may keep the accumulators sent from
>       // executors and access the name when the registered accumulator is already garbage
>       // collected(e.g. SQLMetrics).
>       copyAcc.metadata = metadata
>     }
>     copyAcc
>   } else {
>     this
>   }
> }
> {code}
> As a result, it is hardly possible to create new accumulators by adding behaviour to existing ones by means of mix-ins or inheritance (without overriding *copy*).
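> The reason the copy drops the mix-in (added here for context, paraphrased from the Spark 2.x sources rather than quoted in the issue) is that the built-in accumulators hard-code their own class in *copyAndReset*, which is what *writeReplace* serializes:
> {code:scala}
> // Paraphrased sketch of org.apache.spark.util.LongAccumulator (Spark 2.x).
> // writeReplace() serializes the result of copyAndReset(), and this
> // implementation always constructs a plain LongAccumulator, so any
> // trait mixed into the original instance is lost on the executor side.
> class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
>   // ...
>   override def copyAndReset(): LongAccumulator = new LongAccumulator
> }
> {code}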
> For example, the following snippet ...
> {code:scala}
> import java.{lang => jl}
> import org.apache.spark.util.LongAccumulator
>
> trait TripleCount {
>   self: LongAccumulator =>
>
>   abstract override def add(v: jl.Long): Unit = {
>     self.add(v * 3)
>   }
> }
>
> val acc = new LongAccumulator with TripleCount
> sc.register(acc)
>
> val data = 1 to 10
> val rdd = sc.makeRDD(data, 5)
> rdd.foreach(acc.add(_))
>
> acc.value shouldBe 3 * data.sum
> {code}
> ... fails with
> {code:none}
> org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
> at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
> at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
> {code}
> Such behaviour also seems error-prone and confusing, because implementors do not get what the code appears to define.
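> Given the Won't Fix resolution, one possible workaround (a sketch; the name *TripleCountAccumulator* is illustrative and not from the issue) is to subclass the accumulator instead of mixing in a trait, overriding *copyAndReset* so the copy made by *writeReplace* keeps the subclass type:
> {code:scala}
> import java.{lang => jl}
> import org.apache.spark.util.LongAccumulator
>
> // Hypothetical subclass-based alternative to the TripleCount mix-in.
> class TripleCountAccumulator extends LongAccumulator {
>   override def add(v: jl.Long): Unit = super.add(v * 3)
>
>   // writeReplace() serializes whatever copyAndReset() returns, so
>   // returning this subclass preserves the tripling behaviour on executors.
>   override def copyAndReset(): TripleCountAccumulator = new TripleCountAccumulator
> }
> {code}
> Depending on how the accumulator is used, *copy* may need the same treatment.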
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org