Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/06 12:25:30 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #38130: [SPARK-40556][PS][SQL][WIP] Eagerly clean temp RDD cached in `AttachDistributedSequenceExec`

zhengruifeng opened a new pull request, #38130:
URL: https://github.com/apache/spark/pull/38130

   ### What changes were proposed in this pull request?
   
   1. In AQE, explicitly invoke `cleanupResources` after each stage finishes. In the existing AQE, only `SortExec` overrides `cleanupResources`, and it is only called from `SortMergeJoinExec` (in `SortMergeJoinScanner`, on the executors); in all other cases, a physical operator's `cleanupResources` is never invoked;
   2. In `AttachDistributedSequenceExec`, unpersist the cached RDD in `cleanupResources`
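A toy, pure-Python model of change 2 may help reviewers: the operator caches its child's output when it executes and releases the cache in its cleanup hook. `FakeRDD` and `AttachSequenceOperator` are hypothetical stand-ins, not Spark APIs; only the guard (`cached` not null and storage level not `NONE`) mirrors the diff.

```python
NONE = "NONE"


class FakeRDD:
    """Hypothetical stand-in for an RDD that only tracks its storage level."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.storage_level = NONE

    def persist(self, level):
        self.storage_level = level
        return self

    def unpersist(self):
        self.storage_level = NONE
        return self


class AttachSequenceOperator:
    """Hypothetical analogue of AttachDistributedSequenceExec."""

    def __init__(self, child_rows, storage_level="MEMORY_AND_DISK_SER"):
        self.child_rows = child_rows
        self.storage_level = storage_level
        self.cached = None  # starts empty and is assigned in execute()

    def execute(self):
        rdd = FakeRDD(self.child_rows)
        if self.storage_level != NONE:
            # In Spark, caching avoids recomputing the child, since
            # zipWithIndex launches an extra job.
            rdd.persist(self.storage_level)
        self.cached = rdd
        # Attach a 0-based sequence number to every row.
        return [(i, row) for i, row in enumerate(rdd.rows)]

    def cleanup_resources(self):
        # Safe to call before execute() and safe to call more than once.
        if self.cached is not None and self.cached.storage_level != NONE:
            self.cached.unpersist()


op = AttachSequenceOperator(["a", "b", "c"])
print(op.execute())             # [(0, 'a'), (1, 'b'), (2, 'c')]
print(op.cached.storage_level)  # MEMORY_AND_DISK_SER
op.cleanup_resources()
print(op.cached.storage_level)  # NONE
```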
   
   ### Why are the changes needed?
   1. make `cleanupResources` work in general cases;
   2. unpersist the temp RDD in `AttachDistributedSequenceExec` as soon as possible.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Existing UTs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38130:
URL: https://github.com/apache/spark/pull/38130#issuecomment-1274074665

   cc @HyukjinKwon @ueshin @xinrong-meng @itholic 
   
   also cc @cloud-fan @dongjoon-hyun @ulysses-you for the changes in SQL



[GitHub] [spark] cloud-fan commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r992086496


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -63,6 +97,16 @@ case class AttachDistributedSequenceExec(
     }
   }
 
+  override protected[sql] def cleanupResources(): Unit = {
+    this.synchronized {
+      if (cached != null && cached.getStorageLevel != StorageLevel.NONE) {

Review Comment:
   are all the accesses of `cached` protected by `synchronized`?




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995296843


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala:
##########
@@ -151,6 +151,11 @@ abstract class QueryStageExec extends LeafExecNode {
     plan.generateTreeString(
       depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields, printNodeId, indent)
   }
+
+  override protected[sql] def cleanupResources(): Unit = {
+    plan.cleanupResources()
+    super.cleanupResources()

Review Comment:
   try-finally?
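A minimal sketch of what the try-finally suggestion buys, in a toy Python model: the stage's own (parent) cleanup still runs when the wrapped plan's cleanup raises, and the exception still propagates. `Node`, `FailingNode` and `QueryStage` are hypothetical; the recursion into children is modeled on the default `SparkPlan.cleanupResources`, which, to our understanding, cleans up its children.

```python
class Node:
    """Hypothetical plan node; cleanup recurses into children, then logs."""

    def __init__(self, name, log, children=()):
        self.name = name
        self.log = log
        self.children = list(children)

    def cleanup_resources(self):
        for child in self.children:
            child.cleanup_resources()
        self.log.append(self.name)  # record that this node cleaned up


class FailingNode(Node):
    """A node whose cleanup always raises."""

    def cleanup_resources(self):
        raise RuntimeError("cleanup failed in " + self.name)


class QueryStage(Node):
    """Hypothetical analogue of QueryStageExec wrapping an inner plan."""

    def __init__(self, name, log, plan):
        super().__init__(name, log)  # a leaf: no children of its own
        self.plan = plan

    def cleanup_resources(self):
        try:
            self.plan.cleanup_resources()
        finally:
            # Runs even when the inner plan's cleanup raises.
            super().cleanup_resources()


log = []
stage = QueryStage("stage", log, FailingNode("plan", log))
try:
    stage.cleanup_resources()
except RuntimeError as e:
    print(e)  # cleanup failed in plan
print(log)    # ['stage']
```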




[GitHub] [spark] cloud-fan commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r994682267


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -265,6 +265,8 @@ case class AdaptiveSparkPlanExec(
                 } else {
                   events.offer(StageFailure(stage, res.failed.get))
                 }
+                // explicitly clean up the resources in this stage
+                stage.cleanupResources()

Review Comment:
   The execution of the final stage is out of AQE's control. E.g., people can even call `df.rdd`, and then we don't know when the job of the last stage will be submitted.




[GitHub] [spark] zhengruifeng commented on pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38130:
URL: https://github.com/apache/spark/pull/38130#issuecomment-1278338037

   cc @HyukjinKwon @ueshin @xinrong-meng @itholic PTAL when you find some time



[GitHub] [spark] cloud-fan commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r992296662


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -63,6 +97,16 @@ case class AttachDistributedSequenceExec(
     }
   }
 
+  override protected[sql] def cleanupResources(): Unit = {
+    this.synchronized {
+      if (cached != null && cached.getStorageLevel != StorageLevel.NONE) {

Review Comment:
   Yea, `cached` starts with `null` and gets assigned during `doExecute`. Seems `@transient` is sufficient.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r992310760


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -63,6 +97,16 @@ case class AttachDistributedSequenceExec(
     }
   }
 
+  override protected[sql] def cleanupResources(): Unit = {
+    this.synchronized {
+      if (cached != null && cached.getStorageLevel != StorageLevel.NONE) {

Review Comment:
   let me update 




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995295713


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -40,15 +42,45 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
+    val childRDD = child.execute()
+    // before `compute.distributed_sequence_index_storage_level` is explicitly set via
+    // `ps.set_option`, `SQLConf.get` can not get its value (as well as its default value);
+    // after `ps.set_option`, `SQLConf.get` can get its value:
+    //
+    //    In [1]: import pyspark.pandas as ps
+    //    In [2]: ps.get_option("compute.distributed_sequence_index_storage_level")
+    //    Out[2]: 'MEMORY_AND_DISK_SER'
+    //    In [3]: spark.conf.get("pandas_on_Spark.compute.distributed_sequence_index_storage_level")
+    //    ...
+    //    Py4JJavaError: An error occurred while calling o40.get.
+    //      : java.util.NoSuchElementException: pandas_on_Spark.compute.distributed_sequence_...
+    //    at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError...
+    //    at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4766)
+    //    ...
+    //    In [4]: ps.set_option("compute.distributed_sequence_index_storage_level", "NONE")
+    //    In [5]: spark.conf.get("pandas_on_Spark.compute.distributed_sequence_index_storage_level")
+    //    Out[5]: '"NONE"'
+    //    In [6]: ps.set_option("compute.distributed_sequence_index_storage_level", "DISK_ONLY")
+    //    In [7]: spark.conf.get("pandas_on_Spark.compute.distributed_sequence_index_storage_level")
+    //    Out[7]: '"DISK_ONLY"'
+    val storageLevel = StorageLevel.fromString(
+      SQLConf.get.getConfString(
+        "pandas_on_Spark.compute.distributed_sequence_index_storage_level",
+        "MEMORY_AND_DISK_SER"
+      ).replaceAll("\"", "")

Review Comment:
   This is quoted because we ser/de configuration values in JSON for pandas API on Spark. So I believe it's safe to do `stripPrefix("\"").stripSuffix("\"")`. I would recommend adding a comment too.
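A small Python sketch of the suggested quote handling: since pandas API on Spark stores option values JSON-encoded (compare `Out[5]: '"NONE"'` in the diff), stripping one leading and one trailing quote recovers the raw value. This is narrower than removing every quote with `replaceAll`, although for storage-level names both give the same result. `strip_json_quotes` is a hypothetical helper mirroring `stripPrefix("\"").stripSuffix("\"")`.

```python
import json


def strip_json_quotes(value: str) -> str:
    """Drop one leading and one trailing double quote, if present.

    Mirrors Scala's value.stripPrefix("\"").stripSuffix("\"").
    """
    if value.startswith('"'):
        value = value[1:]
    if value.endswith('"'):
        value = value[:-1]
    return value


# The option value NONE is stored JSON-encoded, i.e. with surrounding quotes.
stored = json.dumps("NONE")
print(stored)                     # "NONE"
print(strip_json_quotes(stored))  # NONE
print(json.loads(stored))         # NONE
```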
   




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995630979


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -40,15 +42,45 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
+    val childRDD = child.execute()

Review Comment:
   Hm, maybe let's just keep the legacy behaviour as is for now if AQE is disabled. Keeping the behaviour as is is fine, but changing it to a new behaviour is a different story.




[GitHub] [spark] ulysses-you commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r993321561


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -265,6 +265,8 @@ case class AdaptiveSparkPlanExec(
                 } else {
                   events.offer(StageFailure(stage, res.failed.get))
                 }
+                // explicitly clean up the resources in this stage
+                stage.cleanupResources()

Review Comment:
   Shall we do cleanup for the final stage? And also for plans which only have one stage?




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995296008


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -40,15 +42,45 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
+    val childRDD = child.execute()

Review Comment:
   Should we fall back to local checkpoint if AQE is disabled?




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r994020309


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -265,6 +265,8 @@ case class AdaptiveSparkPlanExec(
                 } else {
                   events.offer(StageFailure(stage, res.failed.get))
                 }
+                // explicitly clean up the resources in this stage
+                stage.cleanupResources()

Review Comment:
   Good point, I think we should clean up for the final stage.
   
   For a single-stage plan without shuffle, I am not sure yet.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995620313


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala:
##########
@@ -151,6 +151,11 @@ abstract class QueryStageExec extends LeafExecNode {
     plan.generateTreeString(
       depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields, printNodeId, indent)
   }
+
+  override protected[sql] def cleanupResources(): Unit = {
+    plan.cleanupResources()
+    super.cleanupResources()

Review Comment:
   I found that there is an internal try-catch in `SortExec`'s `cleanupResources`,
   so in this PR I think it's safe to only add a `try` in `AdaptiveSparkPlanExec`.
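One possible reading of "only add a `try` in `AdaptiveSparkPlanExec`", sketched in Python: after a stage's success or failure event is recorded, its cleanup runs inside a `try`, so a failing cleanup is logged rather than disturbing event handling. The per-stage callback is simulated with a loop, and `Stage` and `on_stages_complete` are hypothetical names; this is not the PR's actual code.

```python
import logging

logger = logging.getLogger("aqe_cleanup_sketch")


class Stage:
    """Hypothetical query stage used only for this sketch."""

    def __init__(self, name, fail_run=False, fail_cleanup=False):
        self.name = name
        self.fail_run = fail_run
        self.fail_cleanup = fail_cleanup
        self.cleaned = False

    def run(self):
        if self.fail_run:
            raise RuntimeError(self.name + " failed")
        return self.name + " done"

    def cleanup_resources(self):
        self.cleaned = True
        if self.fail_cleanup:
            raise RuntimeError("cleanup of " + self.name + " failed")


def on_stages_complete(stages, events):
    """Record each stage's result, then clean the stage up.

    A failing cleanup is logged instead of propagating.
    """
    for stage in stages:
        try:
            events.append(("StageSuccess", stage.name, stage.run()))
        except Exception as e:
            events.append(("StageFailure", stage.name, str(e)))
        try:
            # explicitly clean up the resources in this stage
            stage.cleanup_resources()
        except Exception as e:
            logger.warning("Failed to clean up %s: %s", stage.name, e)


stages = [Stage("s1"), Stage("s2", fail_cleanup=True), Stage("s3", fail_run=True)]
events = []
on_stages_complete(stages, events)
print(events)
print([s.cleaned for s in stages])  # [True, True, True]
```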




[GitHub] [spark] zhengruifeng commented on pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38130:
URL: https://github.com/apache/spark/pull/38130#issuecomment-1280703929

   The previous config name seems too long and makes the doc build fail,
   so I renamed it to `default_index_cache`.



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r992162591


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -63,6 +97,16 @@ case class AttachDistributedSequenceExec(
     }
   }
 
+  override protected[sql] def cleanupResources(): Unit = {
+    this.synchronized {
+      if (cached != null && cached.getStorageLevel != StorageLevel.NONE) {

Review Comment:
   I think so, maybe we don't need the `synchronized`.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995618291


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -40,15 +42,45 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
+    val childRDD = child.execute()

Review Comment:
   I guess we should avoid using `checkpoint` even if AQE is disabled, just because it is unreliable: a locally checkpointed RDD cannot be recomputed if its blocks are unfortunately evicted from memory, or lost due to something like dynamic allocation.
   
   If we want to support checkpointing here, what about adding `localcheckpoint` and `checkpoint` as new options in this new configuration?




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995620483


##########
python/pyspark/pandas/config.py:
##########
@@ -183,6 +183,40 @@ def validate(self, v: Any) -> None:
             "Index type should be one of 'sequence', 'distributed', 'distributed-sequence'.",
         ),
     ),
+    Option(

Review Comment:
   good catch! will update this




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r996577538


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -40,15 +42,45 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
+    val childRDD = child.execute()

Review Comment:
   SGTM




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995297776


##########
python/pyspark/pandas/config.py:
##########
@@ -183,6 +183,40 @@ def validate(self, v: Any) -> None:
             "Index type should be one of 'sequence', 'distributed', 'distributed-sequence'.",
         ),
     ),
+    Option(

Review Comment:
   Mind regenerating the table? See also above:
   
   ```
   # NOTE: if you are fixing or adding an option here, make sure you execute `show_options()` and
   #     copy & paste the results into show_options
   #     'docs/source/user_guide/pandas_on_spark/options.rst' as well.
   #     See the examples below:
   #     >>> from pyspark.pandas.config import show_options
   #     >>> show_options()
   ```




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995683005


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -40,15 +42,45 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
+    val childRDD = child.execute()

Review Comment:
   Maybe it's better to make `localcheckpoint` an option of `compute.distributed_sequence_index_storage_level`, so that we can always switch back to the original behavior no matter whether AQE is enabled?
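A hedged Python sketch of the proposal: a single option value selects the caching strategy regardless of AQE, with one extra value meaning "use the original local-checkpoint path" (the original code only checkpointed when there was more than one partition). The value `LOCAL_CHECKPOINT`, the strategy labels and `choose_index_cache_strategy` are hypothetical, and `STORAGE_LEVELS` is only a subset of Spark's storage-level names.

```python
# A subset of Spark storage level names, used only for validation here.
STORAGE_LEVELS = {
    "NONE",
    "DISK_ONLY",
    "MEMORY_ONLY",
    "MEMORY_ONLY_SER",
    "MEMORY_AND_DISK",
    "MEMORY_AND_DISK_SER",
}
LOCAL_CHECKPOINT = "LOCAL_CHECKPOINT"  # hypothetical extra option value


def choose_index_cache_strategy(value):
    """Map an option value to (strategy, storage_level).

    The same value selects the same behavior whether or not AQE is enabled.
    """
    if value == LOCAL_CHECKPOINT:
        # original behavior: childRDD.localCheckpoint()
        return ("local_checkpoint", None)
    if value not in STORAGE_LEVELS:
        raise ValueError("unsupported value: " + value)
    if value == "NONE":
        return ("no_cache", None)
    # new behavior: persist now, unpersist later in cleanupResources
    return ("persist", value)


print(choose_index_cache_strategy("MEMORY_AND_DISK_SER"))  # ('persist', 'MEMORY_AND_DISK_SER')
print(choose_index_cache_strategy("LOCAL_CHECKPOINT"))     # ('local_checkpoint', None)
```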




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995683005


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -40,15 +42,45 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
+    val childRDD = child.execute()

Review Comment:
   maybe better to make `localcheckpoint` an option of `compute.distributed_sequence_index_storage_level`, so that we can fallback to the original behavior no matter whether AQE is enabled?

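A minimal sketch of what this suggestion could look like, written in Python for brevity: the storage-level option gains one extra value that selects the old local-checkpoint path. The `"LOCAL_CHECKPOINT"` value and the `choose_index_cache_strategy` helper are hypothetical names used only for illustration, not a confirmed Spark option or API.

```python
from typing import Optional, Tuple


def choose_index_cache_strategy(level: str) -> Tuple[str, Optional[str]]:
    """Map a configured storage-level string to (strategy, storage_level).

    Hypothetical helper: "LOCAL_CHECKPOINT" is an assumed extra option
    value, not a confirmed Spark storage level.
    """
    if level == "NONE":
        # No caching at all: the child RDD may be recomputed when
        # zipWithIndex launches its extra Spark job.
        return ("none", None)
    if level == "LOCAL_CHECKPOINT":
        # The original behavior: childRDD.localCheckpoint(), available
        # whether or not AQE is enabled.
        return ("local_checkpoint", None)
    # Any other value names a storage level to persist with; the cached
    # RDD would then be unpersisted in cleanupResources.
    return ("persist", level)


print(choose_index_cache_strategy("LOCAL_CHECKPOINT"))
print(choose_index_cache_strategy("MEMORY_AND_DISK_SER"))
```

Keeping the fallback inside the same option means users only have one knob to turn, instead of a separate flag that interacts with the storage level.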


[GitHub] [spark] zhengruifeng commented on pull request #38130: [SPARK-40556][PS][SQL][WIP] Eagerly clean temp RDD cached in `AttachDistributedSequenceExec`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38130:
URL: https://github.com/apache/spark/pull/38130#issuecomment-1269956288

   Test script (run in the PySpark shell, where `spark` and `sc` are predefined):
   ```
   sc.setLogLevel("WARN")
   
   import pyspark.pandas as ps
   
   from pyspark.sql import functions as F, Column, DataFrame as SparkDataFrame
   
   # Read parquet data and convert it to pandas_api (replaced below by a synthetic DataFrame)
   # pandas_hope_df = spark.read.parquet("/mnt/edl/raw/pdda_data_wrangling/dg39759/bom_explosion/hope_df").pandas_api()
   pandas_hope_df = spark.range(0, 100, 1, 10).withColumn("TOP_ASSEMBLY", F.col("id") % 33).pandas_api()
   
   # ps.set_option('compute.default_index_type', 'distributed')
   
   # Add an index column that numbers the TOP_ASSEMBLY groups
   ps.set_option('compute.ops_on_diff_frames', True)
   df_index = pandas_hope_df['TOP_ASSEMBLY'].reset_index()
   df_index['index'] = (df_index.groupby(['TOP_ASSEMBLY']).cumcount()==0).astype(int)
   df_index['index'] = df_index['index'].cumsum()
   
   # Merge the original pandas_hope_df with the generated df_index on TOP_ASSEMBLY
   df_out = ps.merge(pandas_hope_df, df_index, how="inner", left_on=["TOP_ASSEMBLY"], right_on=["TOP_ASSEMBLY"])
   ps.reset_option('compute.ops_on_diff_frames')
   
   # Convert the resulting pandas-on-Spark DataFrame back to a Spark DataFrame
   spark_df_out = df_out.to_spark()
   
   # spark_df_out.cache().count()
   spark_df_out.count()
   
   # List the RDDs that are still persisted
   sc._jsc.getPersistentRDDs()
   ```
   
   before:
   ```
   Out[1]: {25: JavaObject id=o424, 7: JavaObject id=o425, 16: JavaObject id=o426}
   ```
   
   
   
   after:
   ```
   22/10/06 20:31:37 WARN AttachDistributedSequenceExec: clean up cached RDD(10) in AttachDistributedSequenceExec(197)
   22/10/06 20:31:37 WARN AttachDistributedSequenceExec: clean up cached RDD(21) in AttachDistributedSequenceExec(404)
   22/10/06 20:31:37 WARN AttachDistributedSequenceExec: clean up cached RDD(32) in AttachDistributedSequenceExec(535)
   Out[1]: {}
   ```

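The before/after difference can be modeled with a toy Python sketch: each operator persists a temporary dataset when it executes and drops it in its cleanup hook, so the registry of persisted entries goes from three to none. `PersistentRegistry`, `AttachSequenceOpSketch` and their methods are hypothetical stand-ins, not Spark classes; the real operator is Scala and unpersists an actual RDD.

```python
from typing import Dict, Optional


class PersistentRegistry:
    """Hypothetical stand-in for the driver-side table of persisted RDDs
    that sc._jsc.getPersistentRDDs() reports."""

    def __init__(self) -> None:
        self.rdds: Dict[int, str] = {}
        self._next_id = 0

    def persist(self, name: str) -> int:
        rdd_id = self._next_id
        self._next_id += 1
        self.rdds[rdd_id] = name
        return rdd_id

    def unpersist(self, rdd_id: int) -> None:
        self.rdds.pop(rdd_id, None)


class AttachSequenceOpSketch:
    """Toy operator: caches its input on execute, releases it on cleanup."""

    def __init__(self, op_id: int, registry: PersistentRegistry) -> None:
        self.op_id = op_id
        self.registry = registry
        self.cached: Optional[int] = None  # plays the role of `cached`

    def do_execute(self) -> int:
        self.cached = self.registry.persist(f"cached in op({self.op_id})")
        return self.cached

    def cleanup_resources(self) -> None:
        # Unpersist at most once; later calls are no-ops.
        if self.cached is not None:
            self.registry.unpersist(self.cached)
            self.cached = None


registry = PersistentRegistry()
ops = [AttachSequenceOpSketch(i, registry) for i in range(3)]
for op in ops:
    op.do_execute()
print(len(registry.rdds))  # three cached entries, as in "before"
for op in ops:
    op.cleanup_resources()
print(len(registry.rdds))  # none left, as in "after"
```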

[GitHub] [spark] cloud-fan commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r992084889


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala:
##########
@@ -151,6 +151,11 @@ abstract class QueryStageExec extends LeafExecNode {
     plan.generateTreeString(
       depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields, printNodeId, indent)
   }
+
+  override protected[sql] def cleanupResources(): Unit = {
+    plan.cleanupResources()
+    super.cleanupResources()

Review Comment:
   We don't need to call `super.cleanupResources()`, since a query stage is a leaf node.

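Why the `super` call is redundant can be shown with a toy Python model, assuming (as in Spark's `SparkPlan`) that the default `cleanupResources` only calls `cleanupResources` on each child. `PlanNode`, `ResourceNode` and `QueryStageSketch` are hypothetical classes: the stage holds its plan as a field rather than as a child, so the inherited method has nothing to iterate over.

```python
from typing import List, Sequence


class PlanNode:
    def __init__(self, name: str, children: Sequence["PlanNode"] = ()) -> None:
        self.name = name
        self.children = list(children)

    def cleanup_resources(self, log: List[str]) -> None:
        # Default behavior (assumed to mirror SparkPlan): only recurse
        # into the children.
        for child in self.children:
            child.cleanup_resources(log)


class ResourceNode(PlanNode):
    def cleanup_resources(self, log: List[str]) -> None:
        log.append(self.name)  # release this node's own resource
        super().cleanup_resources(log)


class QueryStageSketch(PlanNode):
    def __init__(self, name: str, plan: PlanNode) -> None:
        super().__init__(name)  # a leaf: no children
        self.plan = plan  # the wrapped plan is a field, not a child

    def cleanup_resources(self, log: List[str]) -> None:
        # Forward to the wrapped plan. super().cleanup_resources(log) would
        # iterate over an empty children list, so it is omitted.
        self.plan.cleanup_resources(log)


plan = ResourceNode("attach_seq", [ResourceNode("sort", [PlanNode("scan")])])
stage = QueryStageSketch("stage_0", plan)
log: List[str] = []
stage.cleanup_resources(log)
print(log)
```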


[GitHub] [spark] cloud-fan commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r994682267


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -265,6 +265,8 @@ case class AdaptiveSparkPlanExec(
                 } else {
                   events.offer(StageFailure(stage, res.failed.get))
                 }
+                // explicitly clean up the resources in this stage
+                stage.cleanupResources()

Review Comment:
   The execution of the final stage is out of AQE's control. For example, users can even call `df.rdd`, and then we don't know when the job of the final stage will be submitted.

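A toy Python model of where the cleanup call sits, assuming (from the diff) that it runs in the stage-completion path after either the success or the failure event is queued. `StageSketch`, `on_stage_done` and `run_adaptive` are hypothetical; the point is that only intermediate stages pass through this path, while the final stage is handed back to the caller, who may run it later (for example through `df.rdd`).

```python
import queue
from typing import List, Tuple


class StageSketch:
    def __init__(self, name: str) -> None:
        self.name = name
        self.cleaned = False

    def cleanup_resources(self) -> None:
        self.cleaned = True


def on_stage_done(stage: StageSketch, succeeded: bool, events: "queue.Queue") -> None:
    # Queue the outcome first, as in the diff...
    kind = "StageSuccess" if succeeded else "StageFailure"
    events.put((kind, stage.name))
    # ...then release the finished stage's resources either way.
    stage.cleanup_resources()


def run_adaptive(
    results: List[Tuple[StageSketch, bool]], final_stage: StageSketch
) -> Tuple[StageSketch, "queue.Queue"]:
    events: "queue.Queue" = queue.Queue()
    for stage, succeeded in results:
        on_stage_done(stage, succeeded, events)
    # The final stage is handed back to the caller, who may run it much
    # later (e.g. through df.rdd), so no cleanup happens here.
    return final_stage, events


s1, s2, final = StageSketch("s1"), StageSketch("s2"), StageSketch("final")
_, events = run_adaptive([(s1, True), (s2, False)], final)
print(s1.cleaned, s2.cleaned, final.cleaned)
print(events.qsize())
```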


[GitHub] [spark] zhengruifeng commented on pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38130:
URL: https://github.com/apache/spark/pull/38130#issuecomment-1280706269

   Merged into master, thanks @cloud-fan @HyukjinKwon @ulysses-you @itholic for the reviews


[GitHub] [spark] zhengruifeng closed pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng closed pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache
URL: https://github.com/apache/spark/pull/38130


[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995674134


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -40,15 +42,45 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
+    val childRDD = child.execute()

Review Comment:
   ok



[GitHub] [spark] zhengruifeng commented on a diff in pull request #38130: [SPARK-40556][PS][SQL] AQE clean up resources after each stage and eagerly clean Pandas Index Cache

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38130:
URL: https://github.com/apache/spark/pull/38130#discussion_r995614842


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala:
##########
@@ -40,15 +42,45 @@ case class AttachDistributedSequenceExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  @transient private var cached: RDD[InternalRow] = _
+
   override protected def doExecute(): RDD[InternalRow] = {
-    val childRDD = child.execute().map(_.copy())
-    val checkpointed = if (childRDD.getNumPartitions > 1) {
-      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      childRDD.localCheckpoint()
+    val childRDD = child.execute()
+    // before `compute.distributed_sequence_index_storage_level` is explicitly set via
+    // `ps.set_option`, `SQLConf.get` can not get its value (as well as its default value);
+    // after `ps.set_option`, `SQLConf.get` can get its value:
+    //
+    //    In [1]: import pyspark.pandas as ps
+    //    In [2]: ps.get_option("compute.distributed_sequence_index_storage_level")
+    //    Out[2]: 'MEMORY_AND_DISK_SER'
+    //    In [3]: spark.conf.get("pandas_on_Spark.compute.distributed_sequence_index_storage_level")
+    //    ...
+    //    Py4JJavaError: An error occurred while calling o40.get.
+    //      : java.util.NoSuchElementException: pandas_on_Spark.compute.distributed_sequence_...
+    //    at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError...
+    //    at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4766)
+    //    ...
+    //    In [4]: ps.set_option("compute.distributed_sequence_index_storage_level", "NONE")
+    //    In [5]: spark.conf.get("pandas_on_Spark.compute.distributed_sequence_index_storage_level")
+    //    Out[5]: '"NONE"'
+    //    In [6]: ps.set_option("compute.distributed_sequence_index_storage_level", "DISK_ONLY")
+    //    In [7]: spark.conf.get("pandas_on_Spark.compute.distributed_sequence_index_storage_level")
+    //    Out[7]: '"DISK_ONLY"'
+    val storageLevel = StorageLevel.fromString(
+      SQLConf.get.getConfString(
+        "pandas_on_Spark.compute.distributed_sequence_index_storage_level",
+        "MEMORY_AND_DISK_SER"
+      ).replaceAll("\"", "")

Review Comment:
   sure, will add a comment

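The conf-reading logic in this hunk can be sketched in Python, using a plain dict in place of `SQLConf`. It assumes, consistent with the `'"NONE"'` output in the code comment above, that `ps.set_option` stores the value JSON-encoded under a `pandas_on_Spark.` prefix; `set_option_sketch` and `read_storage_level` are hypothetical helpers, not pandas-on-Spark functions.

```python
import json
from typing import Dict

CONF_KEY = "pandas_on_Spark.compute.distributed_sequence_index_storage_level"
DEFAULT_LEVEL = "MEMORY_AND_DISK_SER"


def set_option_sketch(conf: Dict[str, str], key: str, value: str) -> None:
    # Hypothetical stand-in for ps.set_option: assumes the value is stored
    # JSON-encoded under a "pandas_on_Spark." prefix.
    conf["pandas_on_Spark." + key] = json.dumps(value)


def read_storage_level(conf: Dict[str, str]) -> str:
    # Before set_option is called the key is absent, so fall back to the
    # default, like getConfString(key, default) in the diff.
    raw = conf.get(CONF_KEY, DEFAULT_LEVEL)
    # Strip the JSON quotes, like .replaceAll("\"", "") in the diff.
    return raw.replace('"', "")


conf: Dict[str, str] = {}
print(read_storage_level(conf))  # key missing: the default level
set_option_sketch(conf, "compute.distributed_sequence_index_storage_level", "DISK_ONLY")
print(conf[CONF_KEY])            # stored with quotes
print(read_storage_level(conf))  # quotes removed
```

Stripping quotes with a plain replace is enough for storage-level names, which never contain quote characters; a general option value would need real JSON decoding.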

