Posted to issues@spark.apache.org by "Reynold Xin (JIRA)" <ji...@apache.org> on 2016/04/15 08:46:25 UTC
[jira] [Commented] (SPARK-14654) New accumulator API
[ https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15242536#comment-15242536 ]
Reynold Xin commented on SPARK-14654:
-------------------------------------
Note that one challenge is that we most likely don't want to break the existing accumulator API, which means we would need to give this a different name, put it in a different package, or both.
> New accumulator API
> -------------------
>
> Key: SPARK-14654
> URL: https://issues.apache.org/jira/browse/SPARK-14654
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Reynold Xin
>
> The current accumulator API has a few problems:
> 1. Its type hierarchy is very complicated, with Accumulator, Accumulable, AccumulatorParam, AccumulableParam, etc.
> 2. The intermediate buffer type must be the same as the output type, so there is no way to define an accumulator that computes averages.
> 3. It is very difficult to specialize the methods, leading to excessive boxing and making accumulators a poor fit for metrics that are updated for every record.
> 4. There is not a single coherent API that works for both Java and Scala.
> This is a proposed new API that addresses all of the above. In this new API:
> 1. There is only a single class (Accumulator) that is user-facing.
> 2. The intermediate value is stored in the accumulator itself and can be different from the output type.
> 3. Concrete implementations can provide their own specialized methods.
> 4. It is designed to work for both Java and Scala.
> {code}
> abstract class Accumulator[IN, OUT] extends Serializable {
>   def isRegistered: Boolean = ...
>   def register(metadata: AccumulatorMetadata): Unit = ...
>   def metadata: AccumulatorMetadata = ...
>   def reset(): Unit
>   def add(v: IN): Unit
>   def merge(other: Accumulator[IN, OUT]): Unit
>   def value: OUT
>   def localValue: OUT = value
>   final def registerAccumulatorOnExecutor(): Unit = {
>     // Automatically register the accumulator when it is deserialized with the task closure.
>     // This is for external accumulators and internal ones that do not represent task level
>     // metrics, e.g. internal SQL metrics, which are per-operator.
>     val taskContext = TaskContext.get()
>     if (taskContext != null) {
>       taskContext.registerAccumulator(this)
>     }
>   }
>   // Called by Java when deserializing an object
>   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
>     in.defaultReadObject()
>     registerAccumulatorOnExecutor()
>   }
> }
> {code}
> Metadata, provided by Spark after registration:
> {code}
> class AccumulatorMetadata(
>     val id: Long,
>     val name: Option[String],
>     val countFailedValues: Boolean)
>   extends Serializable
> {code}
> and an implementation that also offers specialized getters and setters
> {code}
> class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
>   private[this] var _sum = 0L
>   override def reset(): Unit = _sum = 0L
>   override def add(v: jl.Long): Unit = {
>     _sum += v
>   }
>   override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other match {
>     case o: LongAccumulator => _sum += o.sum
>     case _ => throw new UnsupportedOperationException(
>       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
>   }
>   override def value: jl.Long = _sum
>   def sum: Long = _sum
> }
> {code}
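> To illustrate point 2 above, the internal buffer can be richer than the output type. As a rough sketch against the proposed API (the AverageAccumulator name and its members are hypothetical, not part of the prototype):
> {code}
> class AverageAccumulator extends Accumulator[jl.Double, jl.Double] {
>   // Hypothetical sketch: the buffer is (sum, count), the exposed value is the average.
>   private[this] var _sum = 0.0
>   private[this] var _count = 0L
>   override def reset(): Unit = { _sum = 0.0; _count = 0L }
>   override def add(v: jl.Double): Unit = { _sum += v; _count += 1 }
>   override def merge(other: Accumulator[jl.Double, jl.Double]): Unit = other match {
>     case o: AverageAccumulator =>
>       _sum += o.sum
>       _count += o.count
>     case _ => throw new UnsupportedOperationException(
>       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
>   }
>   override def value: jl.Double = if (_count == 0) 0.0 else _sum / _count
>   def sum: Double = _sum
>   def count: Long = _count
> }
> {code}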
> and SparkContext...
> {code}
> class SparkContext {
>   ...
>   def newLongAccumulator(): LongAccumulator
>   def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
>   ...
> }
> {code}
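> Presumably newLongAccumulator is just a convenience wrapper that constructs and registers the accumulator; the body below is an assumption for illustration, not code from the prototype:
> {code}
> // Hypothetical sketch of the convenience method inside SparkContext.
> def newLongAccumulator(): LongAccumulator = {
>   val acc = new LongAccumulator
>   registerAccumulator(acc)  // Spark assigns AccumulatorMetadata (id, name, countFailedValues) here
>   acc
> }
> {code}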
> To use it ...
> {code}
> val acc = sc.newLongAccumulator()
> sc.parallelize(1 to 1000).map { i =>
>   acc.add(1L)
>   i
> }
> {code}
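> Since map is lazy, the accumulator is only updated once an action runs; reading the merged result on the driver would then look roughly like this (again a sketch against the proposed API):
> {code}
> val acc = sc.newLongAccumulator()
> sc.parallelize(1 to 1000).map { i =>
>   acc.add(1L)
>   i
> }.count()
> // After the action completes, the merged value is available on the driver.
> acc.value  // 1000, assuming no retried or speculative tasks inflate the count
> {code}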
> A work-in-progress prototype is available here: https://github.com/rxin/spark/tree/accumulator-refactor