You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by js...@apache.org on 2022/07/01 06:57:53 UTC

[incubator-uniffle] 09/17: [Improvement] Add dynamic allocation patch for Spark 3.2 (#199)

This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit a253b1fed2e947e397b45b1db8f56d856eabc9fc
Author: roryqi <je...@gmail.com>
AuthorDate: Mon Jun 27 10:07:13 2022 +0800

    [Improvement] Add dynamic allocation patch for Spark 3.2 (#199)
    
    ### What changes were proposed in this pull request?
    Add the dynamic allocation patch for Spark 3.2. This resolves issue #106.
    
    ### Why are the changes needed?
    If we don't have this patch, users can't use dynamic allocation in Spark 3.2.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test
---
 README.md                                          |  2 +-
 .../spark-3.2.1_dynamic_allocation_support.patch   | 92 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 0fb65e5..9ad8299 100644
--- a/README.md
+++ b/README.md
@@ -155,7 +155,7 @@ rss-xxx.tgz will be generated for deployment
 ### Support Spark dynamic allocation
 
 To support spark dynamic allocation with Firestorm, spark code should be updated.
-There are 2 patches for spark-2.4.6 and spark-3.1.2 in spark-patches folder for reference.
+There are 3 patches for spark (2.4.6/3.1.2/3.2.1) in spark-patches folder for reference.
 
 After apply the patch and rebuild spark, add following configuration in spark conf to enable dynamic allocation:
   ```
diff --git a/spark-patches/spark-3.2.1_dynamic_allocation_support.patch b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
new file mode 100644
index 0000000..1e195df
--- /dev/null
+++ b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch
@@ -0,0 +1,92 @@
+diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
+index 1b4e7ba5106..95818ff72ca 100644
+--- a/core/src/main/scala/org/apache/spark/Dependency.scala
++++ b/core/src/main/scala/org/apache/spark/Dependency.scala
+@@ -174,8 +174,10 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
+       !rdd.isBarrier()
+   }
+ 
+-  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+-  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
++  if (!_rdd.context.getConf.isRssEnable()) {
++    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
++    _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
++  }
+ }
+ 
+ 
+diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+index c4b619300b5..821a01985d9 100644
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+@@ -207,7 +207,9 @@ private[spark] class ExecutorAllocationManager(
+       // If dynamic allocation shuffle tracking or worker decommissioning along with
+       // storage shuffle decommissioning is enabled we have *experimental* support for
+       // decommissioning without a shuffle service.
+-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
++      if (conf.isRssEnable()) {
++        logInfo("Dynamic allocation will use remote shuffle service")
++      } else if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+           (decommissionEnabled &&
+             conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
+         logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
+diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
+index 5f37a1abb19..af4bee1e1bb 100644
+--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
++++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
+@@ -580,6 +580,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
+     Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n")
+   }
+ 
++  /**
++   * Return true if remote shuffle service is enabled.
++   */
++  def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager")
+ }
+ 
+ private[spark] object SparkConf extends Logging {
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+index a82d261d545..72e54940ca2 100644
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+@@ -2231,7 +2231,8 @@ private[spark] class DAGScheduler(
+     // if the cluster manager explicitly tells us that the entire worker was lost, then
+     // we know to unregister shuffle output.  (Note that "worker" specifically refers to the process
+     // from a Standalone cluster, where the shuffle service lives in the Worker.)
+-    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
++    val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) &&
++      !sc.getConf.isRssEnable()
+     removeExecutorAndUnregisterOutputs(
+       execId = execId,
+       fileLost = fileLost,
+diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+index 3b72103f993..8e03754941e 100644
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+@@ -1015,7 +1015,8 @@ private[spark] class TaskSetManager(
+     // and we are not using an external shuffle server which could serve the shuffle outputs.
+     // The reason is the next stage wouldn't be able to fetch the data from this dead executor
+     // so we would need to rerun these tasks on other executors.
+-    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
++    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled &&
++        !isZombie && !conf.isRssEnable()) {
+       for ((tid, info) <- taskInfos if info.executorId == execId) {
+         val index = taskInfos(tid).index
+         // We may have a running task whose partition has been marked as successful,
+diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+index 47d61196fe8..98a5381bef4 100644
+--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+@@ -174,6 +174,9 @@ class ShuffledRowRDD(
+   }
+ 
+   override def getPreferredLocations(partition: Partition): Seq[String] = {
++    if (conf.isRssEnable()) {
++      return Nil
++    }
+     val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+     partition.asInstanceOf[ShuffledRowRDDPartition].spec match {
+       case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
+-- 
+2.32.0
+