Posted to issues@spark.apache.org by "Sergey Zhemzhitsky (JIRA)" <ji...@apache.org> on 2018/05/02 19:31:00 UTC
[jira] [Created] (SPARK-24154) AccumulatorV2 loses type information during serialization
Sergey Zhemzhitsky created SPARK-24154:
------------------------------------------
Summary: AccumulatorV2 loses type information during serialization
Key: SPARK-24154
URL: https://issues.apache.org/jira/browse/SPARK-24154
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.3.0, 2.2.1, 2.2.0, 2.3.1
Environment: Scala 2.11
Spark 2.2.0
Reporter: Sergey Zhemzhitsky
AccumulatorV2 loses type information during serialization.
It happens [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164] during the *writeReplace* call:
{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
    if (isInternalAcc) {
      // Do not serialize the name of internal accumulator and send it to executor.
      copyAcc.metadata = metadata.copy(name = None)
    } else {
      // For non-internal accumulators, we still need to send the name because users may need to
      // access the accumulator name at executor side, or they may keep the accumulators sent from
      // executors and access the name when the registered accumulator is already garbage
      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
    copyAcc
  } else {
    this
  }
}
{code}
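The type loss can be sketched without Spark at all. In the minimal model below, *Counter* and *TripleCount* are illustrative names (not Spark's real classes) standing in for *LongAccumulator* and a user mix-in; the point is only that a *copyAndReset* which instantiates the base class discards any mixed-in trait:

```scala
// Self-contained sketch (no Spark required) of the type loss.
// Counter and TripleCount are illustrative, not Spark's actual classes.
class Counter extends Serializable {
  private var sum = 0L
  def add(v: Long): Unit = { sum += v }
  def value: Long = sum
  // Like LongAccumulator.copyAndReset, return a fresh base-class instance:
  // any mixed-in trait is silently dropped here.
  def copyAndReset(): Counter = new Counter
}

trait TripleCount extends Counter {
  override def add(v: Long): Unit = super.add(v * 3)
}

val acc = new Counter with TripleCount
acc.add(1)
assert(acc.value == 3)            // the mix-in triples the increment

val shipped = acc.copyAndReset()  // what writeReplace would serialize
shipped.add(1)
assert(shipped.value == 1)        // plain Counter: the mix-in is gone
```

This is exactly what happens to the accumulator shipped to executors: the copy behaves like the base class, so the driver merges untripled partial sums.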
This means it is hardly possible to build new accumulators by adding behaviour to existing ones via mix-ins or inheritance without also overriding *copy* (or *copyAndReset*): the copy produced during serialization reverts to the base class and silently drops the mixed-in behaviour.
For example, the following snippet ...
{code:scala}
import java.{lang => jl}

trait TripleCount extends LongAccumulator {
  override def add(v: jl.Long): Unit = super.add(v * 3)
}

val acc = new LongAccumulator with TripleCount
sc.register(acc)

val data = 1 to 10
val rdd = sc.makeRDD(data, 5)
rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}
... fails with
{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}
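Until this is addressed, one possible workaround, sketched below on the same kind of minimal stand-in (illustrative names, not Spark's actual API), is to put the extra behaviour in a named subclass and override *copyAndReset* there so the serialized copy keeps the specialized type:

```scala
// Workaround sketch on a minimal stand-in for the accumulator API
// (Counter/TripleCounter are illustrative, not Spark's classes).
class Counter extends Serializable {
  private var sum = 0L
  def add(v: Long): Unit = { sum += v }
  def value: Long = sum
  def copyAndReset(): Counter = new Counter
}

class TripleCounter extends Counter {
  override def add(v: Long): Unit = super.add(v * 3)
  // Overriding copyAndReset keeps the subtype, so the copy that would be
  // serialized still triples its increments.
  override def copyAndReset(): TripleCounter = new TripleCounter
}

val shipped = new TripleCounter().copyAndReset() // still a TripleCounter
shipped.add(1)
assert(shipped.value == 3)  // tripling behaviour survives the copy
```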
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)