Posted to commits@spark.apache.org by we...@apache.org on 2022/01/05 07:59:07 UTC
[spark] branch master updated: [SPARK-8582][CORE] Checkpoint eagerly when asked to do so for real
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6061e60e [SPARK-8582][CORE] Checkpoint eagerly when asked to do so for real
6061e60e is described below
commit 6061e60ed7691bf3ca0e0964acac4c53c69bcc07
Author: Devesh Agrawal <da...@uber.com>
AuthorDate: Wed Jan 5 15:58:16 2022 +0800
[SPARK-8582][CORE] Checkpoint eagerly when asked to do so for real
Run the checkpoint job only once when asked to do so eagerly.
### What changes were proposed in this pull request?
The flow is like so:
```
- df.checkpoint(eager = true, reliable = true)
- rdd = get rdd from this df's physical plan
- rdd.checkpoint (just marks checkpointData)
- rdd.count (if eager = true)
- SparkContext.runJob for all the partitions
- DAGScheduler.runJob
- rdd.doCheckpoint
- ReliableCheckpointRDD#writeRDDToCheckpointDirectory
- SparkContext.runJob for all the partitions
- DAGScheduler.runJob (<-- This is the repeat job)
```
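For context, here is a minimal sketch of the user-facing call that exercised this path; the SparkSession setup, checkpoint directory, and column names are illustrative assumptions, not part of this commit:
```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup only; master, app name, and paths are assumptions.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("eager-checkpoint-demo")
  .getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/eager-checkpoint-demo")

val df = spark.range(100).selectExpr("id", "id % 10 AS bucket")

// Before this fix, an eager reliable checkpoint launched two jobs:
// one for the internal rdd.count() and a second one inside
// ReliableCheckpointRDD#writeRDDToCheckpointDirectory.
val checkpointed = df.checkpoint(eager = true)
```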
The local checkpointing case is less wasteful because it only recomputes the
missing partitions.
We tried a fix where we just replaced `rdd.count` above with
`rdd.doCheckpoint` and it seemed to work and pass the unit tests.
So the new flow is simply:
```
- df.checkpoint(eager = true, reliable = true)
- rdd = get rdd from this df's physical plan
- rdd.checkpoint (just marks checkpointData)
- rdd.doCheckpoint (if eager = true)
- ReliableCheckpointRDD#writeRDDToCheckpointDirectory
- SparkContext.runJob for all the partitions
- DAGScheduler.runJob (<-- Only one job is run)
```
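From the application side, the single-job behavior can be sanity-checked by counting job starts with a SparkListener, similar to the unit test added below. This is only a sketch; it assumes AQE is disabled so per-shuffle jobs do not inflate the count:
```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Sketch only: count jobs started while the eager checkpoint runs.
@volatile var jobCount = 0
val listener = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobCount += 1
}

spark.conf.set("spark.sql.adaptive.enabled", "false") // AQE adds a job per shuffle
spark.sparkContext.addSparkListener(listener)

val cp = spark.range(10).repartition(1).checkpoint(eager = true)

// Listener events are delivered asynchronously, so let them drain before
// reading the counter (the unit test uses TestUtils.withListener for this).
Thread.sleep(1000)
println(s"Jobs started: $jobCount") // expected: 1 with this fix
spark.sparkContext.removeSparkListener(listener)
```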
### Why are the changes needed?
This simple fix avoids launching a redundant job for every eager reliable
checkpoint, significantly speeding up Spark jobs that make heavy use of
`DataFrame.checkpoint`.
### Does this PR introduce _any_ user-facing change?
Yes, eager checkpointing now launches half as many Spark jobs, which should make it faster.
### How was this patch tested?
Customer Spark apps using checkpoint with this fix launch half as many
Spark jobs, with up to 50% less runtime in some cases.
Also added one more unit test to check that only one job is created.
This patch may interact with Spark Streaming, since it touches the code
paths enabled via the config
spark.checkpoint.checkpointAllMarkedAncestors; I would be happy to add
more testing there if pointed in the right direction.
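For reference, a minimal sketch of the code path that config controls; the session setup, paths, and RDD chain here are illustrative assumptions, not part of this commit:
```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the flag is read from the SparkConf, so set it before the
// SparkContext is created. Names and paths are illustrative.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("checkpoint-ancestors-demo")
  .config("spark.checkpoint.checkpointAllMarkedAncestors", "true")
  .getOrCreate()
val sc = spark.sparkContext
sc.setCheckpointDir("/tmp/checkpoint-ancestors-demo")

val parent = sc.parallelize(1 to 100)
parent.checkpoint()            // mark the ancestor for checkpointing
val child = parent.map(_ * 2)
child.checkpoint()             // mark the child as well

// With the config enabled, the first action materializes checkpoints for the
// child and its marked ancestor; with it disabled, only the child is checkpointed.
child.count()
```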
Closes #35005 from agrawaldevesh/master.
Authored-by: Devesh Agrawal <da...@uber.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../scala/org/apache/spark/sql/DatasetSuite.scala | 27 ++++++++++++++++++++--
2 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 926f3ed..9dd38d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -684,7 +684,7 @@ class Dataset[T] private[sql](
}
if (eager) {
- internalRdd.count()
+ internalRdd.doCheckpoint()
}
// Takes the first leaf partitioning whenever we see a `PartitioningCollection`. Otherwise the
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2ca3f20..2ce0754 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -25,6 +25,8 @@ import org.scalatest.exceptions.TestFailedException
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.TestUtils.withListener
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample}
import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
@@ -1457,8 +1459,29 @@ class DatasetSuite extends QueryTest
}
testCheckpointing("basic") {
- val ds = spark.range(10).repartition($"id" % 2).filter($"id" > 5).orderBy($"id".desc)
- val cp = if (reliable) ds.checkpoint(eager) else ds.localCheckpoint(eager)
+ val ds = spark
+ .range(10)
+ // Num partitions is set to 1 to avoid a RangePartitioner in the orderBy below
+ .repartition(1, $"id" % 2)
+ .filter($"id" > 5)
+ .orderBy($"id".desc)
+ @volatile var jobCounter = 0
+ val listener = new SparkListener {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ jobCounter += 1
+ }
+ }
+ var cp = ds
+ withListener(spark.sparkContext, listener) { _ =>
+ // AQE adds a job per shuffle. The expression above does multiple shuffles and
+ // that screws up the job counting
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ cp = if (reliable) ds.checkpoint(eager) else ds.localCheckpoint(eager)
+ }
+ }
+ if (eager) {
+ assert(jobCounter === 1)
+ }
val logicalRDD = cp.logicalPlan match {
case plan: LogicalRDD => plan