Posted to commits@spark.apache.org by jo...@apache.org on 2021/10/14 21:38:19 UTC

[spark] branch branch-3.0 updated: [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler

This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1709265  [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
1709265 is described below

commit 1709265af1589ffa9e44d050bfa913aa0fd27dea
Author: Josh Rosen <jo...@databricks.com>
AuthorDate: Thu Oct 14 14:34:24 2021 -0700

    [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a longstanding issue where the `DAGScheduler`'s single-threaded event processing loop could become blocked by slow `RDD.getPartitions()` calls, preventing other events (like task completions and concurrent job submissions) from being processed in a timely manner.
    
    With this patch's change, Spark will now call `.partitions` on every RDD in the DAG before submitting a job to the scheduler, ensuring that the expensive `getPartitions()` calls occur outside of the scheduler event loop.
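    
    As a rough illustration of why moving the slow call matters, here is a standalone sketch (not Spark code; `EventLoopSketch`, `fastEventHandled`, and the executors are hypothetical stand-ins). A single-threaded executor plays the role of the scheduler event loop, and a latch plays the role of a slow `getPartitions()` call. The 500 ms wait is an arbitrary choice for the demonstration.
    
    ```scala
    import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

    // Hypothetical model, not Spark code: `loop` plays the scheduler event loop.
    object EventLoopSketch {
      /** Returns true if a fast event is handled while the slow work is still pending. */
      def fastEventHandled(slowWorkInsideLoop: Boolean): Boolean = {
        val loop = Executors.newSingleThreadExecutor()
        val submitter = Executors.newSingleThreadExecutor()
        val slowWorkStarted = new CountDownLatch(1)
        val releaseSlowWork = new CountDownLatch(1)
        val fastEventDone = new CountDownLatch(1)

        // Stand-in for a slow getPartitions() call: blocks until released.
        def slowWork(): Unit = {
          slowWorkStarted.countDown()
          releaseSlowWork.await()
        }

        try {
          if (slowWorkInsideLoop) {
            // Old behavior (modeled): the slow call runs on the loop thread.
            loop.execute(() => slowWork())
          } else {
            // New behavior (modeled): the submitting thread does the slow work
            // first and only then posts an event to the loop.
            submitter.execute(() => {
              slowWork()
              loop.execute(() => ())
            })
          }
          slowWorkStarted.await()
          // A concurrent event, such as a task completion or another job.
          loop.execute(() => fastEventDone.countDown())
          fastEventDone.await(500, TimeUnit.MILLISECONDS)
        } finally {
          releaseSlowWork.countDown()
          submitter.shutdown()
          submitter.awaitTermination(5, TimeUnit.SECONDS)
          loop.shutdown()
        }
      }
    }
    ```
    
    In this model, `fastEventHandled(slowWorkInsideLoop = true)` times out because the fast event is queued behind the blocked loop thread, while `fastEventHandled(slowWorkInsideLoop = false)` returns promptly.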
    
    #### Background
    
    The `RDD.partitions` method lazily computes an RDD's partitions by calling `RDD.getPartitions()`. The `getPartitions()` method is invoked only once per RDD and its result is cached in the `RDD.partitions_` private field. Sometimes the `getPartitions()` call can be expensive: for example, `HadoopRDD.getPartitions()` performs file listing operations.
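    
    The compute-once-then-cache behavior described above can be sketched as follows. This is a simplified stand-in, not Spark's `RDD` class: `SketchRdd`, `SlowListingRdd`, and the call counter are hypothetical, partitions are modeled as plain `Int`s, and the locking shown is only one plausible way to make the caching thread-safe.
    
    ```scala
    // Simplified stand-in for an RDD; not Spark's actual class.
    abstract class SketchRdd {
      // Counts how often the expensive computation runs (illustration only).
      var getPartitionsCalls = 0

      // Subclasses supply the potentially expensive partition computation.
      protected def getPartitions: Array[Int]

      // Cached result, playing the role of the private `partitions_` field.
      @volatile private var cached: Array[Int] = null

      // The first call computes and caches; later calls return the cached array.
      final def partitions: Array[Int] = {
        if (cached == null) {
          synchronized {
            if (cached == null) {
              getPartitionsCalls += 1
              cached = getPartitions
            }
          }
        }
        cached
      }
    }

    // Hypothetical subclass whose getPartitions stands in for slow file listing.
    class SlowListingRdd(n: Int) extends SketchRdd {
      protected def getPartitions: Array[Int] = Array.tabulate(n)(identity)
    }
    ```
    
    Calling `partitions` repeatedly on one `SlowListingRdd` runs `getPartitions` only once, so whichever thread calls `partitions` first pays the full cost.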
    
    The `.partitions` method is invoked at many different places in Spark's code, including many existing call sites that are outside of the scheduler event loop. As a result, it's _often_ the case that an RDD's partitions will have been computed before the RDD is submitted to the DAGScheduler. For example, [`submitJob` calls `rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.sc [...]
    
    However, there are still some cases where `partitions` gets evaluated for the first time inside of the `DAGScheduler` internals. For example, [`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94) doesn't call `.partitions` on the RDD being shuffled, so a plan with a ShuffledRDD at the root won't necessarily result in `.partitions` having been called on all RDDs prior [...]
    
    #### Correctness: proving that we make no excess `.partitions` calls
    
    This PR adds code to traverse the DAG prior to job submission and call `.partitions` on every RDD encountered.
    
    I'd like to argue that this results in no _excess_ `.partitions` calls: in every case where the new code calls `.partitions` there is existing code which would have called `.partitions` at some point during a successful job execution:
    
    - Assume that this is the first time we are computing every RDD in the DAG.
    - Every RDD appears in some stage.
    - [`submitStage` will call `submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438) on every stage root RDD.
    - [`submitStage` calls `getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696) on every stage root RDD.
    - [`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043) visits the RDD and all of its parent RDDs that are computed in the same stage (via narrow dependencies) and calls `.partitions` on each RDD visited.
    - Therefore `.partitions` is invoked on every RDD in the DAG by the time the job has successfully completed.
    - Therefore this patch's change does not introduce any new calls to `.partitions` which would not have otherwise occurred (assuming the job succeeded).
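    
    The pre-submission traversal discussed in this section can be modeled on a toy graph. This is a minimal sketch under stated assumptions: `Node` and `EagerTraversal` are hypothetical names, each node counts its own `.partitions` calls, and an explicit stack replaces recursion so that a deep lineage cannot overflow the call stack.
    
    ```scala
    import scala.collection.mutable

    // Toy stand-in for an RDD: an id, parent nodes, and a call counter.
    final class Node(val id: Int, val parents: Seq[Node]) {
      var partitionsCalls = 0
      def partitions: Int = { partitionsCalls += 1; 1 }
    }

    object EagerTraversal {
      // Calls `.partitions` on `root` and every ancestor exactly once and
      // returns the ids in the order they were visited.
      def computeAll(root: Node): Seq[Int] = {
        val visited = mutable.HashSet.empty[Node]
        val stack = mutable.ArrayBuffer[Node](root)
        val order = mutable.ArrayBuffer.empty[Int]
        while (stack.nonEmpty) {
          val node = stack.remove(stack.length - 1)
          // `add` returns false if the node was already visited.
          if (visited.add(node)) {
            node.partitions
            order += node.id
            node.parents.foreach(stack += _)
          }
        }
        order.toSeq
      }
    }
    ```
    
    On a diamond-shaped graph, where two nodes share a parent, the visited set ensures the shared parent is touched only once by the traversal.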
    
    #### Ordering of `.partitions` calls
    
    I don't think the order in which `.partitions` calls occur matters for correctness: the DAGScheduler happens to invoke `.partitions` in a particular order today (defined by the DAG traversal order in internal scheduler methods), but there are many out-of-order `.partitions` calls occurring elsewhere in the codebase.
    
    #### Handling of exceptions in `.partitions`
    
    I've chosen **not** to add special error-handling for the new `.partitions` calls: if exceptions occur then they'll bubble up, unwrapped, to the user code submitting the Spark job.
    
    It's sometimes important to preserve exception wrapping behavior, but I don't think that concern is warranted in this particular case: whether `getPartitions` occurred inside or outside of the scheduler (impacting whether exceptions manifest in wrapped or unwrapped form, and impacting whether failed jobs appear in the Spark UI) was not crisply defined (and in some rare cases could even be [influenced by Spark settings in non-obvious ways](https://github.com/apache/spark/blob/10d530317 [...]
    
    #### Should this have a configuration flag?
    
    Per discussion from a previous PR trying to solve this problem (https://github.com/apache/spark/pull/24438#pullrequestreview-232692586), I've decided to skip adding a configuration flag for this.
    
    ### Why are the changes needed?
    
    This fixes a longstanding scheduler performance problem which has been reported by multiple users.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I added a regression test in `BasicSchedulerIntegrationSuite` to cover the regular job submission codepath (`DAGScheduler.submitJob`). This test uses CountDownLatches to simulate the submission of a job containing an RDD with a slow `getPartitions()` call and checks that a concurrently-submitted job is not blocked.
    
    I have **not** added separate integration tests for the `runApproximateJob` and `submitMapStage` codepaths (both of which also received the same fix).
    
    Closes #34265 from JoshRosen/SPARK-23626.
    
    Authored-by: Josh Rosen <jo...@databricks.com>
    Signed-off-by: Josh Rosen <jo...@databricks.com>
    (cherry picked from commit c4e975e175c01f67ece7ae492a79554ad1b44106)
    Signed-off-by: Josh Rosen <jo...@databricks.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  45 ++++++++
 .../scheduler/BlacklistIntegrationSuite.scala      |   4 +-
 .../scheduler/SchedulerIntegrationSuite.scala      | 120 ++++++++++++++++++---
 3 files changed, 155 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b483b52..28f36d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -607,6 +607,35 @@ private[spark] class DAGScheduler(
     missing.toList
   }
 
+  /** Invoke `.partitions` on the given RDD and all of its ancestors  */
+  private def eagerlyComputePartitionsForRddAndAncestors(rdd: RDD[_]): Unit = {
+    val startTime = System.nanoTime
+    val visitedRdds = new HashSet[RDD[_]]
+    // We are manually maintaining a stack here to prevent StackOverflowError
+    // caused by recursively visiting
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
+
+    def visit(rdd: RDD[_]): Unit = {
+      if (!visitedRdds(rdd)) {
+        visitedRdds += rdd
+
+        // Eagerly compute:
+        rdd.partitions
+
+        for (dep <- rdd.dependencies) {
+          waitingForVisit.prepend(dep.rdd)
+        }
+      }
+    }
+
+    while (waitingForVisit.nonEmpty) {
+      visit(waitingForVisit.remove(0))
+    }
+    logDebug("eagerlyComputePartitionsForRddAndAncestors for RDD %d took %f seconds"
+      .format(rdd.id, (System.nanoTime - startTime) / 1e9))
+  }
+
   /**
    * Registers the given jobId among the jobs that need the given stage and
    * all of that stage's ancestors.
@@ -716,6 +745,11 @@ private[spark] class DAGScheduler(
           "Total number of partitions: " + maxPartitions)
     }
 
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val jobId = nextJobId.getAndIncrement()
     if (partitions.isEmpty) {
       val clonedProperties = Utils.cloneProperties(properties)
@@ -804,6 +838,12 @@ private[spark] class DAGScheduler(
       listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
       return new PartialResult(evaluator.currentResult(), true)
     }
+
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
@@ -836,6 +876,11 @@ private[spark] class DAGScheduler(
       throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
     }
 
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     // We create a JobWaiter with only one "task", which will be marked as complete when the whole
     // map stage has completed, and will be passed the MapOutputStatistics for that stage.
     // This makes it easier to avoid race conditions between the user code and the map output
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 246d4b2..16d9619 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -112,7 +112,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
       backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
       val pattern = (
         s"""|Aborting TaskSet 0.0 because task .*
@@ -150,7 +150,7 @@ class MockRDDWithLocalityPrefs(
     sc: SparkContext,
     numPartitions: Int,
     shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
-    val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps) {
+    val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps, Nil) {
   override def getPreferredLocations(split: Partition): Seq[String] = {
     Seq(preferredLoc)
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index dff8975..d92d0c0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
-import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -204,7 +204,13 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
   def shuffle(nParts: Int, input: MockRDD): MockRDD = {
     val partitioner = new HashPartitioner(nParts)
     val shuffleDep = new ShuffleDependency[Int, Int, Nothing](input, partitioner)
-    new MockRDD(sc, nParts, List(shuffleDep))
+    new MockRDD(sc, nParts, List(shuffleDep), Nil)
+  }
+
+  /** models a one-to-one dependency within a stage, like a map or filter */
+  def oneToOne(input: MockRDD): MockRDD = {
+    val dep = new OneToOneDependency[(Int, Int)](input)
+    new MockRDD(sc, input.numPartitions, Nil, Seq(dep))
   }
 
   /** models a stage boundary with multiple dependencies, like a join */
@@ -213,7 +219,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
     val shuffleDeps = inputs.map { inputRDD =>
       new ShuffleDependency[Int, Int, Nothing](inputRDD, partitioner)
     }
-    new MockRDD(sc, nParts, shuffleDeps)
+    new MockRDD(sc, nParts, shuffleDeps, Nil)
   }
 
   val backendException = new AtomicReference[Exception](null)
@@ -448,10 +454,11 @@ case class ExecutorTaskStatus(host: String, executorId: String, var freeCores: I
 class MockRDD(
   sc: SparkContext,
   val numPartitions: Int,
-  val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]]
-) extends RDD[(Int, Int)](sc, shuffleDeps) with Serializable {
+  val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
+  val oneToOneDeps: Seq[OneToOneDependency[(Int, Int)]]
+) extends RDD[(Int, Int)](sc, deps = shuffleDeps ++ oneToOneDeps) with Serializable {
 
-  MockRDD.validate(numPartitions, shuffleDeps)
+  MockRDD.validate(numPartitions, shuffleDeps, oneToOneDeps)
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
     throw new RuntimeException("should not be reached")
@@ -467,14 +474,25 @@ class MockRDD(
 object MockRDD extends AssertionsHelper with TripleEquals with Assertions {
   /**
    * make sure all the shuffle dependencies have a consistent number of output partitions
+   * and that one-to-one dependencies have the same partition counts as their parents
    * (mostly to make sure the test setup makes sense, not that Spark itself would get this wrong)
    */
-  def validate(numPartitions: Int, dependencies: Seq[ShuffleDependency[_, _, _]]): Unit = {
-    dependencies.foreach { dependency =>
+  def validate(
+      numPartitions: Int,
+      shuffleDependencies: Seq[ShuffleDependency[_, _, _]],
+      oneToOneDependencies: Seq[OneToOneDependency[_]]): Unit = {
+    shuffleDependencies.foreach { dependency =>
       val partitioner = dependency.partitioner
       assert(partitioner != null)
       assert(partitioner.numPartitions === numPartitions)
     }
+    oneToOneDependencies.foreach { dependency =>
+      // In order to support the SPARK-23626 testcase, we cast to MockRDD
+      // and access `numPartitions` instead of just calling `getNumPartitions`:
+      // `getNumPartitions` would call `getPartitions`, undermining the intention
+      // of the SPARK-23626 testcase.
+      assert(dependency.rdd.asInstanceOf[MockRDD].numPartitions === numPartitions)
+    }
   }
 }
 
@@ -538,7 +556,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       backend.taskSuccess(taskDescripition, 42)
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
     }
     assert(results === (0 until 10).map { _ -> 42 }.toMap)
@@ -563,7 +581,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       }
     }
 
-    val a = new MockRDD(sc, 2, Nil)
+    val a = new MockRDD(sc, 2, Nil, Nil)
     val b = shuffle(10, a)
     val c = shuffle(20, a)
     val d = join(30, b, c)
@@ -603,7 +621,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
    * (b) we get a second attempt for stage 0 & stage 1
    */
   testScheduler("job with fetch failure") {
-    val input = new MockRDD(sc, 2, Nil)
+    val input = new MockRDD(sc, 2, Nil, Nil)
     val shuffledRdd = shuffle(10, input)
     val shuffleId = shuffledRdd.shuffleDeps.head.shuffleId
 
@@ -645,10 +663,88 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
       backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
     }
     withBackend(runBackend _) {
-      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      val jobFuture = submit(new MockRDD(sc, 10, Nil, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
       assert(failure.getMessage.contains("test task failure"))
     }
     assertDataStructuresEmpty(noFailure = false)
   }
+
+  testScheduler("SPARK-23626: RDD with expensive getPartitions() doesn't block scheduler loop") {
+    // Before SPARK-23626, expensive `RDD.getPartitions()` calls might occur inside of the
+    // DAGScheduler event loop, causing concurrently-submitted jobs to block. This test case
+    // reproduces a scenario where that blocking could occur.
+
+    // We'll use latches to simulate an RDD with a slow getPartitions() call.
+    import MockRDDWithSlowGetPartitions._
+
+    // DAGScheduler.submitJob calls `.partitions` on the RDD passed to it.
+    // Therefore to write a proper regression test for SPARK-23626 we must
+    // ensure that the slow getPartitions() call occurs deeper in the RDD DAG:
+    val rddWithSlowGetPartitions = oneToOne(new MockRDDWithSlowGetPartitions(sc, 1))
+
+    // A RDD whose execution should not be blocked by the other RDD's slow getPartitions():
+    val simpleRdd = new MockRDD(sc, 1, Nil, Nil)
+
+    getPartitionsShouldNotHaveBeenCalledYet.set(false)
+
+    def runBackend(): Unit = {
+      val (taskDescription, _) = backend.beginTask()
+      backend.taskSuccess(taskDescription, 42)
+    }
+
+    withBackend(runBackend _) {
+      // Submit a job containing an RDD which will hang in getPartitions() until we release
+      // the countdown latch:
+      import scala.concurrent.ExecutionContext.Implicits.global
+      val slowJobFuture = Future { submit(rddWithSlowGetPartitions, Array(0)) }.flatten
+
+      // Block the current thread until the other thread has started the getPartitions() call:
+      beginGetPartitionsLatch.await(duration.toSeconds, SECONDS)
+
+      // Submit a concurrent job. This job's execution should not be blocked by the other job:
+      val fastJobFuture = submit(simpleRdd, Array(0))
+      awaitJobTermination(fastJobFuture, duration)
+
+      // The slow job should still be blocked in the getPartitions() call:
+      assert(!slowJobFuture.isCompleted)
+
+      // Allow it to complete:
+      endGetPartitionsLatch.countDown()
+      awaitJobTermination(slowJobFuture, duration)
+    }
+
+    assertDataStructuresEmpty()
+  }
+}
+
+/** Helper class used in SPARK-23626 test case */
+private object MockRDDWithSlowGetPartitions {
+  // Latch for blocking the test execution thread until getPartitions() has been called:
+  val beginGetPartitionsLatch = new CountDownLatch(1)
+
+  // Latch for blocking the getPartitions() call from completing:
+  val endGetPartitionsLatch = new CountDownLatch(1)
+
+  // Atomic boolean which is used to fail the test in case getPartitions() is called earlier
+  // than expected. This guards against false-negatives (e.g. the test passing because
+  // `.getPartitions()` was called in the test setup before we even submitted a job):
+  val getPartitionsShouldNotHaveBeenCalledYet = new AtomicBoolean(true)
+}
+
+/** Helper class used in SPARK-23626 test case */
+private class MockRDDWithSlowGetPartitions(
+    sc: SparkContext,
+    numPartitions: Int) extends MockRDD(sc, numPartitions, Nil, Nil) {
+  import MockRDDWithSlowGetPartitions._
+
+  override def getPartitions: Array[Partition] = {
+    if (getPartitionsShouldNotHaveBeenCalledYet.get()) {
+      throw new Exception("getPartitions() should not have been called at this point")
+    }
+    beginGetPartitionsLatch.countDown()
+    val partitions = super.getPartitions
+    endGetPartitionsLatch.await()
+    partitions
+  }
 }
