You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/02/19 11:26:46 UTC

spark git commit: [SPARK-13339][DOCS] Clarify commutative / associative operator requirements for reduce, fold

Repository: spark
Updated Branches:
  refs/heads/master c776fce99 -> fb7e21797


[SPARK-13339][DOCS] Clarify commutative / associative operator requirements for reduce, fold

Clarify that reduce functions need to be commutative, and fold functions do not

See https://github.com/apache/spark/pull/11091

Author: Sean Owen <so...@cloudera.com>

Closes #11217 from srowen/SPARK-13339.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb7e2179
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb7e2179
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb7e2179

Branch: refs/heads/master
Commit: fb7e21797ed618d9754545a44f8f95f75b66757a
Parents: c776fce
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Feb 19 10:26:38 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Feb 19 10:26:38 2016 +0000

----------------------------------------------------------------------
 R/pkg/R/pairRDD.R                               | 10 +++---
 .../scala/org/apache/spark/Accumulator.scala    |  6 ++--
 .../org/apache/spark/api/java/JavaPairRDD.scala | 32 ++++++++++----------
 .../org/apache/spark/api/java/JavaRDDLike.scala |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 24 +++++++--------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  2 +-
 docs/programming-guide.md                       |  2 +-
 docs/streaming-programming-guide.md             |  4 +--
 python/pyspark/rdd.py                           |  7 ++---
 python/pyspark/streaming/dstream.py             |  4 +--
 .../streaming/api/java/JavaDStreamLike.scala    |  6 ++--
 .../streaming/api/java/JavaPairDStream.scala    | 18 +++++------
 .../spark/streaming/dstream/DStream.scala       |  4 +--
 .../dstream/PairDStreamFunctions.scala          | 16 +++++-----
 14 files changed, 68 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index f713114..4075ef4 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -305,11 +305,11 @@ setMethod("groupByKey",
 #'  Merge values by key
 #'
 #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function.
+#' and merges the values for each key using an associative and commutative reduce function.
 #'
 #' @param x The RDD to reduce by key. Should be an RDD where each element is
 #'             list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
+#' @param combineFunc The associative and commutative reduce function to use.
 #' @param numPartitions Number of partitions to create.
 #' @return An RDD where each element is list(K, V') where V' is the merged
 #'         value
@@ -347,12 +347,12 @@ setMethod("reduceByKey",
 #' Merge values by key locally
 #'
 #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function, but return the
-#' results immediately to the driver as an R list.
+#' and merges the values for each key using an associative and commutative reduce function, but
+#' return the results immediately to the driver as an R list.
 #'
 #' @param x The RDD to reduce by key. Should be an RDD where each element is
 #'             list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
+#' @param combineFunc The associative and commutative reduce function to use.
 #' @return A list of elements of type list(K, V') where V' is the merged value for each key
 #' @seealso reduceByKey
 #' @examples

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/Accumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 5e8f1d4..0e4bcc3 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -29,9 +29,9 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
 /**
  * A simpler value of [[Accumulable]] where the result type being accumulated is the same
  * as the types of elements being merged, i.e. variables that are only "added" to through an
- * associative operation and can therefore be efficiently supported in parallel. They can be used
- * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
- * value types, and programmers can add support for new types.
+ * associative and commutative operation and can therefore be efficiently supported in parallel.
+ * They can be used to implement counters (as in MapReduce) or sums. Spark natively supports
+ * accumulators of numeric value types, and programmers can add support for new types.
  *
  * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
  * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 94d1035..e080f91 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -278,17 +278,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce.
    */
   def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
     fromRDD(rdd.reduceByKey(partitioner, func))
 
   /**
-   * Merge the values for each key using an associative reduce function, but return the results
-   * immediately to the master as a Map. This will also perform the merging locally on each mapper
-   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function, but return
+   * the results immediately to the master as a Map. This will also perform the merging locally on
+   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
     mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))
@@ -381,9 +381,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(rdd.foldByKey(zeroValue)(func))
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
    */
   def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
     fromRDD(rdd.reduceByKey(func, numPartitions))
@@ -461,10 +461,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(rdd.partitionBy(partitioner))
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
-   */
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
+   */
   def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
     fromRDD(rdd.join(other, partitioner))
 
@@ -520,9 +520,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
    * parallelism level.
    */
   def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 37c211f..4212027 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -373,7 +373,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative and commutative function and a neutral "zero value". The function
+   * given associative function and a neutral "zero value". The function
    * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
    * allocation; however, it should not modify t2.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 61905a8..e00b9f6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -300,27 +300,27 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce.
    */
   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
     combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
    */
   def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
     reduceByKey(new HashPartitioner(numPartitions), func)
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
    * parallelism level.
    */
   def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
@@ -328,9 +328,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative reduce function, but return the results
-   * immediately to the master as a Map. This will also perform the merging locally on each mapper
-   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function, but return
+   * the results immediately to the master as a Map. This will also perform the merging locally on
+   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
     val cleanedF = self.sparkContext.clean(func)

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a81a98b..6a6ad2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -973,7 +973,7 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative and commutative function and a neutral "zero value". The function
+   * given associative function and a neutral "zero value". The function
    * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
    * allocation; however, it should not modify t2.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index e450814..2d6f776 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1343,7 +1343,7 @@ value of the broadcast variable (e.g. if the variable is shipped to a new node l
 
 ## Accumulators
 
-Accumulators are variables that are only "added" to through an associative operation and can
+Accumulators are variables that are only "added" to through an associative and commutative operation and can
 therefore be efficiently supported in parallel. They can be used to implement counters (as in
 MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers
 can add support for new types. If accumulators are created with a name, they will be

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 677f5ff..4d1932b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -798,7 +798,7 @@ Some of the common ones are as follows.
   <td> <b>reduce</b>(<i>func</i>) </td>
   <td> Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the
   source DStream using a function <i>func</i> (which takes two arguments and returns one).
-  The function should be associative so that it can be computed in parallel. </td>
+  The function should be associative and commutative so that it can be computed in parallel. </td>
 </tr>
 <tr>
   <td> <b>countByValue</b>() </td>
@@ -1072,7 +1072,7 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.
 <tr>
   <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowLength</i>, <i>slideInterval</i>) </td>
   <td> Return a new single-element stream, created by aggregating elements in the stream over a
-  sliding interval using <i>func</i>. The function should be associative so that it can be computed
+  sliding interval using <i>func</i>. The function should be associative and commutative so that it can be computed
   correctly in parallel.
   </td>
 </tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fe2264a..4eaf589 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -844,8 +844,7 @@ class RDD(object):
     def fold(self, zeroValue, op):
         """
         Aggregate the elements of each partition, and then the results for all
-        the partitions, using a given associative and commutative function and
-        a neutral "zero value."
+        the partitions, using a given associative function and a neutral "zero value."
 
         The function C{op(t1, t2)} is allowed to modify C{t1} and return it
         as its result value to avoid object allocation; however, it should not
@@ -1558,7 +1557,7 @@ class RDD(object):
 
     def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
         """
-        Merge the values for each key using an associative reduce function.
+        Merge the values for each key using an associative and commutative reduce function.
 
         This will also perform the merging locally on each mapper before
         sending results to a reducer, similarly to a "combiner" in MapReduce.
@@ -1576,7 +1575,7 @@ class RDD(object):
 
     def reduceByKeyLocally(self, func):
         """
-        Merge the values for each key using an associative reduce function, but
+        Merge the values for each key using an associative and commutative reduce function, but
         return the results immediately to the master as a dictionary.
 
         This will also perform the merging locally on each mapper before

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 86447f5..2056663 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -453,7 +453,7 @@ class DStream(object):
         2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
         This is more efficient than `invReduceFunc` is None.
 
-        @param reduceFunc:     associative reduce function
+        @param reduceFunc:     associative and commutative reduce function
         @param invReduceFunc:  inverse reduce function of `reduceFunc`
         @param windowDuration: width of the window; must be a multiple of this DStream's
                                batching interval
@@ -524,7 +524,7 @@ class DStream(object):
         `invFunc` can be None, then it will reduce all the RDDs in window, could be slower
         than having `invFunc`.
 
-        @param func:           associative reduce function
+        @param func:           associative and commutative reduce function
         @param invFunc:        inverse function of `reduceFunc`
         @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index f10de48..9931a46 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -214,7 +214,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -234,7 +234,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -257,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
    *  This is more efficient than reduceByWindow without "inverse reduce" function.
    *  However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index d718f1d..aad9a12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -138,8 +138,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
-   * with Spark's default number of partitions.
+   * merged using the associative and commutative reduce function. Hash partitioning is used to
+   * generate the RDDs with Spark's default number of partitions.
    */
   def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
     dstream.reduceByKey(func)
@@ -257,7 +257,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
    * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
    * the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    */
@@ -270,7 +270,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
    * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
    * generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -289,7 +289,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
    * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
    * generate the RDDs with `numPartitions` partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -309,7 +309,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /**
    * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
    * `DStream.reduceByKey()`, but applies it over a sliding window.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -335,7 +335,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
@@ -360,7 +360,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
@@ -397,7 +397,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
    * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index db79eea..70e1d8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -791,7 +791,7 @@ abstract class DStream[T: ClassTag] (
   /**
    * Return a new DStream in which each RDD has a single element generated by reducing all
    * elements in a sliding window over this DStream.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -814,7 +814,7 @@ abstract class DStream[T: ClassTag] (
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
    *  This is more efficient than reduceByWindow without "inverse reduce" function.
    *  However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval

http://git-wip-us.apache.org/repos/asf/spark/blob/fb7e2179/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index babc722..1dcdb64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -75,8 +75,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
-   * with Spark's default number of partitions.
+   * merged using the associative and commutative reduce function. Hash partitioning is used to
+   * generate the RDDs with Spark's default number of partitions.
    */
   def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
     reduceByKey(reduceFunc, defaultPartitioner())
@@ -204,7 +204,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
    * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
    * the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    */
@@ -219,7 +219,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
    * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
    * generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -238,7 +238,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
    * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
    * generate the RDDs with `numPartitions` partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -259,7 +259,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
   /**
    * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
    * `DStream.reduceByKey()`, but applies it over a sliding window.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -289,7 +289,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
-   * @param reduceFunc associative reduce function
+   * @param reduceFunc associative and commutative reduce function
    * @param invReduceFunc inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
@@ -320,7 +320,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
    * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
-   * @param reduceFunc     associative reduce function
+   * @param reduceFunc     associative and commutative reduce function
    * @param invReduceFunc  inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org