Posted to commits@spark.apache.org by sr...@apache.org on 2017/06/09 07:53:21 UTC

spark git commit: [SPARK-14408][CORE] Changed RDD.treeAggregate to use fold instead of reduce

Repository: spark
Updated Branches:
  refs/heads/master 2a23cdd07 -> 5a3371883


[SPARK-14408][CORE] Changed RDD.treeAggregate to use fold instead of reduce

## What changes were proposed in this pull request?

Previously, `RDD.treeAggregate` used `reduceByKey` and `reduce` in its implementation, neither of which technically allows the `seqOp`/`combOp` functions to modify and return their first arguments.

This PR uses `foldByKey` and `fold` instead, and notes in the Scaladoc that `treeAggregate` is semantically identical to `aggregate`.
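
For illustration (not part of this patch), a minimal sketch of the contract difference, assuming a running `SparkContext` named `sc`: with `fold`/`foldByKey`, each partition starts from its own copy of `zeroValue`, so `seqOp`/`combOp` may mutate and return their first argument, which `reduce`/`reduceByKey` do not guarantee.

```scala
// Hedged sketch, not from the patch: seqOp/combOp that mutate and return
// their first argument, which the fold-based treeAggregate permits.
val rdd = sc.parallelize(-1000 until 1000, 10).map(x => Array(x))

val op = (acc: Array[Int], x: Array[Int]) => {
  acc(0) += x(0)  // mutate the accumulator in place
  acc             // and return it, as the fold contract allows
}

val sum = rdd.treeAggregate(Array(0))(op, op, depth = 2)
assert(sum(0) == (-1000 until 1000).sum)  // -1000
```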

Note that this change initially caused some test failures for unknown reasons; those were actually fixed in https://github.com/apache/spark/commit/e3554605b36bdce63ac180cc66dbdee5c1528ec7.

The root cause was that the `zeroValue` is now an `AFTAggregator`, and the merge compares `totalCnt` (which is actually 0); as the partial aggregators are merged one by one, it keeps returning `this` with `totalCnt` still 0. So this does not look like a bug introduced by the current change.

Since that is now fixed in the commit above, the tests should pass.

## How was this patch tested?

Test case added in `RDDSuite`.

Closes #12217

Author: Joseph K. Bradley <jo...@databricks.com>
Author: hyukjinkwon <gu...@gmail.com>

Closes #18198 from HyukjinKwon/SPARK-14408.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a337188
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a337188
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a337188

Branch: refs/heads/master
Commit: 5a3371883acf8ac8f94a71cbffa75166605c91bc
Parents: 2a23cdd
Author: Joseph K. Bradley <jo...@databricks.com>
Authored: Fri Jun 9 08:53:18 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jun 9 08:53:18 2017 +0100

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  9 +++---
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 31 +++++++++++++++++++-
 2 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a337188/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 63a87e7..2985c90 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1118,9 +1118,9 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Aggregates the elements of this RDD in a multi-level tree pattern.
+   * This method is semantically identical to [[org.apache.spark.rdd.RDD#aggregate]].
    *
    * @param depth suggested depth of the tree (default: 2)
-   * @see [[org.apache.spark.rdd.RDD#aggregate]]
    */
   def treeAggregate[U: ClassTag](zeroValue: U)(
       seqOp: (U, T) => U,
@@ -1134,7 +1134,7 @@ abstract class RDD[T: ClassTag](
       val cleanCombOp = context.clean(combOp)
       val aggregatePartition =
         (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-      var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
+      var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
       var numPartitions = partiallyAggregated.partitions.length
       val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
       // If creating an extra level doesn't help reduce
@@ -1146,9 +1146,10 @@ abstract class RDD[T: ClassTag](
         val curNumPartitions = numPartitions
         partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
           (i, iter) => iter.map((i % curNumPartitions, _))
-        }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+        }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
       }
-      partiallyAggregated.reduce(cleanCombOp)
+      val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+      partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
     }
   }
 

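As an aside (not part of the patch), a hedged sketch of why the final driver-side `fold` in the hunk above is given a serializer-cloned copy of `zeroValue`: since the fold contract lets `combOp` mutate and return its first argument, merging into a fresh copy avoids mutating the `zeroValue` instance the caller passed in. Plain Scala collections stand in for the Spark internals here.

```scala
// Illustrative only: combOp may mutate and return its first argument.
val combOp = (a: Array[Int], b: Array[Int]) => { a(0) += b(0); a }

val zeroValue = Array(0)
val partials  = Seq(Array(1), Array(2), Array(3))  // stand-ins for per-partition results

// Folding into a defensive copy keeps the caller's zeroValue untouched.
val copied = zeroValue.clone()
val total  = partials.foldLeft(copied)(combOp)

assert(total(0) == 6)      // aggregation result is still correct
assert(zeroValue(0) == 0)  // original zeroValue was not mutated
```
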
http://git-wip-us.apache.org/repos/asf/spark/blob/5a337188/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 8d06f54..386c006 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -192,6 +192,23 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(ser.serialize(union.partitions.head).limit() < 2000)
   }
 
+  test("fold") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10)
+    def op: (Int, Int) => Int = (c: Int, x: Int) => c + x
+    val sum = rdd.fold(0)(op)
+    assert(sum === -1000)
+  }
+
+  test("fold with op modifying first arg") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+    def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+      c(0) += x(0)
+      c
+    }
+    val sum = rdd.fold(Array(0))(op)
+    assert(sum(0) === -1000)
+  }
+
   test("aggregate") {
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]
@@ -218,7 +235,19 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     def combOp: (Long, Long) => Long = (c1: Long, c2: Long) => c1 + c2
     for (depth <- 1 until 10) {
       val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
-      assert(sum === -1000L)
+      assert(sum === -1000)
+    }
+  }
+
+  test("treeAggregate with ops modifying first args") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+    def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+      c(0) += x(0)
+      c
+    }
+    for (depth <- 1 until 10) {
+      val sum = rdd.treeAggregate(Array(0))(op, op, depth)
+      assert(sum(0) === -1000)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org