You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/08/24 14:30:19 UTC
[spark] branch master updated: [SPARK-36419][CORE] Optionally move final aggregation in RDD.treeAggregate to executor
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ee20fbb [SPARK-36419][CORE] Optionally move final aggregation in RDD.treeAggregate to executor
ee20fbb is described below
commit ee20fbb3dc8594e2c70758c0e2d140f50108a6c5
Author: Aravind Patnam <ap...@linkedin.com>
AuthorDate: Tue Aug 24 22:29:26 2021 +0800
[SPARK-36419][CORE] Optionally move final aggregation in RDD.treeAggregate to executor
## What changes were proposed in this pull request?
Move the final iteration of the aggregation in RDD.treeAggregate to an executor with a single partition, and fetch only that result to the driver.
## Why are the changes needed?
1. RDD.fold (the last step of RDD.treeAggregate) pulls all shuffle partitions to the driver to merge the result.
a. The driver becomes a single point of failure when there are many partitions left to merge in the final aggregation.
2. The shuffle machinery on executors is much more robust/fault-tolerant than fetching results to the driver.
## Does this PR introduce any user-facing change?
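The previous behavior always did the final aggregation in the driver. The user can now (optionally) provide a boolean flag (default = false; referred to here as ENABLE_EXECUTOR_TREE_AGGREGATE, and exposed in the diff below as the `finalAggregateOnExecutor` parameter of a new `treeAggregate` overload) to do that final aggregation in a single-partition executor stage before fetching the result to the driver. The existing `treeAggregate` overload passes `false`, so its behavior is unchanged. The only additional cost is that the user will see an extra stage in their job.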
## How was this patch tested?
This patch was tested via unit tests, and also tested on a cluster.
The screenshots showing the extra stage on a cluster are attached below (before vs after).
![before](https://user-images.githubusercontent.com/24758726/128249830-eefc4bda-f737-4d68-960e-1d1907762538.png)
![after](https://user-images.githubusercontent.com/24758726/128249838-be70bc95-9f39-489c-be17-c9c80c4846a4.png)
Closes #33644 from akpatnam25/SPARK-36419.
Authored-by: Aravind Patnam <ap...@linkedin.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/api/java/JavaRDDLike.scala | 13 +++++++++
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 32 +++++++++++++++++++++-
.../java/test/org/apache/spark/JavaAPISuite.java | 11 ++++++++
.../test/scala/org/apache/spark/rdd/RDDSuite.scala | 10 +++++++
4 files changed, 65 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 306af24..c17c9a0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -451,6 +451,19 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
+ * `org.apache.spark.api.java.JavaRDDLike.treeAggregate` with a parameter to do the
+ * final aggregation on the executor.
+ */
+ def treeAggregate[U](
+ zeroValue: U,
+ seqOp: JFunction2[U, T, U],
+ combOp: JFunction2[U, U, U],
+ depth: Int,
+ finalAggregateOnExecutor: Boolean): U = {
+ rdd.treeAggregate(zeroValue, seqOp, combOp, depth, finalAggregateOnExecutor)(fakeClassTag[U])
+ }
+
+ /**
* Return the number of elements in the RDD.
*/
def count(): Long = rdd.count()
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 35e53b6..61108bb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1211,7 +1211,22 @@ abstract class RDD[T: ClassTag](
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int = 2): U = withScope {
- require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
+ treeAggregate(zeroValue, seqOp, combOp, depth, finalAggregateOnExecutor = false)
+ }
+
+ /**
+ * [[org.apache.spark.rdd.RDD#treeAggregate]] with a parameter to do the final
+ * aggregation on the executor
+ *
+ * @param finalAggregateOnExecutor do final aggregation on executor
+ */
+ def treeAggregate[U: ClassTag](
+ zeroValue: U,
+ seqOp: (U, T) => U,
+ combOp: (U, U) => U,
+ depth: Int,
+ finalAggregateOnExecutor: Boolean): U = withScope {
+ require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.length == 0) {
Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
} else {
@@ -1233,6 +1248,21 @@ abstract class RDD[T: ClassTag](
(i, iter) => iter.map((i % curNumPartitions, _))
}.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
}
+ if (finalAggregateOnExecutor && partiallyAggregated.partitions.length > 1) {
+ // define a new partitioner that results in only 1 partition
+ val constantPartitioner = new Partitioner {
+ override def numPartitions: Int = 1
+
+ override def getPartition(key: Any): Int = 0
+ }
+ // map the partially aggregated rdd into a key-value rdd
+ // do the computation in the single executor with one partition
+ // get the new RDD[U]
+ partiallyAggregated = partiallyAggregated
+ .map(v => (0.toByte, v))
+ .foldByKey(zeroValue, constantPartitioner)(cleanCombOp)
+ .values
+ }
val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
}
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index e73ac0e..3796d3b 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -546,6 +546,17 @@ public class JavaAPISuite implements Serializable {
}
}
+ // Since SPARK-36419
+ @Test
+ public void treeAggregateWithFinalAggregateOnExecutor() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
+ Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
+ for (int depth = 1; depth <= 10; depth++) {
+ int sum = rdd.treeAggregate(0, add, add, depth, true);
+ assertEquals(-5, sum);
+ }
+ }
+
@SuppressWarnings("unchecked")
@Test
public void aggregateByKey() {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6f49e10..0c06713 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -275,6 +275,16 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
}
}
+ test("SPARK-36419: treeAggregate with finalAggregateOnExecutor set to true") {
+ val rdd = sc.makeRDD(-1000 until 1000, 10)
+ def seqOp: (Long, Int) => Long = (c: Long, x: Int) => c + x
+ def combOp: (Long, Long) => Long = (c1: Long, c2: Long) => c1 + c2
+ for (depth <- 1 until 10) {
+ val sum = rdd.treeAggregate(0L, seqOp, combOp, depth, finalAggregateOnExecutor = true)
+ assert(sum === -1000)
+ }
+ }
+
test("treeReduce") {
val rdd = sc.makeRDD(-1000 until 1000, 10)
for (depth <- 1 until 10) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org