You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/06/09 07:53:21 UTC
spark git commit: [SPARK-14408][CORE] Changed RDD.treeAggregate to
use fold instead of reduce
Repository: spark
Updated Branches:
refs/heads/master 2a23cdd07 -> 5a3371883
[SPARK-14408][CORE] Changed RDD.treeAggregate to use fold instead of reduce
## What changes were proposed in this pull request?
Previously, `RDD.treeAggregate` used `reduceByKey` and `reduce` in its implementation, neither of which technically allows the `seqOp`/`combOp` functions to modify and return their first arguments.
This PR uses `foldByKey` and `fold` instead, and notes in the Scaladoc that `aggregate` and `treeAggregate` are semantically identical.
Note that this had some test failures for unknown reasons. These were actually fixed in https://github.com/apache/spark/commit/e3554605b36bdce63ac180cc66dbdee5c1528ec7.
The root cause was that the `zeroValue` now becomes an `AFTAggregator`, and it compares `totalCnt` (where the value is actually 0). It starts merging one by one and keeps returning `this`, where `totalCnt` is 0. So this does not look like a bug in the current change.
This is now fixed in that commit, so this change should pass the tests.
## How was this patch tested?
Test case added in `RDDSuite`.
Closes #12217
Author: Joseph K. Bradley <jo...@databricks.com>
Author: hyukjinkwon <gu...@gmail.com>
Closes #18198 from HyukjinKwon/SPARK-14408.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a337188
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a337188
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a337188
Branch: refs/heads/master
Commit: 5a3371883acf8ac8f94a71cbffa75166605c91bc
Parents: 2a23cdd
Author: Joseph K. Bradley <jo...@databricks.com>
Authored: Fri Jun 9 08:53:18 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jun 9 08:53:18 2017 +0100
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/RDD.scala | 9 +++---
.../scala/org/apache/spark/rdd/RDDSuite.scala | 31 +++++++++++++++++++-
2 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5a337188/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 63a87e7..2985c90 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1118,9 +1118,9 @@ abstract class RDD[T: ClassTag](
/**
* Aggregates the elements of this RDD in a multi-level tree pattern.
+ * This method is semantically identical to [[org.apache.spark.rdd.RDD#aggregate]].
*
* @param depth suggested depth of the tree (default: 2)
- * @see [[org.apache.spark.rdd.RDD#aggregate]]
*/
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
@@ -1134,7 +1134,7 @@ abstract class RDD[T: ClassTag](
val cleanCombOp = context.clean(combOp)
val aggregatePartition =
(it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
- var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
+ var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce
@@ -1146,9 +1146,10 @@ abstract class RDD[T: ClassTag](
val curNumPartitions = numPartitions
partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
(i, iter) => iter.map((i % curNumPartitions, _))
- }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+ }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
}
- partiallyAggregated.reduce(cleanCombOp)
+ val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+ partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5a337188/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 8d06f54..386c006 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -192,6 +192,23 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
assert(ser.serialize(union.partitions.head).limit() < 2000)
}
+ test("fold") {
+ val rdd = sc.makeRDD(-1000 until 1000, 10)
+ def op: (Int, Int) => Int = (c: Int, x: Int) => c + x
+ val sum = rdd.fold(0)(op)
+ assert(sum === -1000)
+ }
+
+ test("fold with op modifying first arg") {
+ val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+ def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+ c(0) += x(0)
+ c
+ }
+ val sum = rdd.fold(Array(0))(op)
+ assert(sum(0) === -1000)
+ }
+
test("aggregate") {
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
type StringMap = HashMap[String, Int]
@@ -218,7 +235,19 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
def combOp: (Long, Long) => Long = (c1: Long, c2: Long) => c1 + c2
for (depth <- 1 until 10) {
val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
- assert(sum === -1000L)
+ assert(sum === -1000)
+ }
+ }
+
+ test("treeAggregate with ops modifying first args") {
+ val rdd = sc.makeRDD(-1000 until 1000, 10).map(x => Array(x))
+ def op: (Array[Int], Array[Int]) => Array[Int] = { (c: Array[Int], x: Array[Int]) =>
+ c(0) += x(0)
+ c
+ }
+ for (depth <- 1 until 10) {
+ val sum = rdd.treeAggregate(Array(0))(op, op, depth)
+ assert(sum(0) === -1000)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org