Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/14 22:28:54 UTC

[2/7] git commit: Updated API doc for Accumulable and Accumulator.

Updated API doc for Accumulable and Accumulator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6a12b9eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6a12b9eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6a12b9eb

Branch: refs/heads/master
Commit: 6a12b9ebc53d8851b9e1bc52452d4cc6d4c13ca1
Parents: 71b3007
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 14 11:16:08 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 14 11:16:08 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   | 40 +++++++++++++++-----
 1 file changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a12b9eb/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 2ba871a..fe1537a 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -17,17 +17,17 @@
 
 package org.apache.spark
 
-import java.io._
+import java.io.ObjectInputStream
 
 import scala.collection.mutable.Map
 import scala.collection.generic.Growable
 import org.apache.spark.serializer.JavaSerializer
 
 /**
- * A datatype that can be accumulated, ie has an commutative and associative "add" operation,
+ * A data type that can be accumulated, i.e. has a commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
  *
- * You must define how to add data, and how to merge two of these together.  For some datatypes,
+ * You must define how to add data, and how to merge two of these together.  For some data types,
  * such as a counter, these might be the same operation. In that case, you can use the simpler
  * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
  * accumulating a set. You will add items to the set, and you will union two sets together.
@@ -45,7 +45,7 @@ class Accumulable[R, T] (
   val id = Accumulators.newId
   @transient private var value_ = initialValue // Current value on master
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
-  var deserialized = false
+  private var deserialized = false
 
   Accumulators.register(this, true)
 
@@ -127,7 +127,7 @@ class Accumulable[R, T] (
 
 /**
  * Helper object defining how to accumulate values of a particular type. An implicit
- * AccumulableParam needs to be available when you create Accumulables of a specific type.
+ * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
  *
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
@@ -186,7 +186,29 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
 
 /**
  * A simpler value of [[Accumulable]] where the result type being accumulated is the same
- * as the types of elements being merged.
+ * as the type of the elements being merged, i.e. variables that are only "added" to through an
+ * associative operation and can therefore be efficiently supported in parallel. They can be used
+ * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of type
+ * `Int` and `Double`, and programmers can add support for new types.
+ *
+ * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
+ * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
+ * However, they cannot read its value. Only the driver program can read the accumulator's value,
+ * using its `value` method.
+ *
+ * The interpreter session below shows an accumulator being used to add up the elements of an array:
+ *
+ * {{{
+ * scala> val accum = sc.accumulator(0)
+ * accum: spark.Accumulator[Int] = 0
+ *
+ * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
+ * ...
+ * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
+ *
+ * scala> accum.value
+ * res2: Int = 10
+ * }}}
  *
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `T`
@@ -196,9 +218,9 @@ class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
   extends Accumulable[T,T](initialValue, param)
 
 /**
- * A simpler version of [[org.apache.spark.AccumulableParam]] where the only datatype you can add in is the same type
- * as the accumulated value. An implicit AccumulatorParam object needs to be available when you create
- * Accumulators of a specific type.
+ * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
+ * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
+ * available when you create Accumulators of a specific type.
  *
  * @tparam T type of value to accumulate
  */
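----------------------------------------------------------------------

The updated doc for [[Accumulable]] uses a set as its example: adding one item and unioning two partial sets are different operations, so both must be defined. A minimal sketch of such an AccumulableParam, assuming the addAccumulator/addInPlace/zero signatures this file declares and the `sc.accumulable` factory on SparkContext; the object name and usage below are illustrative, not part of this commit:

{{{
import org.apache.spark.AccumulableParam

// Accumulate individual Strings into a Set[String]: addAccumulator adds one
// element to a partial result on a worker, addInPlace unions two partial sets.
object StringSetParam extends AccumulableParam[Set[String], String] {
  def addAccumulator(r: Set[String], t: String): Set[String] = r + t
  def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
}

// Hypothetical usage, given an existing SparkContext `sc`:
//   val names = sc.accumulable(Set.empty[String])(StringSetParam)
//   sc.parallelize(Seq("a", "b", "a")).foreach(x => names += x)
//   names.value  // Set(a, b), readable only on the driver
}}}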
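The new [[Accumulator]] doc notes that Spark natively supports accumulators of type `Int` and `Double` and that programmers can add support for new types. A sketch of what that could look like for element-wise addition of Array[Double], assuming AccumulatorParam only requires zero and addInPlace (its addAccumulator delegates to addInPlace); again the object name and usage are illustrative:

{{{
import org.apache.spark.AccumulatorParam

// Accumulator support for Array[Double], merged by element-wise addition.
// Assumes every added array has the same length as the initial value.
implicit object DoubleArrayParam extends AccumulatorParam[Array[Double]] {
  // Zero value: an all-zeros array the same length as the initial value.
  def zero(initialValue: Array[Double]): Array[Double] =
    new Array[Double](initialValue.length)
  // Merge two partial results by adding the second into the first buffer.
  def addInPlace(a1: Array[Double], a2: Array[Double]): Array[Double] = {
    for (i <- a1.indices) a1(i) += a2(i)
    a1
  }
}

// Hypothetical usage, given an existing SparkContext `sc`:
//   val vec = sc.accumulator(new Array[Double](2))
//   sc.parallelize(Seq(Array(1.0, 2.0), Array(3.0, 4.0))).foreach(a => vec += a)
//   vec.value  // Array(4.0, 6.0)
}}}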