Posted to commits@spark.apache.org by cu...@apache.org on 2021/02/10 17:59:23 UTC

[spark] branch master updated: [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas

This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b875ce  [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas
9b875ce is described below

commit 9b875ceada60732899053fbd90728b4944d1c03d
Author: David Li <li...@gmail.com>
AuthorDate: Wed Feb 10 09:58:46 2021 -0800

    [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas
    
    ### What changes were proposed in this pull request?
    
    Creating a Pandas DataFrame via Apache Arrow can currently use twice as much memory as the final result, because both Pandas and Arrow retain a copy of the data during the conversion. Arrow now has a "self-destruct" mode (Arrow >= 0.16) that avoids this by freeing each column after it is converted. This PR integrates support for it in toPandas, handling a couple of edge cases:
    
    self_destruct has no effect unless the memory is allocated appropriately, which this PR handles in the Arrow serializer. Essentially, the issue is that self_destruct frees memory column by column, but Arrow record batches are allocated row-wise:
    
    ```
    Record batch 0: allocation 0: column 0 chunk 0, column 1 chunk 0, ...
    Record batch 1: allocation 1: column 0 chunk 1, column 1 chunk 1, ...
    ```
    
    In this scenario, Arrow will drop its references to all of column 0's chunks, but no memory is actually freed, as the chunks are just slices of one underlying allocation. The PR copies each column into its own allocation so that memory is instead arranged as follows:
    
    ```
    Record batch 0: allocation 0 column 0 chunk 0, allocation 1 column 1 chunk 0, ...
    Record batch 1: allocation 2 column 0 chunk 1, allocation 3 column 1 chunk 1, ...
    ```
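
    A minimal standalone sketch (plain PyArrow, no Spark; the names here are illustrative, not the serializer code itself) of the reallocation this relies on. A record batch read back from an IPC stream keeps all of its columns in one shared allocation, so dropping one column frees nothing; copying each column with `pa.concat_arrays([column])` gives it its own buffer, which is what lets self_destruct actually release memory:

    ```
    import pyarrow as pa

    # Round-trip a batch through an IPC stream: the columns of the batch read
    # back are views into a single shared allocation.
    sink = pa.BufferOutputStream()
    original = pa.RecordBatch.from_arrays(
        [pa.array(range(1 << 10)), pa.array(range(1 << 10))], names=["a", "b"])
    with pa.ipc.new_stream(sink, original.schema) as writer:
        writer.write_batch(original)
    batch = pa.ipc.open_stream(sink.getvalue()).read_next_batch()

    # Copy each column into its own allocation so self_destruct can free
    # columns independently as to_pandas() converts them.
    split_batch = pa.RecordBatch.from_arrays(
        [pa.concat_arrays([column]) for column in batch.columns],
        schema=batch.schema)
    ```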
    
    The optimization is disabled by default and can be enabled by setting the Spark SQL conf "spark.sql.execution.arrow.pyspark.selfDestruct.enabled" to "true". It is not applied unconditionally because it is more likely to produce a DataFrame backed by immutable buffers, which Pandas does not always handle well, and because it is slower overall (columns are converted one at a time instead of in parallel).
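
    For reference, a minimal usage sketch (assuming an existing SparkSession named `spark`; not part of this patch):

    ```
    # Opt in to the self-destruct path; the conf is off by default and only
    # applies when the Arrow-based toPandas path is enabled.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    spark.conf.set("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")

    # toPandas() now converts one column at a time, freeing each Arrow column
    # once it has been copied into the resulting Pandas DataFrame.
    pdf = spark.range(1 << 20).toPandas()
    ```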
    
    ### Why are the changes needed?
    
    This lets us load larger datasets. In particular, with N bytes of memory, we previously could not load a dataset larger than N/2 bytes; with this change the limit is closer to N/1.25.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes - it adds a new SQL conf, "spark.sql.execution.arrow.pyspark.selfDestruct.enabled", disabled by default.
    
    ### How was this patch tested?
    
    See the [mailing list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Reducing-memory-usage-of-toPandas-with-Arrow-quot-self-destruct-quot-option-td30149.html) - the change was benchmarked with Python's memory_profiler. Unit tests were added to check that memory usage stays within expected bounds and that results remain correct with the option enabled.
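
    For illustration only, a rough sketch of the kind of measurement used (assuming `memory_profiler` is installed and a SparkSession named `spark` exists; this is not the exact benchmark from the thread):

    ```
    from memory_profiler import memory_usage

    def convert():
        # A simple DataFrame of numeric columns; rand() keeps the data non-trivial.
        return spark.range(1 << 22).selectExpr("id", "rand() AS x").toPandas()

    # Peak driver memory (in MiB) sampled while converting to Pandas; compare
    # runs with the conf enabled vs. disabled.
    peak_mib = max(memory_usage((convert, (), {})))
    print(peak_mib)
    ```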
    
    Closes #29818 from lidavidm/spark-32953.
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: Bryan Cutler <cu...@gmail.com>
---
 python/pyspark/sql/pandas/conversion.py            | 48 ++++++++++++++++++++--
 python/pyspark/sql/tests/test_arrow.py             | 33 ++++++++++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 13 ++++++
 3 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index d8a2414..92ef7ce 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -105,13 +105,29 @@ class PandasConversionMixin(object):
                     import pyarrow
                     # Rename columns to avoid duplicated column names.
                     tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
-                    batches = self.toDF(*tmp_column_names)._collect_as_arrow()
+                    self_destruct = self.sql_ctx._conf.arrowPySparkSelfDestructEnabled()
+                    batches = self.toDF(*tmp_column_names)._collect_as_arrow(
+                        split_batches=self_destruct)
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        # Ensure only the table has a reference to the batches, so that
+                        # self_destruct (if enabled) is effective
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if self_destruct:
+                            # Configure PyArrow to use as little memory as possible:
+                            # self_destruct - free columns as they are converted
+                            # split_blocks - create a separate Pandas block for each column
+                            # use_threads - convert one column at a time
+                            pandas_options.update({
+                                'self_destruct': True,
+                                'split_blocks': True,
+                                'use_threads': False,
+                            })
+                        pdf = table.to_pandas(**pandas_options)
                         # Rename back to the original column names.
                         pdf.columns = self.columns
                         for field in self.schema:
@@ -225,11 +241,16 @@ class PandasConversionMixin(object):
         else:
             return None
 
-    def _collect_as_arrow(self):
+    def _collect_as_arrow(self, split_batches=False):
         """
         Returns all records as a list of ArrowRecordBatches, pyarrow must be installed
         and available on driver and worker Python environments.
         This is an experimental feature.
+
+        :param split_batches: split batches such that each column is in its own allocation, so
+            that the selfDestruct optimization is effective; default False.
+
+        .. note:: Experimental.
         """
         from pyspark.sql.dataframe import DataFrame
 
@@ -240,7 +261,26 @@ class PandasConversionMixin(object):
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
         try:
-            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
+            batch_stream = _load_from_socket((port, auth_secret), ArrowCollectSerializer())
+            if split_batches:
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as it's
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                import pyarrow as pa
+                results = []
+                for batch_or_indices in batch_stream:
+                    if isinstance(batch_or_indices, pa.RecordBatch):
+                        batch_or_indices = pa.RecordBatch.from_arrays([
+                            # This call actually reallocates the array
+                            pa.concat_arrays([array])
+                            for array in batch_or_indices
+                        ], schema=batch_or_indices.schema)
+                    results.append(batch_or_indices)
+            else:
+                results = list(batch_stream)
         finally:
             # Join serving thread and raise any exceptions from collectAsArrowToPython
             jsocket_auth_server.getResult()
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 938e67f..1843607 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -25,7 +25,7 @@ from distutils.version import LooseVersion
 
 from pyspark import SparkContext, SparkConf
 from pyspark.sql import Row, SparkSession
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import rand, udf
 from pyspark.sql.types import StructType, StringType, IntegerType, LongType, \
     FloatType, DoubleType, DecimalType, DateType, TimestampType, BinaryType, StructField, \
     ArrayType, NullType
@@ -196,6 +196,37 @@ class ArrowTests(ReusedSQLTestCase):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 10
+        cols = 4
+        expected_bytes = rows * cols * 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        # Test the self_destruct behavior by testing _collect_as_arrow directly
+        allocation_before = pa.total_allocated_bytes()
+        batches = df._collect_as_arrow(split_batches=True)
+        table = pa.Table.from_batches(batches)
+        del batches
+        pdf_split = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
+        allocation_after = pa.total_allocated_bytes()
+        difference = allocation_after - allocation_before
+        # Should be around 1x the data size (table should not hold on to any memory)
+        self.assertGreaterEqual(difference, 0.9 * expected_bytes)
+        self.assertLessEqual(difference, 1.1 * expected_bytes)
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
+            no_self_destruct_pdf = df.toPandas()
+            # Note while memory usage is 2x data size here (both table and pdf hold on to
+            # memory), in this case Arrow still only tracks 1x worth of memory (since the
+            # batches are not allocated by Arrow in this case), so we can't make any
+            # assertions here
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            self_destruct_pdf = df.toPandas()
+
+        assert_frame_equal(pdf_split, no_self_destruct_pdf)
+        assert_frame_equal(pdf_split, self_destruct_pdf)
+
     def test_filtered_frame(self):
         df = self.spark.range(3).toDF("i")
         pdf = df.filter("i < 0").toPandas()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 544a062..2865257 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2033,6 +2033,17 @@ object SQLConf {
       .version("3.0.0")
       .fallbackConf(ARROW_EXECUTION_ENABLED)
 
+  val ARROW_PYSPARK_SELF_DESTRUCT_ENABLED =
+    buildConf("spark.sql.execution.arrow.pyspark.selfDestruct.enabled")
+      .doc("When true, make use of Apache Arrow's self-destruct and split-blocks options " +
+        "for columnar data transfers in PySpark, when converting from Arrow to Pandas. " +
+        "This reduces memory usage at the cost of some CPU time. " +
+        "This optimization applies to: pyspark.sql.DataFrame.toPandas " +
+        "when 'spark.sql.execution.arrow.pyspark.enabled' is set.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val PYSPARK_JVM_STACKTRACE_ENABLED =
     buildConf("spark.sql.pyspark.jvmStacktrace.enabled")
       .doc("When true, it shows the JVM stacktrace in the user-facing PySpark exception " +
@@ -3609,6 +3620,8 @@ class SQLConf extends Serializable with Logging {
 
   def arrowPySparkEnabled: Boolean = getConf(ARROW_PYSPARK_EXECUTION_ENABLED)
 
+  def arrowPySparkSelfDestructEnabled: Boolean = getConf(ARROW_PYSPARK_SELF_DESTRUCT_ENABLED)
+
   def pysparkJVMStacktraceEnabled: Boolean = getConf(PYSPARK_JVM_STACKTRACE_ENABLED)
 
   def arrowSparkREnabled: Boolean = getConf(ARROW_SPARKR_EXECUTION_ENABLED)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org