Posted to commits@spark.apache.org by we...@apache.org on 2022/01/05 07:59:07 UTC

[spark] branch master updated: [SPARK-8582][CORE] Checkpoint eagerly when asked to do so for real

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6061e60e [SPARK-8582][CORE] Checkpoint eagerly when asked to do so for real
6061e60e is described below

commit 6061e60ed7691bf3ca0e0964acac4c53c69bcc07
Author: Devesh Agrawal <da...@uber.com>
AuthorDate: Wed Jan 5 15:58:16 2022 +0800

    [SPARK-8582][CORE] Checkpoint eagerly when asked to do so for real
    
    Run checkpoint job only once when asked to do so eagerly.
    
    ### What changes were proposed in this pull request?
    
    The current flow runs the checkpoint job twice:
    ```
    - df.checkpoint(eager = true, reliable = true)
      - rdd = get rdd from this df's physical plan
      - rdd.checkpoint (just marks checkpointData)
      - rdd.count (if eager = true)
        - SparkContext.runJob for all the partitions
          - DAGScheduler.runJob
          - rdd.doCheckpoint
            - ReliableCheckpointRDD#writeRDDToCheckpointDirectory
              - SparkContext.runJob for all the partitions
              - DAGScheduler.runJob (<-- This is the repeat job)
    ```
    
    The local checkpointing case fares better because there the second
    pass only recomputes the missing partitions.
    
    We tried a fix where we just replaced `rdd.count` above with
    `rdd.doCheckpoint` and it seemed to work and pass the unit tests.
    
    So the new flow is simply:
    ```
    - df.checkpoint(eager = true, reliable = true)
      - rdd = get rdd from this df's physical plan
      - rdd.checkpoint (just marks checkpointData)
      - rdd.doCheckpoint (if eager = true)
        - ReliableCheckpointRDD#writeRDDToCheckpointDirectory
          - SparkContext.runJob for all the partitions
          - DAGScheduler.runJob (<-- Only one job is run)
    ```
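    
    To make the job counts in the two flows above concrete, here is a
    minimal toy model in plain Scala. It is only a sketch: `ToyScheduler`,
    `ToyRdd` and `jobsForEagerCheckpoint` are hypothetical stand-ins and
    not Spark's real classes. It assumes that an action runs one job and
    then triggers the checkpoint write, which runs another.
    
    ```scala
    // Toy model only: hypothetical stand-ins, not Spark's real API.
    object CheckpointFlowSketch {
      // Counts how many "jobs" the pretend scheduler has run.
      final class ToyScheduler {
        var jobsRun: Int = 0
        def runJob(): Unit = jobsRun += 1
      }
    
      final class ToyRdd(scheduler: ToyScheduler) {
        private var markedForCheckpoint = false
        private var checkpointed = false
    
        // Models rdd.checkpoint: only marks the RDD, runs nothing.
        def checkpoint(): Unit = markedForCheckpoint = true
    
        // Models the checkpoint write: one job, and only once per RDD.
        def doCheckpoint(): Unit =
          if (markedForCheckpoint && !checkpointed) {
            scheduler.runJob()
            checkpointed = true
          }
    
        // Models an action: one job for the action, then the checkpoint write.
        def count(): Unit = {
          scheduler.runJob()
          doCheckpoint()
        }
      }
    
      // Number of jobs an eager checkpoint triggers in the old or new flow.
      def jobsForEagerCheckpoint(useCount: Boolean): Int = {
        val scheduler = new ToyScheduler
        val rdd = new ToyRdd(scheduler)
        rdd.checkpoint()
        if (useCount) rdd.count() else rdd.doCheckpoint()
        scheduler.jobsRun
      }
    }
    ```
    
    In this model, `jobsForEagerCheckpoint(useCount = true)` corresponds to
    the old flow and `jobsForEagerCheckpoint(useCount = false)` to the new
    one.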
    
    ### Why are the changes needed?
    An eager reliable checkpoint currently runs the same job twice. This
    simple fix removes the repeat job, which drastically speeds up Spark
    jobs that make heavy use of DataFrame.checkpoint.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Eager checkpointing should be faster because it runs half as many Spark jobs.
    
    ### How was this patch tested?
    
    Customer Spark apps that use checkpoint with this fix launch half as
    many Spark jobs and see up to 50% less runtime in some cases.
    Also added one more unit test to check that only one job is created.
    
    This patch may interact with Spark Streaming, since it touches the
    code paths enabled via the config
    spark.checkpoint.checkpointAllMarkedAncestors, so I would be happy to
    add more testing there if pointed in the right direction.
    
    Closes #35005 from agrawaldevesh/master.
    
    Authored-by: Devesh Agrawal <da...@uber.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  2 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 27 ++++++++++++++++++++--
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 926f3ed..9dd38d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -684,7 +684,7 @@ class Dataset[T] private[sql](
       }
 
       if (eager) {
-        internalRdd.count()
+        internalRdd.doCheckpoint()
       }
 
       // Takes the first leaf partitioning whenever we see a `PartitioningCollection`. Otherwise the
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2ca3f20..2ce0754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -25,6 +25,8 @@ import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.TableDrivenPropertyChecks._
 
 import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.TestUtils.withListener
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample}
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
@@ -1457,8 +1459,29 @@ class DatasetSuite extends QueryTest
       }
 
       testCheckpointing("basic") {
-        val ds = spark.range(10).repartition($"id" % 2).filter($"id" > 5).orderBy($"id".desc)
-        val cp = if (reliable) ds.checkpoint(eager) else ds.localCheckpoint(eager)
+        val ds = spark
+          .range(10)
+          // Num partitions is set to 1 to avoid a RangePartitioner in the orderBy below
+          .repartition(1, $"id" % 2)
+          .filter($"id" > 5)
+          .orderBy($"id".desc)
+        @volatile var jobCounter = 0
+        val listener = new SparkListener {
+          override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+            jobCounter += 1
+          }
+        }
+        var cp = ds
+        withListener(spark.sparkContext, listener) { _ =>
+          // AQE adds a job per shuffle. The expression above does multiple shuffles and
+          // that screws up the job counting
+          withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+            cp = if (reliable) ds.checkpoint(eager) else ds.localCheckpoint(eager)
+          }
+        }
+        if (eager) {
+          assert(jobCounter === 1)
+        }
 
         val logicalRDD = cp.logicalPlan match {
           case plan: LogicalRDD => plan
