Posted to commits@spark.apache.org by we...@apache.org on 2021/08/24 14:30:19 UTC

[spark] branch master updated: [SPARK-36419][CORE] Optionally move final aggregation in RDD.treeAggregate to executor

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ee20fbb  [SPARK-36419][CORE] Optionally move final aggregation in RDD.treeAggregate to executor
ee20fbb is described below

commit ee20fbb3dc8594e2c70758c0e2d140f50108a6c5
Author: Aravind Patnam <ap...@linkedin.com>
AuthorDate: Tue Aug 24 22:29:26 2021 +0800

    [SPARK-36419][CORE] Optionally move final aggregation in RDD.treeAggregate to executor
    
    ## What changes were proposed in this pull request?
    
    Move the final aggregation step of RDD.treeAggregate to an executor holding a single partition, and fetch that single result to the driver.
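    
    For illustration, here is a minimal sketch of the mechanism, not the patch code itself: it assumes a partially aggregated `RDD[U]` named `partiallyAggregated`, a zero value `zeroValue`, and a cleaned combine function `cleanCombOp` (these names match the ones used in RDD.scala below). The partially aggregated RDD is keyed by a constant, folded into a single partition owned by one executor, and only that one result is fetched to the driver.
    
    ```scala
    import org.apache.spark.Partitioner
    
    // A partitioner that routes every key to the same partition, so the
    // final combine runs as one task on one executor.
    val constantPartitioner = new Partitioner {
      override def numPartitions: Int = 1
      override def getPartition(key: Any): Int = 0
    }
    
    val finallyAggregated = partiallyAggregated
      .map(v => (0.toByte, v))                                // key everything by a constant
      .foldByKey(zeroValue, constantPartitioner)(cleanCombOp) // one reduce task on an executor
      .values                                                 // unwrap back to RDD[U]
    ```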
    
    ## Why are the changes needed?
    1. RDD.fold pulls all shuffle partitions to the driver to merge the result.
            a. The driver becomes a single point of failure when there are many partitions to run the final aggregation on.
    2. The shuffle machinery on executors is much more robust and fault-tolerant than fetching results to the driver.
    
    ## Does this PR introduce any user-facing change?
    The previous behavior always did the final aggregation on the driver. The user can now optionally pass a boolean parameter, finalAggregateOnExecutor (default = false), to treeAggregate to do that final aggregation in a single-partition stage on an executor before fetching the result to the driver. The only additional cost is that the user will see an extra stage in their job.
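    
    A minimal usage sketch of the new overload, mirroring the Scala test added in this patch (assumes an active SparkContext `sc`):
    
    ```scala
    val rdd = sc.makeRDD(-1000 until 1000, 10)
    val seqOp = (c: Long, x: Int) => c + x
    val combOp = (c1: Long, c2: Long) => c1 + c2
    // The extra boolean moves the final combine to a single-partition stage
    // on an executor; the computed result is unchanged.
    val sum = rdd.treeAggregate(0L, seqOp, combOp, 2, finalAggregateOnExecutor = true)
    assert(sum == -1000)
    ```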
    
    ## How was this patch tested?
    This patch was tested via unit tests and also on a cluster.
    The screenshots below show the extra stage on a cluster (before vs. after).
    ![before](https://user-images.githubusercontent.com/24758726/128249830-eefc4bda-f737-4d68-960e-1d1907762538.png)
    ![after](https://user-images.githubusercontent.com/24758726/128249838-be70bc95-9f39-489c-be17-c9c80c4846a4.png)
    
    Closes #33644 from akpatnam25/SPARK-36419.
    
    Authored-by: Aravind Patnam <ap...@linkedin.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/api/java/JavaRDDLike.scala    | 13 +++++++++
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 32 +++++++++++++++++++++-
 .../java/test/org/apache/spark/JavaAPISuite.java   | 11 ++++++++
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 10 +++++++
 4 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 306af24..c17c9a0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -451,6 +451,19 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
+   * `org.apache.spark.api.java.JavaRDDLike.treeAggregate` with a parameter to do the
+   * final aggregation on the executor.
+   */
+  def treeAggregate[U](
+      zeroValue: U,
+      seqOp: JFunction2[U, T, U],
+      combOp: JFunction2[U, U, U],
+      depth: Int,
+      finalAggregateOnExecutor: Boolean): U = {
+    rdd.treeAggregate(zeroValue, seqOp, combOp, depth, finalAggregateOnExecutor)(fakeClassTag[U])
+  }
+
+  /**
    * Return the number of elements in the RDD.
    */
   def count(): Long = rdd.count()
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 35e53b6..61108bb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1211,7 +1211,22 @@ abstract class RDD[T: ClassTag](
       seqOp: (U, T) => U,
       combOp: (U, U) => U,
       depth: Int = 2): U = withScope {
-    require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
+    treeAggregate(zeroValue, seqOp, combOp, depth, finalAggregateOnExecutor = false)
+  }
+
+  /**
+   * [[org.apache.spark.rdd.RDD#treeAggregate]] with a parameter to do the final
+   * aggregation on an executor instead of the driver.
+   *
+   * @param finalAggregateOnExecutor whether to do the final aggregation on an executor
+   */
+  def treeAggregate[U: ClassTag](
+      zeroValue: U,
+      seqOp: (U, T) => U,
+      combOp: (U, U) => U,
+      depth: Int,
+      finalAggregateOnExecutor: Boolean): U = withScope {
+    require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
     if (partitions.length == 0) {
       Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
     } else {
@@ -1233,6 +1248,21 @@ abstract class RDD[T: ClassTag](
           (i, iter) => iter.map((i % curNumPartitions, _))
         }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
       }
+      if (finalAggregateOnExecutor && partiallyAggregated.partitions.length > 1) {
+        // define a new partitioner that results in only 1 partition
+        val constantPartitioner = new Partitioner {
+          override def numPartitions: Int = 1
+
+          override def getPartition(key: Any): Int = 0
+        }
+        // map the partially aggregated RDD into a key-value RDD,
+        // run the final combine as a single task on one executor,
+        // and take the values to get back an RDD[U]
+        partiallyAggregated = partiallyAggregated
+          .map(v => (0.toByte, v))
+          .foldByKey(zeroValue, constantPartitioner)(cleanCombOp)
+          .values
+      }
       val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
       partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
     }
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index e73ac0e..3796d3b 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -546,6 +546,17 @@ public class JavaAPISuite implements Serializable {
     }
   }
 
+  // Since SPARK-36419
+  @Test
+  public void treeAggregateWithFinalAggregateOnExecutor() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10);
+    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
+    for (int depth = 1; depth <= 10; depth++) {
+      int sum = rdd.treeAggregate(0, add, add, depth, true);
+      assertEquals(-5, sum);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void aggregateByKey() {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6f49e10..0c06713 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -275,6 +275,16 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
     }
   }
 
+  test("SPARK-36419: treeAggregate with finalAggregateOnExecutor set to true") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10)
+    def seqOp: (Long, Int) => Long = (c: Long, x: Int) => c + x
+    def combOp: (Long, Long) => Long = (c1: Long, c2: Long) => c1 + c2
+    for (depth <- 1 until 10) {
+      val sum = rdd.treeAggregate(0L, seqOp, combOp, depth, finalAggregateOnExecutor = true)
+      assert(sum === -1000)
+    }
+  }
+
   test("treeReduce") {
     val rdd = sc.makeRDD(-1000 until 1000, 10)
     for (depth <- 1 until 10) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org