Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/21 13:30:27 UTC

[GitHub] [spark] lidavidm opened a new pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

lidavidm opened a new pull request #29818:
URL: https://github.com/apache/spark/pull/29818


   
   This is a proof-of-concept that I'd like to put up for discussion. I originally put this on the [mailing list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Reducing-memory-usage-of-toPandas-with-Arrow-quot-self-destruct-quot-option-td30149.html).
   
   ### What changes were proposed in this pull request?
   
   Creating a Pandas DataFrame via Apache Arrow can currently use up to twice as much memory as the final result, because during the conversion both Pandas and Arrow retain a copy of the data. Arrow (>= 0.16) now has a "self-destruct" mode that avoids this by freeing each column after it is converted. This PR integrates support for it into `toPandas`, handling a couple of edge cases:
   
   `self_destruct` has no effect unless the memory is allocated appropriately, which this PR handles in the Arrow serializer. The issue is that `self_destruct` frees memory column-wise, but Arrow record batches are laid out row-wise:
   
   ```
   Record batch 0: allocation 0: column 0 chunk 0, column 1 chunk 0, ...
   Record batch 1: allocation 1: column 0 chunk 1, column 1 chunk 1, ...
   ```
   
   In this scenario, Arrow will drop references to all of column 0's chunks, but no memory will actually be freed, as the chunks were just slices of an underlying allocation. The PR copies each column into its own allocation so that memory is instead arranged as follows:
   
   ```
   Record batch 0: allocation 0 column 0 chunk 0, allocation 1 column 1 chunk 0, ...
   Record batch 1: allocation 2 column 0 chunk 1, allocation 3 column 1 chunk 1, ...
   ```
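   To make the layout argument concrete, here is a toy Python model. It is not Arrow's allocator or API; `Allocation` and `peak_memory` are hypothetical names. It assumes every column chunk is `chunk_bytes`, that Pandas copies one column at a time, and that an allocation is released only once every chunk slicing into it has been dropped. It reports the peak of Arrow-held plus Pandas-held bytes for both layouts above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Allocation:
    size: int  # bytes reserved by this (hypothetical) allocation
    refs: int  # live column chunks that slice into it


def peak_memory(num_batches: int, num_cols: int, chunk_bytes: int,
                per_column_allocations: bool) -> int:
    """Toy model of column-by-column self-destruct conversion; returns peak bytes."""
    # chunks[col] holds the allocation backing each of that column's chunks.
    chunks: List[List[Allocation]] = [[] for _ in range(num_cols)]
    arrow_bytes = 0
    for _ in range(num_batches):
        if per_column_allocations:
            # Layout after this PR: every column chunk owns its own allocation.
            for col in range(num_cols):
                chunks[col].append(Allocation(chunk_bytes, 1))
                arrow_bytes += chunk_bytes
        else:
            # Original layout: one allocation per record batch, sliced by column.
            shared = Allocation(chunk_bytes * num_cols, num_cols)
            arrow_bytes += shared.size
            for col in range(num_cols):
                chunks[col].append(shared)

    pandas_bytes = 0
    peak = arrow_bytes
    for col in range(num_cols):
        # Pandas materializes a full copy of this column...
        pandas_bytes += num_batches * chunk_bytes
        peak = max(peak, arrow_bytes + pandas_bytes)
        # ...then self-destruct drops Arrow's references to the column's chunks.
        for alloc in chunks[col]:
            alloc.refs -= 1
            if alloc.refs == 0:
                arrow_bytes -= alloc.size
        chunks[col].clear()
    return peak


total = 4 * 8 * 1000  # 4 batches x 8 columns x 1000-byte chunks
print(peak_memory(4, 8, 1000, per_column_allocations=False) / total)  # 2.0
print(peak_memory(4, 8, 1000, per_column_allocations=True) / total)   # 1.125
```

   In this model, the shared layout frees nothing until the last column is converted, so it peaks at 2x the data size, while the per-column layout peaks at about (1 + 1/num_cols) times the data size. It ignores allocator behavior and any extra copies Pandas makes, so it is only an illustration.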
   
   This also adds a user-facing option to disable the optimization. I'd appreciate feedback on the API here (e.g., should we instead have a global option?). We can't always apply this optimization, for two reasons: it is more likely to produce a DataFrame with immutable buffers, which Pandas doesn't always handle well, and it is slower overall, since it converts one column at a time instead of in parallel.
   
   ### Why are the changes needed?
   
   This lets us load larger datasets. In particular, with N bytes of memory, we previously could never load a dataset bigger than N/2 bytes; now the limit is closer to N/1.25 bytes.
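   The sizing claim is plain division: if the conversion peaks at `peak_factor` times the dataset size, the largest dataset that fits in `memory_bytes` is `memory_bytes / peak_factor`. A minimal sketch; the function name and the 16 GiB figure are illustrative only, and 1.25 is the rough factor quoted above, not a guarantee.

```python
def max_loadable_bytes(memory_bytes: float, peak_factor: float) -> float:
    """Largest dataset that fits if conversion peaks at peak_factor x its size."""
    if peak_factor < 1:
        raise ValueError("peak memory cannot be below the size of the result")
    return memory_bytes / peak_factor


GiB = 1024 ** 3
memory = 16 * GiB
print(max_loadable_bytes(memory, 2.0) / GiB)   # 8.0  (Arrow and Pandas each hold a copy)
print(max_loadable_bytes(memory, 1.25) / GiB)  # 12.8 (rough self-destruct estimate)
```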
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes - it adds a new option, for which I haven't added docs yet.
   
   ### How was this patch tested?
   
   See the [mailing list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Reducing-memory-usage-of-toPandas-with-Arrow-quot-self-destruct-quot-option-td30149.html) - it was tested with Python memory_profiler. I will also add unit tests to test correctness, but testing memory usage is unreliable.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r528005445



##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:
+                import pyarrow as pa
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as its
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                split_batch = pa.RecordBatch.from_arrays([
+                    pa.concat_arrays([array]) for array in batch

Review comment:
       I filed https://issues.apache.org/jira/browse/ARROW-10670.






[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776391729


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39668/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710422635


   **[Test build #129910 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129910/testReport)** for PR 29818 at commit [`4fef9d9`](https://github.com/apache/spark/commit/4fef9d9211faa3ac0d8280e5b8818b3301b35647).




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731478527


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36058/
   




[GitHub] [spark] BryanCutler commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776329472


   retest this please




[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r538827775



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 16
+        cols = 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        expected_bytes = rows * cols * 8
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            # We hold on to the table reference here, so if self destruct didn't work, then
+            # there would be 2 copies of the data (one in Arrow, one in Pandas), both
+            # tracked by the Arrow allocator

Review comment:
       Not really. It's not ideal, but it would still test that splitting and reallocating the columns in `_collect_as_arrow()` allows for self-destruction, which is most of the additions here. The rest is just calling `to_pandas()` with the right options. I would recommend testing something like this:
   
   ```python
   with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
       # get initial pa.total_allocated_bytes()
       allocation_before = pa.total_allocated_bytes()
       batches = df._collect_as_arrow(_force_split_batches=True)
       # convert batches to a table as done in toPandas()
       table = pa.Table.from_batches(batches)
       del batches
       pdf = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
       # get pa.total_allocated_bytes() afterwards; the difference should be close to expected_bytes
       difference = pa.total_allocated_bytes() - allocation_before
   
       # Call with the full code path and compare resulting DataFrame for good measure
       assert_frame_equal(df.toPandas(), pdf)
   ```
   
   That should be sufficient testing, wdyt?
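   For the "check difference is expected" step, a tolerance check could look like the sketch below. The helper name `allocation_matches` and the 10% tolerance are assumptions, not part of the PR. With the test's numbers (2 ** 16 rows x 8 double columns x 8 bytes), a working self-destruct should leave roughly one copy (`expected_bytes`) tracked by the Arrow allocator, while a broken one leaves about two.

```python
def allocation_matches(difference: int, expected_bytes: int, rel_tol: float = 0.1) -> bool:
    """True if an allocator delta is within rel_tol of expected_bytes (assumed tolerance)."""
    return abs(difference - expected_bytes) <= rel_tol * expected_bytes


rows, cols = 2 ** 16, 8
expected_bytes = rows * cols * 8  # 8 bytes per double value
print(expected_bytes)                                          # 4194304
print(allocation_matches(expected_bytes, expected_bytes))      # True: one copy tracked
print(allocation_matches(2 * expected_bytes, expected_bytes))  # False: two copies tracked
```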






[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r518959078



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -190,6 +190,13 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            pdf = self.create_pandas_data_frame()

Review comment:
       It's really hard to verify this in an automated way due to how jemalloc works (except possibly by reconfiguring jemalloc) - at the very least it'd be iffy to do in a unit test like this without some setup.
   
   (The brief explanation is that with how Arrow configures jemalloc, memory is only slowly reclaimed by the OS, so in naive measurements, you'll still see a memory usage spike - albeit slightly less of one. In C++, you can force jemalloc to purge everything right away, but even via ctypes I couldn't access the right functions from Python.)






[GitHub] [spark] lidavidm commented on a change in pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r492086557



##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,18 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if self_destruct:
+                            pandas_options.update({
+                                'self_destruct': True,
+                                'split_blocks': True,
+                                'use_threads': False,
+                            })
+                        pdf = table.to_pandas(**pandas_options)

Review comment:
       The worry is with running into things like https://github.com/pandas-dev/pandas/issues/35530 in which case the user may appreciate an escape hatch.






[GitHub] [spark] SparkQA removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776347207


   **[Test build #135086 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135086/testReport)** for PR 29818 at commit [`64d0301`](https://github.com/apache/spark/commit/64d03012616c9bc56b97693d1fdf8132493deb0e).




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712422172


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34622/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731464866


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36056/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708741263


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34372/
   




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-696116785








[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712398796


   **[Test build #130015 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130015/testReport)** for PR 29818 at commit [`1b875c1`](https://github.com/apache/spark/commit/1b875c19e3c6318fcb26c1ea62b5397d6e75d1f8).




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731503100


   **[Test build #131452 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/131452/testReport)** for PR 29818 at commit [`b15a307`](https://github.com/apache/spark/commit/b15a307b4b8fd56d1c12eb171106b4f4e6926a1d).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] lidavidm commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-707938312


   Following up here - any other comments? Does this look like a desirable feature, and how do we want to configure it?




[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r528003425



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -190,6 +190,13 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            pdf = self.create_pandas_data_frame()

Review comment:
       Sorry again for the delay.
   
   With some refactoring, this test now functions similarly to the PyArrow test. However, if we don't want to rearrange things like this, I can revert this change.






[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748183627


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37622/
   




[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r508017270



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1843,6 +1843,16 @@ object SQLConf {
       .version("3.0.0")
       .fallbackConf(ARROW_EXECUTION_ENABLED)
 
+  val ARROW_PYSPARK_SELF_DESTRUCT_ENABLED =
+    buildConf("spark.sql.execution.arrow.pyspark.selfDestruct.enabled")
+      .doc("When true, make use of Apache Arrow's self-destruct option " +
+        "for columnar data transfers in PySpark. " +
+        "This reduces memory usage at the cost of some CPU time. " +
+        "This optimization applies to: pyspark.sql.DataFrame.toPandas")
+      .version("3.0.0")

Review comment:
       Done, thanks.






[GitHub] [spark] HyukjinKwon commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-696132518


   cc @WeichenXu123 who already took a close look.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712434950


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/34622/




[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r523106731



##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:
+                import pyarrow as pa
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as its
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                split_batch = pa.RecordBatch.from_arrays([
+                    pa.concat_arrays([array]) for array in batch

Review comment:
       I don't think we need to defer since what you have works, but if you could look into those 2 improvements on the Arrow side that would be great. Then we could come back and update this later.






[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776369017


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39668/
   




[GitHub] [spark] SparkQA removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731440798


   **[Test build #131450 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/131450/testReport)** for PR 29818 at commit [`e29c994`](https://github.com/apache/spark/commit/e29c994b901ba5f7ba7650b2079c12afcbf7fb55).




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776444897


   **[Test build #135086 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135086/testReport)** for PR 29818 at commit [`64d0301`](https://github.com/apache/spark/commit/64d03012616c9bc56b97693d1fdf8132493deb0e).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] lidavidm commented on pull request #29818: [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776900488


   Thanks for being patient with me through this very long review!




[GitHub] [spark] SparkQA commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708682102


   **[Test build #129766 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129766/testReport)** for PR 29818 at commit [`96483e6`](https://github.com/apache/spark/commit/96483e628c1b590fe7702b956edb90834fee80fc).




[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r518958067



##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:
+                import pyarrow as pa
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as it's
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                split_batch = pa.RecordBatch.from_arrays([
+                    pa.concat_arrays([array]) for array in batch

Review comment:
       I'll look around. I couldn't find another obvious one but I agree that this is not ideal.






[GitHub] [spark] SparkQA removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710422635


   **[Test build #129910 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129910/testReport)** for PR 29818 at commit [`4fef9d9`](https://github.com/apache/spark/commit/4fef9d9211faa3ac0d8280e5b8818b3301b35647).




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748299143


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/133023/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731455723


   **[Test build #131452 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/131452/testReport)** for PR 29818 at commit [`b15a307`](https://github.com/apache/spark/commit/b15a307b4b8fd56d1c12eb171106b4f4e6926a1d).




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708765717








[GitHub] [spark] HyukjinKwon commented on a change in pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r504400867



##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -34,7 +34,7 @@ class PandasConversionMixin(object):
     """
 
     @since(1.3)
-    def toPandas(self):
+    def toPandas(self, self_destruct=False):

Review comment:
       Let's name it `selfDestruct` to keep the naming rule consistent.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731441818


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/131450/
   Test FAILed.




[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r523108743



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -190,6 +190,13 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            pdf = self.create_pandas_data_frame()

Review comment:
       The pyarrow test works because it allocates the memory for the required Pandas blocks too, which I believe doubles the memory allocated normally, but ends up with a net zero change with self destruct.
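       To put numbers on the doubling described above, here is the arithmetic for the sizes the later revision of this test uses (2 ** 16 rows, 8 `rand()` columns, 8 bytes per double). It assumes only the float64 data buffers matter and ignores metadata overhead; `expected_footprint` is a hypothetical helper, not part of the PR.

```python
FLOAT64_BYTES = 8  # rand() yields DoubleType columns: 8 bytes per value


def expected_footprint(rows, cols, copies):
    """Bytes held when `copies` full copies of a rows x cols float64 frame are alive."""
    return rows * cols * FLOAT64_BYTES * copies


rows, cols = 2 ** 16, 8
one_copy = expected_footprint(rows, cols, copies=1)    # pandas data only (self destruct worked)
two_copies = expected_footprint(rows, cols, copies=2)  # Arrow table + pandas data both alive
threshold = 1.2 * one_copy                             # bound asserted by the test

print(one_copy, two_copies, threshold)  # 4194304 8388608 5033164.8
```

       A 1.2x bound sits comfortably between one copy (4 MiB) and two copies (8 MiB), so it separates "self destruct freed the Arrow side" from "both copies are still held" while leaving room for allocator overhead.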






[GitHub] [spark] lidavidm commented on a change in pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r492086557



##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,18 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if self_destruct:
+                            pandas_options.update({
+                                'self_destruct': True,
+                                'split_blocks': True,
+                                'use_threads': False,
+                            })
+                        pdf = table.to_pandas(**pandas_options)

Review comment:
       The worry is with running into things like https://github.com/pandas-dev/pandas/issues/35530 in which case the user may appreciate an escape hatch.
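       The escape hatch amounts to keeping the extra `to_pandas` options strictly opt-in. Below is a minimal sketch of the option-building logic from the diff above, factored into a hypothetical helper `pandas_conversion_options`. The actual `table.to_pandas(**options)` call needs PyArrow and a real Arrow table, so it appears only as a comment.

```python
def pandas_conversion_options(self_destruct=False):
    """Build keyword arguments for pyarrow.Table.to_pandas, mirroring the diff above."""
    # Always return dates as datetime.date objects, matching non-Arrow toPandas.
    options = {"date_as_object": True}
    if self_destruct:
        # Opt-in only: if a pandas bug surfaces, users can leave this off.
        options.update({
            "self_destruct": True,   # release Arrow buffers as columns are converted
            "split_blocks": True,    # one pandas block per column, no consolidation
            "use_threads": False,    # single-threaded conversion, as in the diff
        })
    return options


# Usage (requires PyArrow; `table` is a pyarrow.Table):
#     pdf = table.to_pandas(**pandas_conversion_options(self_destruct=True))
```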






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708696569








[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r532991912



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 16
+        cols = 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        expected_bytes = rows * cols * 8
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            # We hold on to the table reference here, so if self destruct didn't work, then
+            # there would be 2 copies of the data (one in Arrow, one in Pandas), both
+            # tracked by the Arrow allocator
+            pdf, table = df._collect_as_arrow_table()
+            self.assertEqual((rows, cols), pdf.shape)
+            # If self destruct did work, then memory usage should be only a little above
+            # the minimum memory necessary for the dataframe
+            self.assertLessEqual(pa.total_allocated_bytes(), 1.2 * expected_bytes)

Review comment:
       Just to be safe, what do you think about getting the allocated bytes before and after, then comparing the difference? It should probably be the same, but it would be a little more focused in case something changes in the future.
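       The before/after idea can be sketched in plain Python. The test itself would call `pa.total_allocated_bytes()`; this sketch swaps in the standard-library `tracemalloc` module instead, so it measures Python-heap allocations rather than Arrow's allocator. The helper name `allocated_delta` is hypothetical. Subtracting a baseline means allocations made earlier, for example by other tests in the same process, do not affect the assertion.

```python
import tracemalloc


def allocated_delta(operation):
    """Run `operation` and return (result, traced bytes still held after it ran)."""
    started_here = not tracemalloc.is_tracing()
    if started_here:
        tracemalloc.start()
    try:
        before, _peak = tracemalloc.get_traced_memory()
        result = operation()  # keep the result alive so its memory stays counted
        after, _peak = tracemalloc.get_traced_memory()
    finally:
        if started_here:
            tracemalloc.stop()
    return result, after - before


buf, delta = allocated_delta(lambda: bytearray(1_000_000))
# delta is roughly 1 MB regardless of what was allocated before the call.
```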

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 16
+        cols = 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        expected_bytes = rows * cols * 8
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            # We hold on to the table reference here, so if self destruct didn't work, then
+            # there would be 2 copies of the data (one in Arrow, one in Pandas), both
+            # tracked by the Arrow allocator
+            pdf, table = df._collect_as_arrow_table()
+            self.assertEqual((rows, cols), pdf.shape)
+            # If self destruct did work, then memory usage should be only a little above
+            # the minimum memory necessary for the dataframe
+            self.assertLessEqual(pa.total_allocated_bytes(), 1.2 * expected_bytes)
+            del pdf, table
+            self.assertEqual(pa.total_allocated_bytes(), 0)
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
+            # Force the internals to reallocate data via PyArrow's allocator so that it
+            # gets measured by total_allocated_bytes
+            pdf, table = df._collect_as_arrow_table(_force_split_batches=True)
+            total_allocated_bytes = pa.total_allocated_bytes()

Review comment:
       looks to be unused?

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -252,6 +242,74 @@ def _collect_as_arrow(self):
         # Re-order the batch list using the correct order
         return [batches[i] for i in batch_order]
 
+    def _collect_as_arrow_table(self, _force_split_batches=False):

Review comment:
       I don't think we really need to add this method; it just does the conversion from batches to a DataFrame. The unit test doesn't even use the returned Table, right?






[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731470736


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36056/
   




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712434937








[GitHub] [spark] AmplabJenkins commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-696117614


   Can one of the admins verify this patch?




[GitHub] [spark] HyukjinKwon commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708156469








[GitHub] [spark] lidavidm commented on a change in pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r505044467



##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -90,7 +90,10 @@ def load_stream(self, stream):
         import pyarrow as pa
         reader = pa.ipc.open_stream(stream)
         for batch in reader:
-            yield batch
+            split_batch = pa.RecordBatch.from_arrays([
+                pa.concat_arrays([array]) for array in batch
+            ], schema=batch.schema)

Review comment:
       I added a comment, but please let me know if it's unclear.

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,18 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if self_destruct:
+                            pandas_options.update({
+                                'self_destruct': True,

Review comment:
       Done.

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -34,7 +34,7 @@ class PandasConversionMixin(object):
     """
 
     @since(1.3)
-    def toPandas(self):
+    def toPandas(self, self_destruct=False):

Review comment:
       Done.






[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-769378767


   **[Test build #134628 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134628/testReport)** for PR 29818 at commit [`64d0301`](https://github.com/apache/spark/commit/64d03012616c9bc56b97693d1fdf8132493deb0e).




[GitHub] [spark] lidavidm commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710303071


   Ah and for
   
   > The last I saw was that the self_destruct option is experimental. Do you know if or when this might change? I'm a little unsure about adding experimental features, especially if it could lead to issues with the resulting Pandas DataFrame.
   
   I can follow up on the experimental status, but AIUI, it's going to just be a long tail of "this Pandas operation didn't expect an immutable backing array" that we would need to flush out anyways over time. We can leave it turned off by default. Also, I see the PyArrow optimization in Spark is itself experimental anyways.




[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r535648140



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 16
+        cols = 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        expected_bytes = rows * cols * 8
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            # We hold on to the table reference here, so if self destruct didn't work, then
+            # there would be 2 copies of the data (one in Arrow, one in Pandas), both
+            # tracked by the Arrow allocator
+            pdf, table = df._collect_as_arrow_table()
+            self.assertEqual((rows, cols), pdf.shape)
+            # If self destruct did work, then memory usage should be only a little above
+            # the minimum memory necessary for the dataframe
+            self.assertLessEqual(pa.total_allocated_bytes(), 1.2 * expected_bytes)

Review comment:
       Hmm, I think that would work if we compare it after we delete the dataframe (but not the table). Then self_destruct should ensure that Arrow doesn't hold any copies of data.
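       The reasoning above (delete the DataFrame, keep the table, and expect Arrow to hold nothing) can be modelled with a toy table. This is a hypothetical stand-in, not PyArrow. `ToyTable` keeps one byte buffer per column, and with `self_destruct=True` it drops each buffer right after copying it into the output, which loosely imitates `to_pandas(self_destruct=True)`. As with the real option, the table should be treated as unusable afterwards.

```python
class ToyTable:
    """Hypothetical stand-in for an Arrow table: one byte buffer per column."""

    def __init__(self, columns):
        self.columns = dict(columns)

    def nbytes(self):
        return sum(len(buf) for buf in self.columns.values())

    def to_frame(self, self_destruct=False):
        frame = {}
        for name in list(self.columns):  # copy the keys: we may delete while iterating
            frame[name] = bytearray(self.columns[name])  # "convert" by copying
            if self_destruct:
                # Release the source column as soon as it has been converted.
                del self.columns[name]
        return frame


table = ToyTable({"a": bytes(100), "b": bytes(100)})
pdf = table.to_frame(self_destruct=True)
del pdf                # drop the DataFrame but keep the table reference
print(table.nbytes())  # 0: the table no longer holds any column data
```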






[GitHub] [spark] BryanCutler commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-716889755


   @lidavidm I'm a little swamped right now with some other things, but will take another look as soon as I can.




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710524785








[GitHub] [spark] HyukjinKwon commented on a change in pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r492066488



##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,18 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if self_destruct:
+                            pandas_options.update({
+                                'self_destruct': True,
+                                'split_blocks': True,
+                                'use_threads': False,
+                            })
+                        pdf = table.to_pandas(**pandas_options)

Review comment:
       Yeah, I think it's better. In which case should we disable? Maybe we should enable it by default.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731470752








[GitHub] [spark] SparkQA removed a comment on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708695639


   **[Test build #129768 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129768/testReport)** for PR 29818 at commit [`a6a189f`](https://github.com/apache/spark/commit/a6a189f869c93d9ce1dfed153b558e3e73c576a3).




[GitHub] [spark] SparkQA removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-769378767


   **[Test build #134628 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134628/testReport)** for PR 29818 at commit [`64d0301`](https://github.com/apache/spark/commit/64d03012616c9bc56b97693d1fdf8132493deb0e).




[GitHub] [spark] HyukjinKwon commented on a change in pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r504400977



##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,18 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if self_destruct:
+                            pandas_options.update({
+                                'self_destruct': True,

Review comment:
       Would you mind leaving a comment in the code explaining this set of parameter configurations?
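For reference, a documented version of this option set could look roughly like the following. This is a sketch only: the helper name `build_to_pandas_options` is hypothetical, and the per-option descriptions follow the comments added in a later revision of this hunk in this thread.

```python
from typing import Any, Dict


def build_to_pandas_options(self_destruct: bool) -> Dict[str, Any]:
    """Keyword arguments for pyarrow.Table.to_pandas, mirroring this diff."""
    # Keep dates as datetime.date objects, matching the non-Arrow code path.
    options: Dict[str, Any] = {"date_as_object": True}
    if self_destruct:
        options.update({
            # Free each Arrow column as soon as it has been converted.
            "self_destruct": True,
            # Create a separate pandas block for each column instead of
            # consolidating same-dtype columns into one large 2-D block.
            "split_blocks": True,
            # Convert one column at a time (no parallel conversion).
            "use_threads": False,
        })
    return options
```

It would be used as `table.to_pandas(**build_to_pandas_options(self_destruct))`.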






[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712461557


   **[Test build #130021 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130021/testReport)** for PR 29818 at commit [`c114166`](https://github.com/apache/spark/commit/c114166afb682081f99b8893bce60bbd38560b3e).




[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r506587180



##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,22 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches

Review comment:
       Yes - we don't want to hold on to any other references to the buffers, we want the Table to be the only owner. I'll clarify this part here.
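The ownership argument can be illustrated in plain Python. This is an analogy, not Arrow's implementation: the `Buffer` class is a hypothetical stand-in for Arrow memory, and a weak reference is used only to observe when it is freed. Memory is released only when its last strong reference disappears, so if `batches` stayed alive, releasing the table's columns during conversion would free nothing.

```python
import gc
import weakref


class Buffer:
    """Hypothetical stand-in for a block of Arrow memory."""

    def __init__(self, nbytes: int) -> None:
        self.data = bytearray(nbytes)


def freed_after_table_release(keep_batches: bool) -> bool:
    buf = Buffer(1024)
    probe = weakref.ref(buf)  # observes the buffer without owning it
    batches = [buf]           # reference held by the collected batches
    table = [buf]             # reference held by the table built from them
    del buf
    if not keep_batches:
        del batches           # analogous to `del batches` in the diff
    # Self-destruct drops the table's reference once a column is converted.
    del table
    gc.collect()
    return probe() is None
```

`freed_after_table_release(keep_batches=False)` returns `True`, while `keep_batches=True` returns `False` because the batch list still owns the buffer.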






[GitHub] [spark] SparkQA removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731455723


   **[Test build #131452 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/131452/testReport)** for PR 29818 at commit [`b15a307`](https://github.com/apache/spark/commit/b15a307b4b8fd56d1c12eb171106b4f4e6926a1d).




[GitHub] [spark] HyukjinKwon commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-716252237


   @lidavidm, just out of curiosity, we can do it in pandas UDFs too, right? I can of course do it separately in another PR, but I was wondering whether I am understanding correctly.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-769513474


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/134628/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-769439939


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39216/
   




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708696569








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731503435








[GitHub] [spark] AmplabJenkins commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708704146








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710713322








[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712482362








[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712434919


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34622/
   




[GitHub] [spark] HyukjinKwon edited a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon edited a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-716252237


   @lidavidm, just out of curiosity, we can do it in pandas UDFs too, right? It can of course be done separately in another PR, but I was wondering whether I am understanding correctly.




[GitHub] [spark] SparkQA commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708765194


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34374/
   




[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r505197033



##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -34,7 +34,7 @@ class PandasConversionMixin(object):
     """
 
     @since(1.3)
-    def toPandas(self):
+    def toPandas(self, selfDestruct=False):

Review comment:
       Would this be better as a SQL config? Since this class is a mixin, I'm not sure the user would see this option in the public API.
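Reading the flag from the session configuration instead of a `toPandas` keyword might look roughly like this. A sketch only: it assumes the configuration key used in this PR's test (`spark.sql.execution.arrow.pyspark.selfDestruct.enabled`), models the session conf as a plain mapping of strings (Spark SQL conf values are returned as strings), and the helper name `self_destruct_enabled` is hypothetical.

```python
from typing import Mapping

SELF_DESTRUCT_KEY = "spark.sql.execution.arrow.pyspark.selfDestruct.enabled"


def self_destruct_enabled(conf: Mapping[str, str], default: str = "false") -> bool:
    """Interpret a string-valued SQL conf entry as a boolean flag."""
    value = conf.get(SELF_DESTRUCT_KEY, default)
    # Conf values arrive as strings such as "true"/"false"; normalize them.
    return value.strip().lower() == "true"
```

Defaulting to `"false"` keeps the behaviour opt-in, in line with the suggestion in this review to disable the option by default.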

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,22 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches

Review comment:
       Does this have any bearing on the buffers self-destructing? Is it taking into account the reference count before destructing?

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,18 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if self_destruct:
+                            pandas_options.update({
+                                'self_destruct': True,
+                                'split_blocks': True,
+                                'use_threads': False,
+                            })
+                        pdf = table.to_pandas(**pandas_options)

Review comment:
       Yeah, I think this option can lead to other side effects; it's best to disable it by default.

##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -90,7 +90,10 @@ def load_stream(self, stream):
         import pyarrow as pa
         reader = pa.ipc.open_stream(stream)
         for batch in reader:
-            yield batch
+            split_batch = pa.RecordBatch.from_arrays([
+                pa.concat_arrays([array]) for array in batch
+            ], schema=batch.schema)

Review comment:
       This looks a little strange to me too. Is `concat_arrays` actually doing anything here, and if so, wouldn't it do it for the case that `selfDestruct` is False too?
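One plausible reading of the `concat_arrays` call, offered as an assumption rather than a confirmed explanation: arrays read from an IPC stream are zero-copy slices of the message body, so several columns can share one allocation, and freeing one converted column releases nothing while any column sharing that allocation is still alive. Copying each column into its own buffer breaks that sharing. The plain-Python analogy below uses `memoryview` slices of a single `bytearray`; the function name `split_columns` is hypothetical.

```python
from typing import List, Sequence, Tuple


def split_columns(
    body: bytearray, layout: Sequence[Tuple[int, int]]
) -> Tuple[List[memoryview], List[bytes]]:
    """Return zero-copy views and independent copies of each column's bytes.

    `layout` lists (offset, length) pairs for each column inside `body`.
    """
    whole = memoryview(body)
    # Zero-copy: every view references (and keeps alive) all of `body`.
    views = [whole[off:off + n] for off, n in layout]
    # Copy: each column now owns its own, independently freeable buffer.
    copies = [bytes(v) for v in views]
    return views, copies


body = bytearray(b"AAAABBBBCCCC")  # one allocation holding three "columns"
views, copies = split_columns(body, [(0, 4), (4, 4), (8, 4)])
print(all(v.obj is body for v in views))  # True: the views pin the shared buffer
print(copies)  # [b'AAAA', b'BBBB', b'CCCC']
```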

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,22 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if selfDestruct:
+                            # Configure PyArrow to use as little memory as possible:
+                            # self_destruct - free columns as they are converted
+                            # split_blocks - create a separate Pandas block for each column
+                            # use_threads - convert one column at a time
+                            pandas_options.update({
+                                'self_destruct': True,
+                                'split_blocks': True,

Review comment:
       Is this necessary to set with `self_destruct`? It might lead to Pandas doing more memory allocation later, I believe.






[GitHub] [spark] AmplabJenkins commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-696116785


   Can one of the admins verify this patch?




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776451202


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135086/
   




[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r535646920



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 16
+        cols = 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        expected_bytes = rows * cols * 8
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            # We hold on to the table reference here, so if self destruct didn't work, then
+            # there would be 2 copies of the data (one in Arrow, one in Pandas), both
+            # tracked by the Arrow allocator

Review comment:
       Yes - we need the table reference. If self destruct worked, then the table holds effectively no memory; if it didn't work, then it holds a copy of the dataframe.

   If we call to_pandas inside the test itself, wouldn't we then be testing Arrow directly, instead of testing how Arrow is used inside Spark?






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710524785








[GitHub] [spark] BryanCutler closed pull request #29818: [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler closed pull request #29818:
URL: https://github.com/apache/spark/pull/29818


   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-696116785


   Can one of the admins verify this patch?




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-769513474


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/134628/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712482339


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34628/
   




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708797026








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-696117614


   Can one of the admins verify this patch?




[GitHub] [spark] SparkQA removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748154212


   **[Test build #133023 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/133023/testReport)** for PR 29818 at commit [`0d9c88a`](https://github.com/apache/spark/commit/0d9c88a12a26185012d6086a656a566314d18847).




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748292774


   **[Test build #133023 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/133023/testReport)** for PR 29818 at commit [`0d9c88a`](https://github.com/apache/spark/commit/0d9c88a12a26185012d6086a656a566314d18847).
    * This patch **fails SparkR unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731471512


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36058/
   




[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r506586985



##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -34,7 +34,7 @@ class PandasConversionMixin(object):
     """
 
     @since(1.3)
-    def toPandas(self):
+    def toPandas(self, selfDestruct=False):

Review comment:
       I'll change it.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712434937


   Merged build finished. Test FAILed.




[GitHub] [spark] lidavidm commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710415159


   Running the demo again gives the two plots below. Although the memory usage looks nearly identical, Python gets OOMKilled without self-destruct but not with it. The curves look so similar because jemalloc doesn't return unused memory to the OS immediately; instead, when under memory pressure, it signals a background thread to start cleaning up. Because of this delay, memory is still consumed, but at a slower pace.
   
   Without self-destruct
   
   ![no-self-destruct](https://user-images.githubusercontent.com/327919/96297169-72d9a080-0fbe-11eb-8014-d8fb8990d677.png)
   
   With self-destruct
   ![self-destruct](https://user-images.githubusercontent.com/327919/96297184-766d2780-0fbe-11eb-822b-753b9efb84c2.png)
   
   ```python
   import time
   
   import pyarrow
   pyarrow.jemalloc_set_decay_ms(0)
   
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import rand
   
   self_destruct = "true"
   print('self_destruct:', self_destruct)
   spark = SparkSession.builder \
       .master("local") \
       .appName("demo") \
       .config("spark.driver.maxResultSize", "8g") \
       .config("spark.driver.memory", "4g") \
       .config("spark.executor.memory", "512m") \
       .config("spark.worker.memory", "512m") \
       .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
       .config("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", self_destruct) \
       .getOrCreate()
   
   # 6 GiB dataframe. Tweak this to adjust for the amount of RAM you have
   # (target > ~1/2 of free memory). I had ~8 GiB free for this demo.
   # union() generates a dataframe that doesn't take so much memory in Java
   rows = 2 ** 17
   cols = 64
   df = spark.range(0, rows).select(*[rand(seed=i) for i in range(cols)])
   df = df.union(df).union(df).union(df).union(df).union(df)
   df = df.union(df)
   df = df.union(df)
   df = df.union(df)
   
   pdf = df.toPandas()
   
   print('================ MEMORY USAGE:', sum(pdf.memory_usage()) / 2**20, "MiB")
   # Give memory_profiler some more time
   time.sleep(2)
   ```
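For reference, the size of the demo DataFrame can be worked out from the script. This assumes 8-byte float64 values (what `rand()` produces) and ignores the pandas index:

```python
# Arithmetic behind the demo's DataFrame size.
rows = 2 ** 17
cols = 64
copies = 6          # df.union(df).union(df).union(df).union(df).union(df)
copies *= 2 ** 3    # three further `df = df.union(df)` doublings
total_rows = rows * copies
total_bytes = total_rows * cols * 8  # 8 bytes per float64 value
print(copies, total_rows, total_bytes / 2 ** 30)  # last value is GiB
```

By this count each copy of the data is about 3 GiB. Roughly 6 GiB is therefore live at the peak when the Arrow batches and the pandas DataFrame both hold a full copy, which is the situation self-destruct tries to avoid.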




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712482366


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/34628/




[GitHub] [spark] SparkQA commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708765638


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34372/
   




[GitHub] [spark] SparkQA removed a comment on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708682102


   **[Test build #129766 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129766/testReport)** for PR 29818 at commit [`96483e6`](https://github.com/apache/spark/commit/96483e628c1b590fe7702b956edb90834fee80fc).




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748200961


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37622/
   




[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r519049299



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -190,6 +190,13 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            pdf = self.create_pandas_data_frame()

Review comment:
       Well, I was thinking you might be able to use `pyarrow.total_allocated_bytes()`; this is how it's tested in pyarrow: https://github.com/apache/arrow/blob/e6eb61f58ef382003c9462924563f575d9a59c13/python/pyarrow/tests/test_pandas.py#L3340. But the table is deallocated after `toPandas()` returns anyway, so you would have to sample before and after `pdf = table.to_pandas(**pandas_options)`.
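The before/after sampling suggested here can be sketched with the standard library's `tracemalloc` standing in for `pyarrow.total_allocated_bytes()`. This is an analogy only: `tracemalloc` sees Python-level allocations, not Arrow's memory pool, and `allocation_delta` is a hypothetical helper. What it does show is that the second sample has to be taken while the result is still referenced. Otherwise the memory may already be freed, which is why the test keeps the table reference.

```python
import tracemalloc


def allocation_delta(fn):
    """Run fn() and return (result, bytes it left allocated).

    Memory is sampled once before the call and once after it, while the
    result is still held, mirroring sampling around table.to_pandas().
    """
    was_tracing = tracemalloc.is_tracing()
    if not was_tracing:
        tracemalloc.start()
    try:
        before, _peak = tracemalloc.get_traced_memory()
        result = fn()
        after, _peak = tracemalloc.get_traced_memory()
    finally:
        if not was_tracing:
            tracemalloc.stop()
    return result, after - before
```

For example, `allocation_delta(lambda: bytearray(10 * 2**20))` reports at least 10 MiB because the buffer is returned and kept, while wrapping the same allocation in `len(...)` reports almost nothing because the buffer is freed before the second sample.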






[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r518947536



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1843,6 +1843,16 @@ object SQLConf {
       .version("3.0.0")
       .fallbackConf(ARROW_EXECUTION_ENABLED)
 
+  val ARROW_PYSPARK_SELF_DESTRUCT_ENABLED =
+    buildConf("spark.sql.execution.arrow.pyspark.selfDestruct.enabled")
+      .doc("When true, make use of Apache Arrow's self-destruct option " +
+        "for columnar data transfers in PySpark. " +
+        "This reduces memory usage at the cost of some CPU time. " +
+        "This optimization applies to: pyspark.sql.DataFrame.toPandas")

Review comment:
       Can you also add that `split_blocks` will be enabled too, and make it clear that it is for the conversion of Arrow to pandas?

##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:
+                import pyarrow as pa
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as it's
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                split_batch = pa.RecordBatch.from_arrays([
+                    pa.concat_arrays([array]) for array in batch

Review comment:
       I'm not crazy about using `pa.concat_arrays()` to force a copy. I don't think it's safe to assume it will always make a copy; it might end up producing a chunked array in the future with zero-copy, as is done in `pa.concat_tables()`. Is there another, more direct way we can make copies?
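The reasoning in the code comment above can be illustrated with a toy reference-counting model. This is hypothetical and is not Arrow's actual buffer layout. When a record batch is a single allocation shared by all of its columns, releasing columns one by one returns nothing until the last column is released. When each column of each batch has its own allocation, memory comes back as each column is converted.

```python
def bytes_freed_per_column(num_batches: int, num_cols: int, col_bytes: int,
                           split: bool) -> list:
    """Toy model: release columns one at a time (as self-destruct does) and
    report how many bytes each release actually returns.

    split=False: each record batch is one allocation shared by all its columns.
    split=True:  every (batch, column) pair has its own allocation.
    """
    refcount = {}  # allocation id -> number of columns still referencing it
    size = {}      # allocation id -> bytes
    owners = {c: [] for c in range(num_cols)}  # column -> allocation ids
    for b in range(num_batches):
        for c in range(num_cols):
            alloc = (b, c) if split else (b,)
            if alloc not in size:
                size[alloc] = col_bytes if split else col_bytes * num_cols
                refcount[alloc] = 0
            refcount[alloc] += 1
            owners[c].append(alloc)
    freed = []
    for c in range(num_cols):
        released = 0
        for alloc in owners[c]:
            refcount[alloc] -= 1
            if refcount[alloc] == 0:  # last reference gone: memory returned
                released += size[alloc]
        freed.append(released)
    return freed
```

With 4 batches, 3 columns and 100 bytes per column slice, the shared layout frees `[0, 0, 1200]` bytes as the columns are released, while the split layout frees `[400, 400, 400]`: the same total, but returned progressively.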

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -190,6 +190,13 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            pdf = self.create_pandas_data_frame()

Review comment:
       This is a pretty minimal dataframe, mostly for checking type compatibility. It would be nice to also check against a larger dataframe that the memory usage doesn't double during conversion, but I'm not sure how to do that reliably.

##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:

Review comment:
       This doesn't need to be done in the serializer, could you move it to `_collect_as_arrow` and leave the serializer as it was?

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -232,8 +252,9 @@ def _collect_as_arrow(self):
             port, auth_secret, jsocket_auth_server = self._jdf.collectAsArrowToPython()
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
+        serializer = ArrowCollectSerializer(split_batches=split_batches)
         try:
-            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
+            results = list(_load_from_socket((port, auth_secret), serializer))

Review comment:
       You should be able to split the batches here, as they are yielded from the socket.
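A sketch of transforming batches as they are read, rather than after materializing the whole `list(...)`. It assumes, per the comment in `_collect_as_arrow`, that the stream yields un-ordered batches followed by one final list of order indices. `collect_in_order` and `transform` are hypothetical names. Holding back one item is just one way to keep the trailing index list away from `transform`; an `isinstance` check on the batch type would be another.

```python
from typing import Any, Callable, Iterable, List

_NOTHING = object()  # sentinel: no item held back yet


def collect_in_order(stream: Iterable[Any],
                     transform: Callable[[Any], Any]) -> List[Any]:
    """Transform batches as they arrive, then restore their correct order.

    Each item is held back by one step, so the final element (the order
    indices) is never passed to `transform`.
    """
    batches = []
    held = _NOTHING
    for item in stream:
        if held is not _NOTHING:
            batches.append(transform(held))  # transform while still streaming
        held = item
    if held is _NOTHING:
        raise ValueError("empty stream: expected trailing order indices")
    order = held
    return [batches[i] for i in order]
```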






[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712517745


   **[Test build #130015 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130015/testReport)** for PR 29818 at commit [`1b875c1`](https://github.com/apache/spark/commit/1b875c19e3c6318fcb26c1ea62b5397d6e75d1f8).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731441806


   **[Test build #131450 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/131450/testReport)** for PR 29818 at commit [`e29c994`](https://github.com/apache/spark/commit/e29c994b901ba5f7ba7650b2079c12afcbf7fb55).
    * This patch **fails Python style tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708704146








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712482362


   Merged build finished. Test FAILed.




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710712527


   **[Test build #129910 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129910/testReport)** for PR 29818 at commit [`4fef9d9`](https://github.com/apache/spark/commit/4fef9d9211faa3ac0d8280e5b8818b3301b35647).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776347207


   **[Test build #135086 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135086/testReport)** for PR 29818 at commit [`64d0301`](https://github.com/apache/spark/commit/64d03012616c9bc56b97693d1fdf8132493deb0e).




[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r532996042



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 16
+        cols = 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        expected_bytes = rows * cols * 8
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            # We hold on to the table reference here, so if self destruct didn't work, then
+            # there would be 2 copies of the data (one in Arrow, one in Pandas), both
+            # tracked by the Arrow allocator

Review comment:
       Oh, I missed your comment that you need the Table reference. Otherwise it could be freed and then we would lose track of the allocated memory, right? If that's the case, what about just calling `_collect_as_arrow(_force_split_batches=True)` here and performing batches -> table -> pandas directly?






[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-769503650


   **[Test build #134628 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134628/testReport)** for PR 29818 at commit [`64d0301`](https://github.com/apache/spark/commit/64d03012616c9bc56b97693d1fdf8132493deb0e).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731503435








[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731478538








[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731440798


   **[Test build #131450 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/131450/testReport)** for PR 29818 at commit [`e29c994`](https://github.com/apache/spark/commit/e29c994b901ba5f7ba7650b2079c12afcbf7fb55).




[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r562061643



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4

Review comment:
       we should test memory usage with more rows than this

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -121,6 +137,8 @@ def toPandas(self):
                             elif isinstance(field.dataType, MapType):
                                 pdf[field.name] = \
                                     _convert_map_items_to_dict(pdf[field.name])
+                        # Return both the DataFrame and table for unit testing
+                        # Note that table reference is invalid if self_destruct enabled

Review comment:
       This can be removed now

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -240,7 +263,27 @@ def _collect_as_arrow(self):
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
         try:
-            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
+            batch_stream = _load_from_socket((port, auth_secret), ArrowCollectSerializer())
+            if split_batches:
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as its
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                import pyarrow as pa
+                results = []
+                for batch_or_indices in batch_stream:
+                    if not isinstance(batch_or_indices, pa.RecordBatch):
+                        results.append(batch_or_indices)
+                    else:
+                        split_batch = pa.RecordBatch.from_arrays([
+                            pa.concat_arrays([array])

Review comment:
       could you make a note that this is to reallocate the array?
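A toy model may help show why the reallocation matters for self-destruct. This sketch is not pyarrow: `ToyAllocator`, `make_batches` and `self_destruct_peak` are hypothetical names, and buffers are modeled as reference-counted byte counts. It assumes that, without splitting, all columns of a batch share one allocation (as the code comment above describes for batches read off the stream), and that self-destruct copies one column to pandas and then drops that column's Arrow references.

```python
class ToyAllocator:
    """Hypothetical stand-in for an Arrow memory pool: tracks live buffers."""

    def __init__(self):
        self._live = {}  # buffer id -> [nbytes, refcount]
        self._next_id = 0

    def allocate(self, nbytes):
        buf_id = self._next_id
        self._next_id += 1
        self._live[buf_id] = [nbytes, 0]
        return buf_id

    def retain(self, buf_id):
        self._live[buf_id][1] += 1

    def release(self, buf_id):
        entry = self._live[buf_id]
        entry[1] -= 1
        if entry[1] == 0:  # last reference gone: memory is actually freed
            del self._live[buf_id]

    def total_allocated_bytes(self):
        return sum(nbytes for nbytes, _ in self._live.values())


def make_batches(alloc, num_batches, num_cols, col_bytes, split):
    """Each batch is a list of buffer ids, one per column."""
    batches = []
    for _ in range(num_batches):
        if split:
            # Analogous to pa.concat_arrays([array]): one allocation per column.
            cols = [alloc.allocate(col_bytes) for _ in range(num_cols)]
        else:
            # Analogous to a batch read from the stream: columns share one body.
            shared = alloc.allocate(col_bytes * num_cols)
            cols = [shared] * num_cols
        for buf_id in cols:
            alloc.retain(buf_id)
        batches.append(cols)
    return batches


def self_destruct_peak(alloc, batches, num_cols, col_bytes):
    """Convert column by column, dropping each Arrow column once copied.

    Returns the peak of (live Arrow bytes + bytes already copied to pandas).
    """
    copied = 0
    peak = alloc.total_allocated_bytes()
    for col in range(num_cols):
        copied += col_bytes * len(batches)  # pandas-side copy of this column
        peak = max(peak, alloc.total_allocated_bytes() + copied)
        for batch in batches:
            alloc.release(batch[col])  # self-destruct: drop the Arrow column
    return peak


for split in (False, True):
    alloc = ToyAllocator()
    batches = make_batches(alloc, num_batches=2, num_cols=4, col_bytes=100, split=split)
    peak = self_destruct_peak(alloc, batches, num_cols=4, col_bytes=100)
    print(f"split={split}: peak={peak} remaining={alloc.total_allocated_bytes()}")
# split=False: peak=1600 remaining=0
# split=True: peak=1000 remaining=0
```

Under these assumptions both variants end with nothing left in Arrow, but with shared allocations nothing is freed until the last column is converted, so the peak is 2x the 800 bytes of data; with per-column allocations the peak stays at the data plus one column in flight.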

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4
+        cols = 4
+        expected_bytes = rows * cols * 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        # Test the self_destruct behavior by testing _collect_as_arrow directly
+        # Note the sql_conf flags here don't have any real effect as
+        # we aren't going through df.toPandas
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            allocation_before = pa.total_allocated_bytes()
+            batches = df._collect_as_arrow(split_batches=True)
+            table = pa.Table.from_batches(batches)
+            del batches
+            pdf_split = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
+            allocation_after = pa.total_allocated_bytes()
+            difference = allocation_after - allocation_before
+            # Should be around 1x the data size (table should not hold on to any memory)
+            self.assertGreaterEqual(difference, 0.9 * expected_bytes)
+            self.assertLessEqual(difference, 1.1 * expected_bytes)
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):

Review comment:
       This block isn't quite what I was suggesting. We should compare the Pandas DataFrame resulting from `df.toPandas()` with `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` False to `pdf_split`. Also, we will need to call `df.toPandas()` with `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` True and compare that as well.

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -240,7 +263,27 @@ def _collect_as_arrow(self):
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
         try:
-            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
+            batch_stream = _load_from_socket((port, auth_secret), ArrowCollectSerializer())
+            if split_batches:
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as its
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                import pyarrow as pa
+                results = []
+                for batch_or_indices in batch_stream:
+                    if not isinstance(batch_or_indices, pa.RecordBatch):
+                        results.append(batch_or_indices)
+                    else:
+                        split_batch = pa.RecordBatch.from_arrays([
+                            pa.concat_arrays([array])
+                            for array in batch_or_indices
+                        ], schema=batch_or_indices.schema)
+                        results.append(split_batch)

Review comment:
       ```suggestion
                       if isinstance(batch_or_indices, pa.RecordBatch):
                           batch_or_indices = pa.RecordBatch.from_arrays([
                               pa.concat_arrays([array])
                               for array in batch_or_indices
                           ], schema=batch_or_indices.schema)
                       results.append(batch_or_indices)
   ```

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4
+        cols = 4
+        expected_bytes = rows * cols * 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        # Test the self_destruct behavior by testing _collect_as_arrow directly
+        # Note the sql_conf flags here don't have any real effect as
+        # we aren't going through df.toPandas
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):

Review comment:
       I don't think this conf has any effect on the enclosing code block.






[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748213603


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/37622/
   




[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r544710042



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 16
+        cols = 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        expected_bytes = rows * cols * 8
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            # We hold on to the table reference here, so if self destruct didn't work, then
+            # there would be 2 copies of the data (one in Arrow, one in Pandas), both
+            # tracked by the Arrow allocator

Review comment:
       Makes sense to me - I'll try to update this soon. Thanks for being patient with me here.
   
   Also looks like the 3.1 branch was cut already, so I need to update the flags here to reference 3.2.






[GitHub] [spark] BryanCutler commented on pull request #29818: [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776901480


   Merged to master, apologies for all of the delays and thanks @lidavidm for all the work! We should add this to the documentation too, if you could do that it would be great.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748299143


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/133023/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708765717








[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712518347








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708797026








[GitHub] [spark] lidavidm commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-716253285


   @HyukjinKwon hmm, it should also work there, but I can confirm (I haven't looked at that codepath yet, but I can measure the memory usage there too). It will only work one way though (Spark -> Pandas); this optimization doesn't apply the other way (Pandas -> Spark), so there may not be as much of a benefit there.




[GitHub] [spark] lidavidm commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748174791


   Hmm, I'm not sure what's with javadoc generation failing on all the CI tests...




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748213603


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/37622/
   




[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r519049758



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -190,6 +190,13 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            pdf = self.create_pandas_data_frame()

Review comment:
       Hmm, is that test quite right? self_destruct doesn't change the final memory consumption, but rather the peak consumption, which you can't reliably measure from Python (except perhaps via a separate thread?).
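One way to approximate the peak from Python, along the lines of the separate-thread idea above, is to poll an allocation counter from a background thread. A minimal sketch, assuming a zero-argument callable that returns the current byte count (`pyarrow.total_allocated_bytes` has that shape); `PeakSampler` is a hypothetical helper. Because it only samples, it can miss short-lived spikes, so its result is a lower bound on the true peak.

```python
import threading
import time


class PeakSampler:
    """Hypothetical helper: poll read_bytes() on a background thread, keep the max.

    Spikes shorter than `interval` may be missed, so `peak` is a lower bound.
    """

    def __init__(self, read_bytes, interval=0.001):
        self._read_bytes = read_bytes
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.peak = 0

    def _run(self):
        while not self._stop.is_set():
            self.peak = max(self.peak, self._read_bytes())
            self._stop.wait(self._interval)  # sleep, but wake early on stop

    def __enter__(self):
        self.peak = self._read_bytes()  # baseline before the measured work
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self._stop.set()
        self._thread.join()
        self.peak = max(self.peak, self._read_bytes())  # final sample
        return False  # never swallow exceptions


if __name__ == "__main__":
    # Simulated allocation counter; a real test might pass
    # pyarrow.total_allocated_bytes instead.
    current = {"bytes": 0}
    with PeakSampler(lambda: current["bytes"]) as sampler:
        current["bytes"] = 1 << 20  # transient allocation
        time.sleep(0.05)            # hold it long enough to be sampled
        current["bytes"] = 0        # freed before the block ends
    print(sampler.peak)
```

Each sampler owns a single thread, so it is meant for one `with` block; comparing the sampled peak with self-destruct on and off would then be the test's signal, rather than the final allocation.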






[GitHub] [spark] HyukjinKwon commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-696132518


   cc @WeichenXu123 who already took a close look.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712518347








[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712548936


   **[Test build #130021 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130021/testReport)** for PR 29818 at commit [`c114166`](https://github.com/apache/spark/commit/c114166afb682081f99b8893bce60bbd38560b3e).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712549555








[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r522943058



##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -51,7 +55,20 @@ def load_stream(self, stream):
         """
         # load the batches
         for batch in self.serializer.load_stream(stream):
-            yield batch
+            if self.split_batches:
+                import pyarrow as pa
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as its
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                split_batch = pa.RecordBatch.from_arrays([
+                    pa.concat_arrays([array]) for array in batch

Review comment:
       I don't see an explicit way to copy. The take/filter kernels might suffice, but they would require constructing auxiliary arrays; we could also try copying buffers directly. Would it be better to defer this and work on an explicit copy in Arrow, or a RecordBatchReader that doesn't require copies?






[GitHub] [spark] lidavidm commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710181028


   I can't reply to this inline somehow, but
   
   > This looks a little strange to me too. Is concat_arrays actually doing anything here, and if so, wouldn't it do it for the case that selfDestruct is False too?
   
   1) Yes, it's copying each column in the record batch into its own allocation, as explained in the latest commit. I couldn't find a more explicit way in Arrow to copy a column out of a record batch.
   2) Yes, I can try to thread through a parameter, though I think it's harmless in this case.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731441816


   Merged build finished. Test FAILed.




[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r562120603



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4

Review comment:
       How about `2 ** 10`?
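For reference, the test's `expected_bytes = rows * cols * 8` presumably assumes each `rand()` column is float64 (8 bytes per value) and ignores buffer padding and metadata. The arithmetic below (helper names are hypothetical) shows how the absolute slack of the test's 0.9x to 1.1x band changes with the row count: with 4 columns it is 102.4 bytes at `2 ** 4` rows and 6553.6 bytes at `2 ** 10` rows, so any fixed overhead is less likely to push the measurement out of range with the larger size.

```python
FLOAT64_BYTES = 8  # rand() yields DoubleType columns, i.e. float64 values


def expected_bytes(rows, cols, itemsize=FLOAT64_BYTES):
    """Raw value bytes only; ignores Arrow padding, validity bitmaps and metadata."""
    return rows * cols * itemsize


def tolerance_band(expected, rel=0.1):
    """Bounds matching the test's 0.9x / 1.1x assertions."""
    return (1 - rel) * expected, (1 + rel) * expected


for rows in (2 ** 4, 2 ** 10):
    exp = expected_bytes(rows, cols=4)
    low, high = tolerance_band(exp)
    print(f"rows={rows}: expected={exp} bytes, band=[{low:.1f}, {high:.1f}]")
```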






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776391729


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39668/
   




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731441816








[GitHub] [spark] lidavidm commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r506588271



##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -103,10 +103,22 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if selfDestruct:
+                            # Configure PyArrow to use as little memory as possible:
+                            # self_destruct - free columns as they are converted
+                            # split_blocks - create a separate Pandas block for each column
+                            # use_threads - convert one column at a time
+                            pandas_options.update({
+                                'self_destruct': True,
+                                'split_blocks': True,

Review comment:
       Not strictly _necessary_, but good to have: otherwise we may end up allocating one large block when there are many columns of the same type, which defeats the point. Pandas may still reconsolidate blocks later, but that is an inherent issue with using Pandas.






[GitHub] [spark] HyukjinKwon commented on a change in pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r504401223



##########
File path: python/pyspark/sql/pandas/serializers.py
##########
@@ -90,7 +90,10 @@ def load_stream(self, stream):
         import pyarrow as pa
         reader = pa.ipc.open_stream(stream)
         for batch in reader:
-            yield batch
+            split_batch = pa.RecordBatch.from_arrays([
+                pa.concat_arrays([array]) for array in batch
+            ], schema=batch.schema)

Review comment:
       Sorry for asking a question without taking a close look but would you mind elaborating why we should do this?






[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731470752








[GitHub] [spark] SparkQA commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708696114


   **[Test build #129766 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129766/testReport)** for PR 29818 at commit [`96483e6`](https://github.com/apache/spark/commit/96483e628c1b590fe7702b956edb90834fee80fc).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776451202


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135086/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-748154212


   **[Test build #133023 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/133023/testReport)** for PR 29818 at commit [`0d9c88a`](https://github.com/apache/spark/commit/0d9c88a12a26185012d6086a656a566314d18847).




[GitHub] [spark] SparkQA removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712398796


   **[Test build #130015 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130015/testReport)** for PR 29818 at commit [`1b875c1`](https://github.com/apache/spark/commit/1b875c19e3c6318fcb26c1ea62b5397d6e75d1f8).




[GitHub] [spark] HyukjinKwon commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-716254744


   Ah, that's fine. We can handle that one separately. We might have to use self-destruct in `ArrowStreamPandasSerializer.arrow_to_pandas` to make it work, though I doubt there would be much benefit there. Let's do it separately anyway since it's off-topic. I was just curious :-).




[GitHub] [spark] HyukjinKwon commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r507050339



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1843,6 +1843,16 @@ object SQLConf {
       .version("3.0.0")
       .fallbackConf(ARROW_EXECUTION_ENABLED)
 
+  val ARROW_PYSPARK_SELF_DESTRUCT_ENABLED =
+    buildConf("spark.sql.execution.arrow.pyspark.selfDestruct.enabled")
+      .doc("When true, make use of Apache Arrow's self-destruct option " +
+        "for columnar data transfers in PySpark. " +
+        "This reduces memory usage at the cost of some CPU time. " +
+        "This optimization applies to: pyspark.sql.DataFrame.toPandas")
+      .version("3.0.0")

Review comment:
       Let's change it to 3.1.0.






[GitHub] [spark] SparkQA commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708703873


   **[Test build #129768 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129768/testReport)** for PR 29818 at commit [`a6a189f`](https://github.com/apache/spark/commit/a6a189f869c93d9ce1dfed153b558e3e73c576a3).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-696116785


   Can one of the admins verify this patch?




[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r562061643



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4

Review comment:
       we should test memory usage with more rows than this

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -121,6 +137,8 @@ def toPandas(self):
                             elif isinstance(field.dataType, MapType):
                                 pdf[field.name] = \
                                     _convert_map_items_to_dict(pdf[field.name])
+                        # Return both the DataFrame and table for unit testing
+                        # Note that table reference is invalid if self_destruct enabled

Review comment:
       This can be removed now

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -240,7 +263,27 @@ def _collect_as_arrow(self):
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
         try:
-            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
+            batch_stream = _load_from_socket((port, auth_secret), ArrowCollectSerializer())
+            if split_batches:
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as it's
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                import pyarrow as pa
+                results = []
+                for batch_or_indices in batch_stream:
+                    if not isinstance(batch_or_indices, pa.RecordBatch):
+                        results.append(batch_or_indices)
+                    else:
+                        split_batch = pa.RecordBatch.from_arrays([
+                            pa.concat_arrays([array])

Review comment:
       could you make a note that this is to reallocate the array?

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4
+        cols = 4
+        expected_bytes = rows * cols * 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        # Test the self_destruct behavior by testing _collect_as_arrow directly
+        # Note the sql_conf flags here don't have any real effect as
+        # we aren't going through df.toPandas
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            allocation_before = pa.total_allocated_bytes()
+            batches = df._collect_as_arrow(split_batches=True)
+            table = pa.Table.from_batches(batches)
+            del batches
+            pdf_split = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
+            allocation_after = pa.total_allocated_bytes()
+            difference = allocation_after - allocation_before
+            # Should be around 1x the data size (table should not hold on to any memory)
+            self.assertGreaterEqual(difference, 0.9 * expected_bytes)
+            self.assertLessEqual(difference, 1.1 * expected_bytes)
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):

Review comment:
       This block isn't quite what I was suggesting. We should compare the Pandas DataFrame resulting from `df.toPandas()` with `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` False to `pdf_split`. Also, we will need to call `df.toPandas()` with `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` True and compare that as well.

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -240,7 +263,27 @@ def _collect_as_arrow(self):
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
         try:
-            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
+            batch_stream = _load_from_socket((port, auth_secret), ArrowCollectSerializer())
+            if split_batches:
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as it's
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                import pyarrow as pa
+                results = []
+                for batch_or_indices in batch_stream:
+                    if not isinstance(batch_or_indices, pa.RecordBatch):
+                        results.append(batch_or_indices)
+                    else:
+                        split_batch = pa.RecordBatch.from_arrays([
+                            pa.concat_arrays([array])
+                            for array in batch_or_indices
+                        ], schema=batch_or_indices.schema)
+                        results.append(split_batch)

Review comment:
       ```suggestion
                       if isinstance(batch_or_indices, pa.RecordBatch):
                           batch_or_indices = pa.RecordBatch.from_arrays([
                               pa.concat_arrays([array])
                               for array in batch_or_indices
                           ], schema=batch_or_indices.schema)
                       results.append(batch_or_indices)
   ```

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4
+        cols = 4
+        expected_bytes = rows * cols * 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        # Test the self_destruct behavior by testing _collect_as_arrow directly
+        # Note the sql_conf flags here don't have any real effect as
+        # we aren't going through df.toPandas
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):

Review comment:
       I don't think this conf has any effect on the enclosed code block

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4

Review comment:
       How about `2 ** 10`?






[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710713322








[GitHub] [spark] SparkQA commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708796983


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34374/
   




[GitHub] [spark] SparkQA removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712461557


   **[Test build #130021 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130021/testReport)** for PR 29818 at commit [`c114166`](https://github.com/apache/spark/commit/c114166afb682081f99b8893bce60bbd38560b3e).




[GitHub] [spark] AmplabJenkins commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712549555








[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710524710


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34516/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-776357481


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39668/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-712474248


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34628/
   




[GitHub] [spark] SparkQA commented on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710488670


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34516/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-731478538








[GitHub] [spark] SparkQA commented on pull request #29818: [WIP][SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-708695639


   **[Test build #129768 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129768/testReport)** for PR 29818 at commit [`a6a189f`](https://github.com/apache/spark/commit/a6a189f869c93d9ce1dfed153b558e3e73c576a3).

