You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Reynold Xin (JIRA)" <ji...@apache.org> on 2016/04/15 08:46:25 UTC

[jira] [Commented] (SPARK-14654) New accumulator API

    [ https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15242536#comment-15242536 ] 

Reynold Xin commented on SPARK-14654:
-------------------------------------

 Note that one challenge is that we most likely don't want to break the existing accumulator API, meaning we'd need to name this something else or put it in a different package, or both.


> New accumulator API
> -------------------
>
>                 Key: SPARK-14654
>                 URL: https://issues.apache.org/jira/browse/SPARK-14654
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Reynold Xin
>
> The current accumulator API has a few problems:
> 1. Its type hierarchy is very complicated, with Accumulator, Accumulable, AccumulatorParam, AccumulableParam, etc.
> 2. The intermediate buffer type must be the same as the output type, so there is no way to define an accumulator that computes averages.
> 3. It is very difficult to specialize the methods, leading to excessive boxing and making accumulators bad for metrics that change for each record.
> 4. There is not a single coherent API that works for both Java and Scala.
> This is a proposed new API that addresses all of the above. In this new API:
> 1. There is only a single class (Accumulator) that is user facing
> 2. The intermediate value is stored in the accumulator itself and can be different from the output type.
> 3. Concrete implementations can provide its own specialized methods.
> 4. Designed to work for both Java and Scala.
> {code}
> abstract class Accumulator[IN, OUT] extends Serializable {
>   def isRegistered: Boolean = ...
>   def register(metadata: AccumulatorMetadata): Unit = ...
>   def metadata: AccumulatorMetadata = ...
>   def reset(): Unit
>   def add(v: IN): Unit
>   def merge(other: Accumulator[IN, OUT]): Unit
>   def value: OUT
>   def localValue: OUT = value
>   final def registerAccumulatorOnExecutor(): Unit = {
>     // Automatically register the accumulator when it is deserialized with the task closure.
>     // This is for external accumulators and internal ones that do not represent task level
>     // metrics, e.g. internal SQL metrics, which are per-operator.
>     val taskContext = TaskContext.get()
>     if (taskContext != null) {
>       taskContext.registerAccumulator(this)
>     }
>   }
>   // Called by Java when deserializing an object
>   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
>     in.defaultReadObject()
>     registerAccumulator()
>   }
> }
> {code}
> Metadata, provided by Spark after registration:
> {code}
> class AccumulatorMetadata(
>   val id: Long,
>   val name: Option[String],
>   val countFailedValues: Boolean
> ) extends Serializable
> {code}
> and an implementation that also offers specialized getters and setters
> {code}
> class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
>   private[this] var _sum = 0L
>   override def reset(): Unit = _sum = 0L
>   override def add(v: jl.Long): Unit = {
>     _sum += v
>   }
>   override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other match {
>     case o: LongAccumulator => _sum += o.sum
>     case _ => throw new UnsupportedOperationException(
>       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
>   }
>   override def value: jl.Long = _sum
>   def sum: Long = _sum
> }
> {code}
> and SparkContext...
> {code}
> class SparkContext {
>   ...
>   def newLongAccumulator(): LongAccumulator
>   def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
>   ...
> }
> {code}
> To use it ...
> {code}
> val acc = sc.newLongAccumulator()
> sc.parallelize(1 to 1000).map { i =>
>   acc.add(1)
>   i
> }
> {code}
> A work-in-progress prototype here: https://github.com/rxin/spark/tree/accumulator-refactor



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org