You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/14 22:28:53 UTC

[1/7] git commit: Broadcast variable visibility change & doc update.

Updated Branches:
  refs/heads/master 3fcc68bfa -> 2ce23a55a


Broadcast variable visibility change & doc update.

Note that previously the Broadcast class was accidentally marked as private[spark]. It needs to be
public for broadcast variables to work. Also exposing the broadcast variable id.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/71b3007d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/71b3007d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/71b3007d

Branch: refs/heads/master
Commit: 71b3007dbde2211ed1487c1c6d5f877c9a74fdb5
Parents: 3fcc68b
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 14 11:15:21 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 14 11:15:21 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/broadcast/Broadcast.scala  | 32 ++++++++++++++++++--
 .../org/apache/spark/broadcast/package.scala    | 25 +++++++++++++++
 2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/71b3007d/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 6bfe2cb..cf0904f 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -17,13 +17,39 @@
 
 package org.apache.spark.broadcast
 
-import java.io._
 import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark._
 
-private[spark]
-abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
+/**
+ * A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable
+ * cached on each machine rather than shipping a copy of it with tasks. They can be used, for
+ * example, to give every node a copy of a large input dataset in an efficient manner. Spark also
+ * attempts to distribute broadcast variables using efficient broadcast algorithms to reduce
+ * communication cost.
+ *
+ * Broadcast variables are created from a variable `v` by calling [[SparkContext#broadcast]].
+ * The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the
+ * `value` method. The interpreter session below shows this:
+ *
+ * {{{
+ * scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
+ * broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
+ *
+ * scala> broadcastVar.value
+ * res0: Array[Int] = Array(1, 2, 3)
+ * }}}
+ *
+ * After the broadcast variable is created, it should be used instead of the value `v` in any
+ * functions run on the cluster so that `v` is not shipped to the nodes more than once.
+ * In addition, the object `v` should not be modified after it is broadcast in order to ensure
+ * that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped
+ * to a new node later).
+ *
+ * @param id A unique identifier for the broadcast variable.
+ * @tparam T Type of the data contained in the broadcast variable.
+ */
+abstract class Broadcast[T](val id: Long) extends Serializable {
   def value: T
 
   // We cannot have an abstract readObject here due to some weird issues with

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/71b3007d/core/src/main/scala/org/apache/spark/broadcast/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/package.scala b/core/src/main/scala/org/apache/spark/broadcast/package.scala
new file mode 100644
index 0000000..01bf886
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/broadcast/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Package for broadcast variables. See [[broadcast.Broadcast]] for details.
+ */
+package object broadcast {
+  // For package docs only
+}


[5/7] git commit: Maintain Serializable API compatibility by reverting back to java.io.Serializable for Broadcast and Accumulator.

Posted by rx...@apache.org.
Maintain Serializable API compatibility by reverting back to java.io.Serializable for Broadcast and Accumulator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1b5623fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1b5623fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1b5623fd

Branch: refs/heads/master
Commit: 1b5623fd0bd716d3c556371caabbf94abcc6fb8d
Parents: 55db774
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 14 11:30:59 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 14 11:30:59 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/Accumulators.scala        | 2 +-
 core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1b5623fd/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index fe1537a..df01b2e 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import java.io.ObjectInputStream
+import java.io.{ObjectInputStream, Serializable}
 
 import scala.collection.mutable.Map
 import scala.collection.generic.Growable

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1b5623fd/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index cf0904f..d113d40 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.broadcast
 
+import java.io.Serializable
 import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark._


[7/7] git commit: Merge pull request #425 from rxin/scaladoc

Posted by rx...@apache.org.
Merge pull request #425 from rxin/scaladoc

API doc update & make Broadcast public

In #413 Broadcast was mistakenly made private[spark]. I changed it to public again. Also exposing id publicly, given that the R frontend requires it.

Copied some of the documentation from the programming guide to API Doc for Broadcast and Accumulator.

This should be cherry picked into branch-0.9 as well for 0.9.0 release.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2ce23a55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2ce23a55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2ce23a55

Branch: refs/heads/master
Commit: 2ce23a55a3c4033873bb262919d89e5afabb9134
Parents: 3fcc68b f12e506
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 14 13:28:44 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 14 13:28:44 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   | 40 +++++++++++++++-----
 .../spark/api/java/JavaSparkContext.scala       | 11 +++---
 .../org/apache/spark/api/java/package.scala     | 23 +++++++++++
 .../org/apache/spark/broadcast/Broadcast.scala  | 33 ++++++++++++++--
 .../org/apache/spark/broadcast/package.scala    | 25 ++++++++++++
 5 files changed, 115 insertions(+), 17 deletions(-)
----------------------------------------------------------------------



[4/7] git commit: Added license header for package.scala in the Java API package.

Posted by rx...@apache.org.
Added license header for package.scala in the Java API package.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/55db7741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/55db7741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/55db7741

Branch: refs/heads/master
Commit: 55db77416b4fd8c51e0b0c3cc704c700f73bdc2e
Parents: f8c12e9
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 14 11:20:12 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 14 11:20:12 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/java/package.scala  | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/55db7741/core/src/main/scala/org/apache/spark/api/java/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/package.scala b/core/src/main/scala/org/apache/spark/api/java/package.scala
index 14ba2c1..8ec7700 100644
--- a/core/src/main/scala/org/apache/spark/api/java/package.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/package.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.api
 
 /** Spark Java programming APIs. */


[3/7] git commit: Added package doc for the Java API.

Posted by rx...@apache.org.
Added package doc for the Java API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f8c12e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f8c12e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f8c12e94

Branch: refs/heads/master
Commit: f8c12e9457503956fd728dda47be87bc67522a65
Parents: 6a12b9e
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 14 11:16:25 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 14 11:16:25 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/api/java/package.scala | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f8c12e94/core/src/main/scala/org/apache/spark/api/java/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/package.scala b/core/src/main/scala/org/apache/spark/api/java/package.scala
new file mode 100644
index 0000000..14ba2c1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/package.scala
@@ -0,0 +1,6 @@
+package org.apache.spark.api
+
+/** Spark Java programming APIs. */
+package object java {
+  // For package docs only
+}


[2/7] git commit: Updated API doc for Accumulable and Accumulator.

Posted by rx...@apache.org.
Updated API doc for Accumulable and Accumulator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6a12b9eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6a12b9eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6a12b9eb

Branch: refs/heads/master
Commit: 6a12b9ebc53d8851b9e1bc52452d4cc6d4c13ca1
Parents: 71b3007
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 14 11:16:08 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 14 11:16:08 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   | 40 +++++++++++++++-----
 1 file changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a12b9eb/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 2ba871a..fe1537a 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -17,17 +17,17 @@
 
 package org.apache.spark
 
-import java.io._
+import java.io.ObjectInputStream
 
 import scala.collection.mutable.Map
 import scala.collection.generic.Growable
 import org.apache.spark.serializer.JavaSerializer
 
 /**
- * A datatype that can be accumulated, ie has an commutative and associative "add" operation,
+ * A data type that can be accumulated, ie has an commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
  *
- * You must define how to add data, and how to merge two of these together.  For some datatypes,
+ * You must define how to add data, and how to merge two of these together.  For some data types,
  * such as a counter, these might be the same operation. In that case, you can use the simpler
  * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
  * accumulating a set. You will add items to the set, and you will union two sets together.
@@ -45,7 +45,7 @@ class Accumulable[R, T] (
   val id = Accumulators.newId
   @transient private var value_ = initialValue // Current value on master
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
-  var deserialized = false
+  private var deserialized = false
 
   Accumulators.register(this, true)
 
@@ -127,7 +127,7 @@ class Accumulable[R, T] (
 
 /**
  * Helper object defining how to accumulate values of a particular type. An implicit
- * AccumulableParam needs to be available when you create Accumulables of a specific type.
+ * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
  *
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
@@ -186,7 +186,29 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
 
 /**
  * A simpler value of [[Accumulable]] where the result type being accumulated is the same
- * as the types of elements being merged.
+ * as the types of elements being merged, i.e. variables that are only "added" to through an
+ * associative operation and can therefore be efficiently supported in parallel. They can be used
+ * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of type
+ * `Int` and `Double`, and programmers can add support for new types.
+ *
+ * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
+ * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
+ * However, they cannot read its value. Only the driver program can read the accumulator's value,
+ * using its value method.
+ *
+ * The interpreter session below shows an accumulator being used to add up the elements of an array:
+ *
+ * {{{
+ * scala> val accum = sc.accumulator(0)
+ * accum: spark.Accumulator[Int] = 0
+ *
+ * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
+ * ...
+ * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
+ *
+ * scala> accum.value
+ * res2: Int = 10
+ * }}}
  *
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `T`
@@ -196,9 +218,9 @@ class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
   extends Accumulable[T,T](initialValue, param)
 
 /**
- * A simpler version of [[org.apache.spark.AccumulableParam]] where the only datatype you can add in is the same type
- * as the accumulated value. An implicit AccumulatorParam object needs to be available when you create
- * Accumulators of a specific type.
+ * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
+ * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
+ * available when you create Accumulators of a specific type.
  *
  * @tparam T type of value to accumulate
  */


[6/7] git commit: Fixed a typo in JavaSparkContext's API doc.

Posted by rx...@apache.org.
Fixed a typo in JavaSparkContext's API doc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f12e506c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f12e506c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f12e506c

Branch: refs/heads/master
Commit: f12e506c9e3031d512ff0cc4e1c942eb06f4851f
Parents: 1b5623f
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 14 11:42:28 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 14 11:42:28 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaSparkContext.scala     | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f12e506c/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 7a6f044..8041163 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -34,11 +34,11 @@ import org.apache.spark.SparkContext.IntAccumulatorParam
 import org.apache.spark.SparkContext.DoubleAccumulatorParam
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import scala.Tuple2
+
 
 /**
- * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns [[org.apache.spark.api.java.JavaRDD]]s and
- * works with Java collections instead of Scala ones.
+ * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
+ * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
  */
 class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
   /**
@@ -333,8 +333,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     sc.accumulable(initialValue)(param)
 
   /**
-   * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.Broadcast]] object for
-   * reading it in distributed functions. The variable will be sent to each cluster only once.
+   * Broadcast a read-only variable to the cluster, returning a
+   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
+   * The variable will be sent to each cluster only once.
    */
   def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)