Posted to reviews@spark.apache.org by "doki23 (via GitHub)" <gi...@apache.org> on 2024/02/20 12:45:14 UTC

[PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

doki23 opened a new pull request, #45181:
URL: https://github.com/apache/spark/pull/45181

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'common/utils/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   This PR fixes [SPARK-46992](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-46992).
   It makes the `cache()` method return a new Dataset instance so that we can get the cached plan.
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   See the comments on [SPARK-46992](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-46992).
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   No. But users who want the cached plan must change their code to capture the returned Dataset:
   ```scala
   val cached_df = df.cache()
   ```
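   
   For illustration, a hedged sketch of the behavior difference (data adapted from the repro discussed on SPARK-46992; names and values here are illustrative, not output of this PR):
   ```scala
   import spark.implicits._
   
   val df = (1 to 4).toDF("id").sort("id").sample(0.4, 123)
   df.collect()               // materializes the uncached physical plan first
   val cached_df = df.cache() // with this PR: a new Dataset instance that sees the cache
   // count() and collect() should now agree on the returned instance
   assert(cached_df.count() == cached_df.collect().length)
   ```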
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   




Re: [PR] [SPARK-46992]Fix cache consistence [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1526282544


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  @volatile private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {

Review Comment:
   This method should probably have `@DeveloperApi @Unstable`, and the `@DeveloperApi` annotation should be removed from `queryUnpersisted` above.



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  @volatile private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+    val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+    // If all children aren't cached, directly return the queryUnpersisted
+    if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   nit: `cacheStatesSign.forall(_ == false)` is a bit more readable, and I think it'll make the comment unnecessary



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -369,6 +375,20 @@ class QueryExecution(
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)
   }
 
+  /**
+   * This method performs a pre-order traversal and return a boolean Array
+   * representing whether some nodes of the logical tree are persisted.
+   */
+  def computeCacheStateSignature(): Array[Boolean] = {

Review Comment:
   How about using `BitSet` for persistence state representation?
   It'll be easier to work with and it's more efficient.





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1975750721

   > @doki23 Do you have time to continue working on the pull request? It seems to me that it's close to completion.
   
   @dtarima Of course, I'm glad to move it forward.




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "nchammas (via GitHub)" <gi...@apache.org>.
nchammas commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1956773367

   > It's not ideal behavior but should be easy to work around
   
   Just to be clear, do you not consider it a correctness issue? To me, it's a correctness issue since the existing behavior on `master` violates what should be a basic invariant: `df.count()` and `df.collect().size` should always agree.
   
   But this is not always true, as the repro documented in the issue description shows. I also posted my own repro [just above][1].
   
   [1]: https://github.com/apache/spark/pull/45181#pullrequestreview-1893314799




Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1513176460


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,6 +3880,8 @@ class Dataset[T] private[sql](
    */
   def persist(newLevel: StorageLevel): this.type = {
     sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    queryExecution = new QueryExecution(

Review Comment:
   Analysis is eager, so blindly creating a new `QueryExecution` means always paying the cost of a repeated analysis.
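   
   For reference, a minimal sketch of the lazy-val memoization later proposed in this thread to avoid that repeated analysis (the field name is an assumption; the constructor arguments mirror the diff above plus the `mode` argument mentioned in review):
   ```scala
   // Analyzed once on first access, then reused, instead of re-analyzing on every persist().
   @transient private lazy val queryExecutionPersisted: QueryExecution =
     new QueryExecution(
       queryExecution.sparkSession, queryExecution.logical,
       queryExecution.tracker, queryExecution.mode)
   ```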





Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1979968619

   Maybe [this](https://github.com/apache/spark/pull/45181#issuecomment-1969241145) is the proper solution.
   But we need to check all the children of the logical plan to see whether they're cached:
   ```scala
   def queryExecution: QueryExecution = {
     val cacheManager = queryExecutionUnPersisted.sparkSession.sharedState.cacheManager
     val plan = queryExecutionUnPersisted.logical
     val hasCachedFragment = plan.find {
       case _: IgnoreCachedData => false
       case currentFragment => cacheManager.lookupCachedData(currentFragment).isDefined
     }.isDefined
     if (hasCachedFragment) queryExecutionPersisted
     else queryExecutionUnPersisted
   }
   ```
   And I don't know if it's binary compatible.




Re: [PR] [SPARK-46992]Fix cache consistence [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1983260338

   All children have to be considered for changes of their persistence state. Currently it only checks the first found child.
   For clarity there is a test which fails: https://github.com/doki23/spark/pull/1




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1968973266

   > > df.count() and df.collect().size should always agree.
   > 
   > how about this idea: when calling `df.collect()`, if the plan is cached but the physical plan is not a cache scan, then we do `df.select("*").collect()` instead of executing the current physical plan.
   
   Possible, but in my mind it's less robust.
   
   Here is a representation of the consequences from the root cause:
   `caching changes the result` -- affects --> `queryExecution caching in Dataset` -- affects --> `collect()`
   
   I know that `collect()` is not the only affected method: `toLocalIterator()` is another one - there might be more we don't know about, and more could be added in the future. If the root cause is not fixed then there is a high probability of a similar bug reappearing.
   
   Ideally `caching changes the result` should be fixed. If that's impossible for some reason then `queryExecution caching in Dataset` would be the next issue to be fixed.




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1978804309

   > > Regardless of the answer I think it makes sense to use the same approach for both Dataset states (persisted and unpersisted).
   > 
   > I agree. We can cache it in a lazy variable `queryExecutionPersisted`.
   
   :+1:  to `lazy val`
   
   > > The additional responsibility is independent so it should be in a separate method which provides proper QueryExecution: if we do that then we'll get something similar to def queryExecution: QueryExecution method above.
   > 
   > I'm afraid it'll be a user-facing change if we can only access `queryExecution` by method `queryExecution()`.
   
   I don't think we have a choice... Otherwise using an unpersisted `QueryExecution` when the dataset is cached may result in inconsistencies. Basically, the bug is user-facing, and our fix has to be user-facing too by definition.
   




Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1513104440


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,12 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient private[sql] var queryExecution: QueryExecution,

Review Comment:
   It's not thread-safe.



##########
sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala:
##########
@@ -134,6 +134,19 @@ class DatasetCacheSuite extends QueryTest
     assert(df.storageLevel == StorageLevel.NONE)
   }
 
+  test("SPARK-46992 collect before persisting") {
+    val ds = (1 to 4).toDF("id").sort("id").sample(0.4, 123)
+    // this check will call ds.collect() first
+    assert(Array(Row(1), Row(4)).sameElements(ds.collect()))
+    // and then cache it
+    ds.cache()
+    // Make sure, the Dataset is indeed cached.
+    assertCached(ds)
+    // Make sure the result of count() is consistent with collect()

Review Comment:
   there is no need for the comments - they say the same thing the code is doing
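   
   With the comments dropped, the test might read like this (a sketch; the final assertion is assumed, since the diff above is truncated before it):
   ```scala
   test("SPARK-46992 collect before persisting") {
     val ds = (1 to 4).toDF("id").sort("id").sample(0.4, 123)
     assert(Array(Row(1), Row(4)).sameElements(ds.collect()))
     ds.cache()
     assertCached(ds)
     assert(ds.count() == ds.collect().length)
   }
   ```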



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,6 +3880,8 @@ class Dataset[T] private[sql](
    */
   def persist(newLevel: StorageLevel): this.type = {
     sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    queryExecution = new QueryExecution(
+      queryExecution.sparkSession, queryExecution.logical, queryExecution.tracker)

Review Comment:
   there is also a `mode: CommandExecutionMode.Value` argument - I don't know its meaning, but I'm pretty sure it'd need to be set too



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,6 +3880,8 @@ class Dataset[T] private[sql](
    */
   def persist(newLevel: StorageLevel): this.type = {
     sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    queryExecution = new QueryExecution(

Review Comment:
   1. Persisting in `cacheManager` is global - it affects all `Dataset` instances.
   2. `unpersist` on any of the `Dataset` instances would require updating `queryExecution` again on all instances with the same `logicalPlan`





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1976817040

   > What're your ideas?
   
   Looking at the code I think it's going to work and will fix the issue.
   
   I do have a couple of questions though:
   
   1. We'll have an asymmetry: the `QueryExecution` instance for the unpersisted state is cached/reused in the `Dataset`, but for the persisted state it's always new (never cached/reused). Do we want to use a "cached" `QueryExecution` instance for performance reasons, or does it not matter much? Regardless of the answer I think it makes sense to use the same approach for both `Dataset` states (persisted and unpersisted).
   
   If we want to reuse `QueryExecution` instances then we'll need to have both instances in the constructor:
   ```
   class Dataset[T] private[sql](
       @DeveloperApi @Unstable @transient val queryExecutionUnpersisted: QueryExecution,
       @DeveloperApi @Unstable @transient val queryExecutionPersisted: QueryExecution,
   ```
   
   If we don't need to reuse then we should probably have these values instead and always create new `QueryExecution` instances regardless of the `Dataset` persistence state:
   ```
   class Dataset[T] private[sql](
       val sparkSession: SparkSession,
       val logical: LogicalPlan,
       val tracker: QueryPlanningTracker = new QueryPlanningTracker,
       val mode: CommandExecutionMode.Value = CommandExecutionMode.ALL
   ```
   
   Even though the latter approach is simpler, I suspect that we want to reuse `QueryExecution` instances to avoid doing the same work over and over again (the plan analysis).
   
   
   2. Method `withAction` accepts `QueryExecution` so logically it's expected to be used, but it may change it based on the `Dataset` persistence state. Does the additional responsibility belong to `withAction` method? It seems implicit to me: nothing in the name or arguments hints at that. Is this the only place where we need different `QueryExecution` instances? The additional responsibility is independent so it should be in a separate method which provides proper `QueryExecution`: if we do that then we'll get something similar to `def queryExecution: QueryExecution` method above.




Re: [PR] [SPARK-46992]Fix cache consistence [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1521457462


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+    val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+    // If all children aren't cached, directly return the queryUnpersisted
+    if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   > I don't think wrapping queryExecution in AtomicReference means it's thread-safe. For example, if we unpersist one of its children in another thread and meanwhile call ds.count(), the cache consistency may be incorrect.
   
   Yes, consistency of results cannot be guaranteed when the persistence state changes concurrently in different threads, but this is not what I was pointing to. Thread safety is a basic concept, not related to business logic: when we change a `var` in one thread, other threads might not see the updated reference. To avoid that, the reference needs to be marked `volatile`. In the example above I used AtomicReference's `set` for simplicity, but it might make sense to implement it using `compareAndSet` to get additional guarantees.
   
   > Using 2 queryExecution variables may help reduce the number of analyses.
   
   I doubt that the additional complexity is worth it. It's not a big deal... Let's see what reviewers think.
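   
   A minimal sketch of that `compareAndSet` variant, assuming the `AtomicReference[(Array[Boolean], QueryExecution)]` field suggested elsewhere in this thread (all names are illustrative, not the PR's actual code):
   ```scala
   import java.util.concurrent.atomic.AtomicReference
   import scala.annotation.tailrec
   
   @tailrec
   final def queryExecution: QueryExecution = {
     val current = queryExecutionRef.get()
     val (state, qe) = current
     val newState = qe.computeCacheStateSignature()
     if (state.sameElements(newState)) qe
     else {
       val newQe = new QueryExecution(
         qe.sparkSession, qe.logical, qe.tracker, qe.mode)
       // Publish atomically; if another thread won the race, retry and use its value.
       if (queryExecutionRef.compareAndSet(current, (newState, newQe))) newQe
       else queryExecution
     }
   }
   ```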
   





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1957724807

   I believe the main issue is that `cache` changes the results (logically it shouldn't have any effect).
   
   This PR creates a new `Dataset` instance, but the old one would still have the same inconsistent behavior.




Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1980642854

   > > I don't think it fixes the issue completely and there are some problems with the solution. I believe a proper solution is in the following comment: [#45181 (comment)](https://github.com/apache/spark/pull/45181#issuecomment-1969241145)
   > > Also many more tests are needed.
   > 
   > Based on this solution, I think we should define `queryExecution` like this:
   > 
   > ```scala
   > private var queryPersisted: Option[(LogicalPlan, QueryExecution)] = None
   > 
   > def queryExecution: QueryExecution = {
   >   val cacheManager = queryExec.sparkSession.sharedState.cacheManager
   >   val plan = queryExec.logical
   >   plan.find {
   >     case _: IgnoreCachedData => false
   >     case currentFragment => cacheManager.lookupCachedData(currentFragment).isDefined
   >   } match {
   >     // if we can't find a cached plan, directly return queryExec
   >     case None =>
   >       queryPersisted = None
   >       queryExec
   >     // a child plan is cached; make sure queryPersisted stays consistent
   >     // with the cachedPlan
   >     case Some(cachedPlan) =>
   >       queryPersisted match {
   >         case None =>
   >           val qe = new QueryExecution(
   >             queryExec.sparkSession, queryExec.logical, queryExec.tracker, queryExec.mode)
   >           queryPersisted = Some((cachedPlan, qe))
   >           qe
   >         case Some((prevCachedPlan, prevCachedQe)) =>
   >           if (prevCachedPlan.sameResult(cachedPlan)) {
   >             prevCachedQe
   >           } else {
   >             val qe = new QueryExecution(
   >               queryExec.sparkSession, queryExec.logical, queryExec.tracker, queryExec.mode)
   >             // refresh the cached queryPersisted
   >             queryPersisted = Some((cachedPlan, qe))
   >             qe
   >           }
   >       }
   >   }
   > }
   > ```
   > 
   > Whenever the ds or the child dataset is persisted or unpersisted, we can get the newest consistent result.
   
   You're correct that we have to consider the current persistence states of the child nodes :+1: 
   If persistence states change concurrently then inconsistencies are inevitable.
   `QueryExecution` must be recalculated each time anything changes underneath. It seems like it should be done in `QueryExecution` itself, not in `Dataset` (once `withCachedData` changes, then `optimizedPlan`, `sparkPlan`, and `executedPlan` need to be recalculated as well).
   I think we should start with comprehensive tests and then implement a suitable fix.




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1975636851

   @doki23 Do you have time to continue working on the pull request? It seems to me that it's close to completion.




Re: [PR] [SPARK-46992]Fix cache consistence [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1984850287

   > All children have to be considered for changes of their persistence state. Currently it only checks the first found child. For clarity there is a test which fails: [doki23#1](https://github.com/doki23/spark/pull/1)
   
   So, we need a cache state signature for queryExecution




Re: [PR] [SPARK-46992]Fix cache consistency [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1528497911


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -369,6 +375,20 @@ class QueryExecution(
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)
   }
 
+  /**
+   * This method performs a pre-order traversal and return a boolean Array
+   * representing whether some nodes of the logical tree are persisted.
+   */
+  def computeCacheStateSignature(): Array[Boolean] = {

Review Comment:
   It's not necessary to know the number of bits during construction.
   It's up to you, but just FYI, here is how it'd look:
   ```scala
       val builder = BitSet.newBuilder
       var index = 0
       normalized.foreach { fragment =>
         val cached = fragment match {
           case _: IgnoreCachedData => false
           case _ => cacheManager.lookupCachedData(fragment).isDefined
         }
         if (cached) builder += index
         index += 1
       }
       builder.result()
   ```





Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1980094365

   > I don't think it fixes the issue completely and there are some problems with the solution. I believe a proper solution is in the following comment: [#45181 (comment)](https://github.com/apache/spark/pull/45181#issuecomment-1969241145)
   > 
   > Also many more tests are needed.
   
   Based on this solution, I think we should define `queryExecution` like this:
   ```scala
   def queryExecution: QueryExecution = {
     val cacheManager = queryExec.sparkSession.sharedState.cacheManager
     val plan = queryExec.logical
     plan.find {
       case _: IgnoreCachedData => false
       case currentFragment => cacheManager.lookupCachedData(currentFragment).isDefined
     } match {
       // if we can't find a cached plan, directly return queryExec
       case None =>
         queryPersisted = None
         queryExec
       // a child plan is cached; make sure queryPersisted stays consistent
       // with the cachedPlan
       case Some(cachedPlan) =>
         queryPersisted match {
           case None =>
             val qe = new QueryExecution(
               queryExec.sparkSession, queryExec.logical, queryExec.tracker, queryExec.mode)
             queryPersisted = Some((cachedPlan, qe))
             qe
           case Some((prevCachedPlan, prevCachedQe)) =>
             if (prevCachedPlan.sameResult(cachedPlan)) {
               prevCachedQe
             } else {
               val qe = new QueryExecution(
                 queryExec.sparkSession, queryExec.logical, queryExec.tracker, queryExec.mode)
               // refresh the cached queryPersisted
               queryPersisted = Some((cachedPlan, qe))
               qe
             }
         }
     }
   }
   ```
   When the ds or the child dataset is persisted or unpersisted, we can get the newest consistent result.
   




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1955005622

   Could you enable GitHub Actions on your repository, @doki23? The Apache Spark community uses the contributor's GitHub Actions resources.
   
   - https://github.com/apache/spark/pull/45181/checks?check_run_id=21768064346
   
   ![Screenshot 2024-02-20 at 12 22 49](https://github.com/apache/spark/assets/9700541/09dc02a9-8329-41af-b127-52d6274995cd)
   




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1958979959

   Hi @dongjoon-hyun @nchammas,
   I've made some changes; would you please take a look again?
   And are there any problems with this PR?




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1496457259


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,7 +3878,7 @@ class Dataset[T] private[sql](
    */
   def persist(newLevel: StorageLevel): this.type = {
     sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
-    this
+    Dataset(sparkSession, this.queryExecution.logical).asInstanceOf[this.type]

Review Comment:
   IIUC, Apache Spark's persisted data (the underlying RDD) can always be recomputed, can't it? Caching is only a best-effort approach to reduce re-computation. Do we guarantee the cached data's immutability?





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1955662603

   It's not ideal behavior but should be easy to work around (`df.select("*").collect()`). IIUC this PR is also like a workaround, as the original `df` can't use the cache anyway because its physical plan is already materialized.
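   
   Spelled out, the workaround looks roughly like this (a sketch; `df` stands for the already-materialized Dataset from the repro):
   ```scala
   df.collect()              // physical plan gets materialized before caching
   df.cache()
   df.select("*").collect()  // builds a fresh plan on top of df, which can pick up the cache
   ```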




Re: [PR] [SPARK-46992]Fix cache consistence [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1521276646


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+    val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+    // If all children aren't cached, directly return the queryUnpersisted
+    if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   I don't think wrapping queryExecution in `AtomicReference` means it's thread-safe. For example, if we unpersist one of its children in another thread and meanwhile call `ds.count()`, the cache consistency may be incorrect.
   
   Using 2 queryExecution variables may help reduce the number of analyses.





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1496852441


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,7 +3878,7 @@ class Dataset[T] private[sql](
    */
   def persist(newLevel: StorageLevel): this.type = {
     sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
-    this
+    Dataset(sparkSession, this.queryExecution.logical).asInstanceOf[this.type]

Review Comment:
   Hmm... sorry, I don't understand your concern. It does not change the immutability of the cached data.





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1955030719

   cc @cloud-fan and @HyukjinKwon .






Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1957040482

   > Here is the Python test I am running, which is simplified from the original reproduction that Denis posted:
   
   I haven't fixed PySpark's DataFrame API, so it should still be incorrect.




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1978337106

   > Regardless of the answer I think it makes sense to use the same approach for both Dataset states (persisted and unpersisted).
   
   I agree. We can cache it in a lazy variable `queryExecutionPersisted`.
   
   > The additional responsibility is independent so it should be in a separate method which provides proper QueryExecution: if we do that then we'll get something similar to def queryExecution: QueryExecution method above.
   
   I'm afraid it'll be a user-facing change if we can only access `queryExecution` by method `queryExecution()`.






Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1959304539

   > This PR creates a new Dataset instance, but the old one would still have the same inconsistent behavior.
   
   Another possible approach is to create a new `queryExecution` instance. But we'd need to change it to a mutable variable, which is unsafe.




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "nchammas (via GitHub)" <gi...@apache.org>.
nchammas commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1497634106


##########
sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala:
##########
@@ -82,6 +82,26 @@ class DatasetCacheSuite extends QueryTest
     assert(cached.storageLevel == StorageLevel.NONE, "The Dataset should not be cached.")
   }
 
+  test("SPARK-46992 collect before persisting") {
+    val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS().select(expr("_2 + 1").as[Int])
+    // collect first
+    ds.collect()
+    // and then cache it
+    val cached = ds.cache()
+    // ds is not cached
+    assertNotCached(ds)
+    // Make sure, the Dataset is indeed cached.
+    assertCached(cached)
+
+    // Check result.
+    checkDataset(
+      cached,
+      2, 3, 4)

Review Comment:
   Are you sure this is a valid test? Because this particular check passes for me on `master`.
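   
   (Likely because that plan is deterministic, so collect-before-cache cannot produce divergent results; the original repro needs the nondeterministic `sort` + `sample` + AQE combination to surface a difference.)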





Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1513722604


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,6 +3880,8 @@ class Dataset[T] private[sql](
    */
   def persist(newLevel: StorageLevel): this.type = {
     sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
+    queryExecution = new QueryExecution(

Review Comment:
   So when we judge whether it's necessary to renew a `queryExecutionPersisted` instance, we always need to traverse the whole queryExecution's logical plan tree and look up its children in the `cacheManager`.





Re: [PR] [SPARK-46992]Fix cache consistence [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1517869525


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+    val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+    // If all children aren't cached, directly return the queryUnpersisted
+    if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   1. It doesn't look like it's necessary to distinguish between `persisted` and `unpersisted` anymore. If we wanted, we could have a cache `Map[State, QueryExecution]` for different states, but I think it'd add unjustified complexity.
   2. We cannot use `var` - it's not thread-safe.
   
   ```scala
   class Dataset[T] private[sql](
       @Unstable @transient val queryExecutionRef: AtomicReference[(Array[Boolean], QueryExecution)],
       @DeveloperApi @Unstable @transient val encoder: Encoder[T])
     extends Serializable {
   
     @DeveloperApi
     def queryExecution: QueryExecution = {
       val (state, queryExecution) = queryExecutionRef.get()
       val newState = queryExecution.computeCacheStateSignature()
   
       if (state.sameElements(newState)) queryExecution
       else {
         val newQueryExecution = new QueryExecution(
           queryExecution.sparkSession, queryExecution.logical,
           queryExecution.tracker, queryExecution.mode)
         queryExecutionRef.set((newState, newQueryExecution))
         newQueryExecution
       }
     }
   
     ...
   ```





Re: [PR] [SPARK-46992]Fix cache consistency [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-2017785125

   @dongjoon-hyun @cloud-fan @nchammas Hi, would you please take a look?




Re: [PR] [SPARK-46992]Fix cache consistency [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1536383716


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -369,6 +375,20 @@ class QueryExecution(
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)
   }
 
+  /**
+   * This method performs a pre-order traversal and returns a boolean Array
+   * indicating which nodes of the logical tree are persisted.
+   */
+  def computeCacheStateSignature(): Array[Boolean] = {

Review Comment:
   It seems that Scala's `BitSet` requires the number of elements to be a multiple of 64, since it is backed by 64-bit words.
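   
   (For illustration — a minimal standalone sketch of the limitation, using only the standard library: the backing storage is an array of 64-bit words, so the logical length is not recoverable.)
   ```scala
   import scala.collection.immutable.BitSet
   
   val sig = BitSet(0, 5)        // bits 0 and 5 set
   println(sig.toBitMask.length) // 1 -- a single 64-bit word backs the whole set
   println(sig.toBitMask.head)   // 33 = 2^0 + 2^5; the original bit count is lost
   ```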





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "nchammas (via GitHub)" <gi...@apache.org>.
nchammas commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1962513221

   > > This PR creates a new Dataset instance, but the old one would still have the same inconsistent behavior.
   > 
   > Another possible approach is to create a new `queryExecution` instance. But we would need to change it to a mutable variable, which is unsafe.
   
   I don't understand the problem well enough to say if this is the better approach. I'll defer to others for the time being.




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1964179051

   > > This PR creates a new Dataset instance, but the old one would still have the same inconsistent behavior.
   > 
   > Another possible approach is to create a new `queryExecution` instance. But we would need to change it to a mutable variable, which is unsafe.
   
   `Dataset` could keep two instances of `queryExecution`, one for the `persisted` state and one for the `unpersisted` state
   (I added a more detailed description in a comment on the ticket).




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1978837075

   > We can't cache the queryExecution in the Dataset itself because the queryExecution may come from other Dataset instance. See `isEmpty`:
   > 
   > ```scala
   > def isEmpty: Boolean = withAction("isEmpty", select().limit(1).queryExecution) { plan =>
   >     plan.executeTake(1).isEmpty
   >   }
   > ```
   
   I don't see a problem here.
   Yes, we can only cache our own `queryExecution` instances associated with our `logicalPlan` (the same way it's cached now as a `val` in the constructor).
   
   `select().limit(1)` will create two more short-lived `Dataset` instances, but we don't care about them - it's just an implementation detail unrelated to our `Dataset` instance.
   




Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1979974166

   > I don't think it fixes the issue completely and there are some problems with the solution. I believe a proper solution is in the following comment: [#45181 (comment)](https://github.com/apache/spark/pull/45181#issuecomment-1969241145)
   > 
   > Also many more tests are needed.
   
   Maybe it is. But we need to check the logical plan's children as well, like this:
   ```scala
   def queryExecution: QueryExecution = {
       val cacheManager = queryExecutionUnPersisted.sparkSession.sharedState.cacheManager
       val plan = queryExecutionUnPersisted.logical
       // Look for any plan fragment that has an entry in the cache.
       val hasCachedFragment = plan.find {
         case command: IgnoreCachedData => false
         case currentFragment => cacheManager.lookupCachedData(currentFragment).isDefined
       }.isDefined
       if (hasCachedFragment) queryExecutionPersisted
       else queryExecutionUnPersisted
     }
   ```
   And I don't know whether it breaks binary compatibility.
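   
   For illustration, a standalone model of that fragment check on a toy plan tree (all types here are hypothetical stand-ins, not Spark's):
   ```scala
   // Toy plan tree with a pre-order find, mimicking TreeNode#find.
   sealed trait Plan {
     def children: Seq[Plan]
     def find(p: Plan => Boolean): Option[Plan] =
       if (p(this)) Some(this)
       else children.iterator.map(_.find(p)).collectFirst { case Some(n) => n }
   }
   case class Node(name: String, children: Plan*) extends Plan
   
   object FragmentCheckDemo extends App {
     val plan   = Node("sort", Node("scan"))  // sort(scan)
     val cached = Set[Plan](Node("scan"))     // stands in for the CacheManager
   
     // True because a child fragment (the scan) is cached even though the
     // root is not -- exactly the case a root-only lookup would miss.
     val hasCachedFragment = plan.find(cached.contains).isDefined
     println(hasCachedFragment) // true
   }
   ```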




Re: [PR] [SPARK-46992]Fix cache consistency [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1527498760


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -369,6 +375,20 @@ class QueryExecution(
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)
   }
 
+  /**
+   * This method performs a pre-order traversal and returns a boolean Array
+   * indicating which nodes of the logical tree are persisted.
+   */
+  def computeCacheStateSignature(): Array[Boolean] = {

Review Comment:
   `BitSet` requires a `numBits` parameter, and I cannot know the number of children in advance. Although the current implementation is less efficient, it's still acceptable.





Re: [PR] [SPARK-46992]Fix cache consistency [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1521276646


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,40 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient val queryUnpersisted: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  private var queryPersisted: Option[(Array[Boolean], QueryExecution)] = None
+
+  def queryExecution: QueryExecution = {
+    val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
+    // If no node of the plan is cached, return queryUnpersisted directly
+    if (cacheStatesSign.forall(b => !b)) {

Review Comment:
   I don't think wrapping `queryExecution` in an `AtomicReference` makes it thread-safe. For example, if another thread unpersists one of its children while we call `ds.count()`, the cache state can still be observed inconsistently.





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1496453576


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3878,7 +3878,7 @@ class Dataset[T] private[sql](
    */
   def persist(newLevel: StorageLevel): this.type = {
     sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
-    this
+    Dataset(sparkSession, this.queryExecution.logical).asInstanceOf[this.type]

Review Comment:
   BTW, the original code came from #4686 (at least) and seems to have been the default Apache Spark behavior for a long time.





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1498639875


##########
sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala:
##########
@@ -82,6 +82,26 @@ class DatasetCacheSuite extends QueryTest
     assert(cached.storageLevel == StorageLevel.NONE, "The Dataset should not be cached.")
   }
 
+  test("SPARK-46992 collect before persisting") {
+    val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS().select(expr("_2 + 1").as[Int])
+    // collect first
+    ds.collect()
+    // and then cache it
+    val cached = ds.cache()
+    // ds is not cached
+    assertNotCached(ds)
+    // Make sure, the Dataset is indeed cached.
+    assertCached(cached)
+
+    // Check result.
+    checkDataset(
+      cached,
+      2, 3, 4)

Review Comment:
   It makes sure that the cached data of the new `Dataset` instance is as expected. I'll also add one more case proving that the results of `cached.count()` and `cached.collect()` are consistent.
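   
   A sketch of that extra case, following the style of the test above (it assumes the same suite helpers; the test name is mine):
   ```scala
   test("SPARK-46992 count() and collect() agree after caching") {
     val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS().select(expr("_2 + 1").as[Int])
     ds.collect()                 // materialize once before caching
     val cached = ds.cache()
     // Both actions must run against the same (cached) plan and agree on size.
     assert(cached.count() === cached.collect().length.toLong)
   }
   ```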





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1979031337

   I've force-pushed my PR; it now makes the smallest possible set of changes and fixes this issue completely.




Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1513173362


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,12 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient private[sql] var queryExecution: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  def queryExecution(): QueryExecution = queryExecution

Review Comment:
   Is this binary compatible?





Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1513702086


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,12 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient private[sql] var queryExecution: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  def queryExecution(): QueryExecution = queryExecution

Review Comment:
   I'm not familiar with this; can anyone help take a look?





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1969143795

   I think all these actions will go through `Dataset#withAction`, so we have a narrow waist where we can fix the issue.




Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "dtarima (via GitHub)" <gi...@apache.org>.
dtarima commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1969241145

   `Dataset#withAction` accepts a `queryExecution`, so every call site that passes it through unchanged would need to be fixed, like `withAction("collectAsArrowToR", queryExecution)`.
   There is no need to change the call sites where `queryExecution` is a different instance, like `withAction("head", limit(n).queryExecution)`. I'm afraid the code would be too implicit for new readers to quickly grasp the meaning of the additional `select`.
   
   We could have something like the sketch below, choosing the proper instance depending on the caching state in one place:
   ```scala
   class Dataset[T] private[sql](
       @DeveloperApi @Unstable @transient val queryExecutionUnpersisted: QueryExecution,
       @DeveloperApi @Unstable @transient val queryExecutionPersisted: QueryExecution,
       @DeveloperApi @Unstable @transient val encoder: Encoder[T])
     extends Serializable {
   
     def this(qe: QueryExecution, encoder: Encoder[T]) = {
       this(
         new QueryExecution(qe.sparkSession, qe.logical, qe.tracker, qe.mode),
         new QueryExecution(qe.sparkSession, qe.logical, qe.tracker, qe.mode),
         encoder)
     }
   
     def queryExecution: QueryExecution = {
       val cacheManager = queryExecutionUnpersisted.sparkSession.sharedState.cacheManager
       val plan = queryExecutionUnpersisted.logical
       if (cacheManager.lookupCachedData(plan).isEmpty) queryExecutionUnpersisted
       else queryExecutionPersisted
     }
   
     ...
   ```
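   
   And a standalone model of that idea with toy stand-in types (`cacheEntries` plays the role of the CacheManager; nothing here is Spark's actual API): both variants exist up front and the accessor just picks one, so no mutable field is needed.
   ```scala
   object TwoPlansDemo extends App {
     var cacheEntries = Set.empty[String]  // stands in for the CacheManager
   
     final class TwoPlans(logical: String) {
       val unpersisted = s"exec($logical)"
       val persisted   = s"execWithCacheScan($logical)"
       // Choose the proper instance based on the current cache state.
       def current: String =
         if (cacheEntries.contains(logical)) persisted else unpersisted
     }
   
     val ds = new TwoPlans("sort-plan")
     println(ds.current)          // exec(sort-plan)
     cacheEntries += "sort-plan"  // simulate ds.cache()
     println(ds.current)          // execWithCacheScan(sort-plan)
   }
   ```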




Re: [PR] [SPARK-46992]Fix cache consistency [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-2016161912

   This PR may be ready now. All tests pass.




Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1513704138


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,12 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient private[sql] var queryExecution: QueryExecution,

Review Comment:
   I didn't find any mention of accessibility in the `@DeveloperApi` documentation. Is this a tacit agreement among developers?





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1978809925

   We can't cache the queryExecution in the Dataset itself, because the queryExecution may come from another Dataset instance.
   See `isEmpty`:
   ```scala
   def isEmpty: Boolean = withAction("isEmpty", select().limit(1).queryExecution) { plan =>
       plan.executeTake(1).isEmpty
     }
   ```




Re: [PR] [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1513172673


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -193,10 +193,12 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
+    @DeveloperApi @Unstable @transient private[sql] var queryExecution: QueryExecution,

Review Comment:
   This is marked as a developer API; we can't make it private.





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1968268514

   > `df.count()` and `df.collect().size` should always agree.
   
   How about this idea: when calling `df.collect()`, if the plan is cached but the physical plan is not a cache scan, we do `df.select("*").collect()` instead of executing the current physical plan.




Re: [PR] [SPARK-46992]Fix cache consistency [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1528675546


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -369,6 +375,20 @@ class QueryExecution(
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)
   }
 
+  /**
+   * This method performs a pre-order traversal and returns a boolean Array
+   * indicating which nodes of the logical tree are persisted.
+   */
+  def computeCacheStateSignature(): Array[Boolean] = {

Review Comment:
   It looks awesome





Re: [PR] [SPARK-46992]make dataset.cache() return new ds instance [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1955732649

   > Could you enable GitHub Action on your repository, @doki23 ? Apache Spark community uses the contributor's GitHub Action resources.
   > 
   > * https://github.com/apache/spark/pull/45181/checks?check_run_id=21768064346
   > 
   > ![Screenshot 2024-02-20 at 12 22 49](https://private-user-images.githubusercontent.com/9700541/306405623-09dc02a9-8329-41af-b127-52d6274995cd.png)
   
   Ok, I've enabled it.




Re: [PR] [SPARK-46992]Fix cache consistency [spark]

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #45181:
URL: https://github.com/apache/spark/pull/45181#discussion_r1536383716


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -369,6 +375,20 @@ class QueryExecution(
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)
   }
 
+  /**
+   * This method performs a pre-order traversal and returns a boolean Array
+   * indicating which nodes of the logical tree are persisted.
+   */
+  def computeCacheStateSignature(): Array[Boolean] = {

Review Comment:
   It seems that Scala's `BitSet` doesn't record the last index, so I can't recover the number of fragments from it.




