Posted to reviews@spark.apache.org by "dtarima (via GitHub)" <gi...@apache.org> on 2024/03/05 16:21:09 UTC

Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1513104440


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,12 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient private[sql] var queryExecution: QueryExecution,

Review Comment:
   Changing `queryExecution` from a `val` to a `var` makes it not thread-safe.
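
   One possible reading of this concern, as a minimal sketch in plain Scala without Spark: a plain `var` gives no visibility or atomicity guarantees when several threads read and replace the reference, whereas an `AtomicReference` does. `Execution`, `Holder`, and `ThreadSafetySketch` are hypothetical stand-ins invented for illustration; they are not Spark classes, and this is not a proposed fix for the PR.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for QueryExecution; not Spark's class.
final case class Execution(planId: Int)

// Hypothetical holder modelling an object whose execution can be swapped at runtime.
final class Holder(initial: Execution) {
  // AtomicReference makes each replacement atomic and visible to other threads.
  // A plain `var` field would give neither guarantee.
  private val ref = new AtomicReference[Execution](initial)

  def execution: Execution = ref.get()

  // Atomically derive the new execution from the current one.
  def refresh(): Execution = ref.updateAndGet(e => Execution(e.planId + 1))
}

object ThreadSafetySketch {
  // Four threads each refresh the holder 1000 times; no update is lost.
  def run(): Int = {
    val holder = new Holder(Execution(0))
    val threads = (1 to 4).map { _ =>
      new Thread(() => (1 to 1000).foreach(_ => holder.refresh()))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    holder.execution.planId
  }
}
```

   With a plain `var` and `execution = Execution(execution.planId + 1)`, concurrent updates could be lost and readers could observe a stale reference.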



##########
sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala:
##########
@@ -134,6 +134,19 @@ class DatasetCacheSuite extends QueryTest
     assert(df.storageLevel == StorageLevel.NONE)
   }
 
+  test("SPARK-46992 collect before persisting") {
+    val ds = (1 to 4).toDF("id").sort("id").sample(0.4, 123)
+    // this check will call ds.collect() first
+    assert(Array(Row(1), Row(4)).sameElements(ds.collect()))
+    // and then cache it
+    ds.cache()
+    // Make sure, the Dataset is indeed cached.
+    assertCached(ds)
+    // Make sure the result of count() is consistent with collect()

Review Comment:
   There is no need for these comments: they say the same thing the code already does.



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,6 +3880,8 @@ class Dataset[T] private[sql](
    */
   def persist(newLevel: StorageLevel): this.type = {
     sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    queryExecution = new QueryExecution(
+      queryExecution.sparkSession, queryExecution.logical, queryExecution.tracker)

Review Comment:
   There is also a `mode: CommandExecutionMode.Value` argument. I don't know its meaning, but I'm pretty sure it would need to be set too.
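
   To illustrate the general pitfall behind this comment, here is a minimal sketch in plain Scala. It assumes the omitted argument has a default value (the three-argument call in the diff suggests so, but that is an assumption). `Mode`, `Execution`, and `RebuildSketch` are hypothetical names invented for illustration and are not Spark's API.

```scala
// Hypothetical enumeration standing in for an execution-mode setting.
object Mode extends Enumeration {
  val All, Skip = Value
}

// Hypothetical class with a defaulted last constructor argument.
final class Execution(
    val plan: String,
    val tracker: String,
    val mode: Mode.Value = Mode.All)

object RebuildSketch {
  // Rebuild that omits `mode`: the copy silently falls back to the default.
  def rebuildDroppingMode(e: Execution): Execution =
    new Execution(e.plan, e.tracker)

  // Rebuild that forwards every constructor argument of the original.
  def rebuildKeepingMode(e: Execution): Execution =
    new Execution(e.plan, e.tracker, e.mode)
}
```

   In this sketch, an object created with `Mode.Skip` comes back with `Mode.All` from `rebuildDroppingMode`, while `rebuildKeepingMode` preserves the original setting.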



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,6 +3880,8 @@ class Dataset[T] private[sql](
    */
   def persist(newLevel: StorageLevel): this.type = {
     sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    queryExecution = new QueryExecution(

Review Comment:
   1. Persisting in `cacheManager` is global: it affects all `Dataset` instances.
   2. `unpersist` on any `Dataset` instance would require updating `queryExecution` again on all instances with the same `logicalPlan`.
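
   The two points above can be sketched with a toy model in plain Scala, without Spark. `SharedCache` and `Handle` are hypothetical stand-ins invented for illustration: the cache is shared and keyed by plan, while each handle keeps its own snapshot, loosely analogous to a per-instance `queryExecution`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a cache manager shared by every handle.
final class SharedCache {
  private val cachedPlans = mutable.Set.empty[String]
  def cache(plan: String): Unit = synchronized { cachedPlans += plan }
  def uncache(plan: String): Unit = synchronized { cachedPlans -= plan }
  def isCached(plan: String): Boolean = synchronized { cachedPlans.contains(plan) }
}

// Hypothetical Dataset-like handle that snapshots the cache state per instance.
final class Handle(val plan: String, cache: SharedCache) {
  // Snapshot taken at construction time and refreshed only by this instance.
  var usesCache: Boolean = cache.isCached(plan)

  def persist(): this.type = {
    cache.cache(plan) // global effect: visible to every handle with this plan
    usesCache = true  // local effect: only this instance is refreshed
    this
  }

  def unpersist(): this.type = {
    cache.uncache(plan)
    usesCache = false
    this
  }
}
```

   In this model, if handles `a` and `b` share a plan, `a.persist()` changes the shared cache but leaves `b.usesCache` stale, and a later `b.unpersist()` leaves `a.usesCache` stale in the other direction.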



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
