Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/01/21 18:53:01 UTC

[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r562061643



##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4

Review comment:
       We should test memory usage with more rows than this.
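
For a sense of scale, here is a rough sketch of the data sizes involved. It assumes the 4 columns of 8-byte doubles that the test uses elsewhere in this diff (`cols = 4`, `expected_bytes = rows * cols * 8`). The larger row count is only an illustrative choice, not a value proposed in the review.

```python
def expected_bytes(rows, cols=4, bytes_per_value=8):
    """Raw size of a rows x cols table of 8-byte values, ignoring any overhead."""
    return rows * cols * bytes_per_value

small = expected_bytes(2 ** 4)   # row count currently in the test
large = expected_bytes(2 ** 20)  # hypothetical larger row count, for comparison
print(small, large)
```

With so little data, allocator overhead or unrelated allocations could plausibly swamp a tolerance check on `pa.total_allocated_bytes()`.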

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -121,6 +137,8 @@ def toPandas(self):
                             elif isinstance(field.dataType, MapType):
                                 pdf[field.name] = \
                                     _convert_map_items_to_dict(pdf[field.name])
+                        # Return both the DataFrame and table for unit testing
+                        # Note that table reference is invalid if self_destruct enabled

Review comment:
       This comment can be removed now.

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -240,7 +263,27 @@ def _collect_as_arrow(self):
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
         try:
-            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
+            batch_stream = _load_from_socket((port, auth_secret), ArrowCollectSerializer())
+            if split_batches:
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as it's
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                import pyarrow as pa
+                results = []
+                for batch_or_indices in batch_stream:
+                    if not isinstance(batch_or_indices, pa.RecordBatch):
+                        results.append(batch_or_indices)
+                    else:
+                        split_batch = pa.RecordBatch.from_arrays([
+                            pa.concat_arrays([array])

Review comment:
       Could you add a comment noting that this is to reallocate the array?
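
The code comment in this hunk explains that columns backed by slices of a shared record batch cannot be freed one at a time. A loose standard-library analogy of that point follows. It is not pyarrow, and `batch_buffer`, `col_a`, and `col_b` are names made up for illustration.

```python
# Stand-in for one record batch buffer holding two 8-byte "columns".
batch_buffer = bytearray(range(16))
view = memoryview(batch_buffer)

# Zero-copy slices: both columns reference the same parent allocation,
# so the parent stays alive as long as either slice does.
col_a = view[:8]
col_b = view[8:]
shares_parent = col_a.obj is batch_buffer and col_b.obj is batch_buffer

# Copies: each column now owns its bytes and can be released independently.
own_a = bytes(col_a)
own_b = bytes(col_b)
```

In the diff, `pa.concat_arrays([array])` plays the role of the copy step, which is why a short comment saying so would help readers.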

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4
+        cols = 4
+        expected_bytes = rows * cols * 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        # Test the self_destruct behavior by testing _collect_as_arrow directly
+        # Note the sql_conf flags here don't have any real effect as
+        # we aren't going through df.toPandas
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            allocation_before = pa.total_allocated_bytes()
+            batches = df._collect_as_arrow(split_batches=True)
+            table = pa.Table.from_batches(batches)
+            del batches
+            pdf_split = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
+            allocation_after = pa.total_allocated_bytes()
+            difference = allocation_after - allocation_before
+            # Should be around 1x the data size (table should not hold on to any memory)
+            self.assertGreaterEqual(difference, 0.9 * expected_bytes)
+            self.assertLessEqual(difference, 1.1 * expected_bytes)
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):

Review comment:
       This block isn't quite what I was suggesting. We should compare the pandas DataFrame returned by `df.toPandas()` with `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` set to False against `pdf_split`. We also need to call `df.toPandas()` with `spark.sql.execution.arrow.pyspark.selfDestruct.enabled` set to True and compare that result as well.

##########
File path: python/pyspark/sql/pandas/conversion.py
##########
@@ -240,7 +263,27 @@ def _collect_as_arrow(self):
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
         try:
-            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
+            batch_stream = _load_from_socket((port, auth_secret), ArrowCollectSerializer())
+            if split_batches:
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as it's
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                import pyarrow as pa
+                results = []
+                for batch_or_indices in batch_stream:
+                    if not isinstance(batch_or_indices, pa.RecordBatch):
+                        results.append(batch_or_indices)
+                    else:
+                        split_batch = pa.RecordBatch.from_arrays([
+                            pa.concat_arrays([array])
+                            for array in batch_or_indices
+                        ], schema=batch_or_indices.schema)
+                        results.append(split_batch)

Review comment:
       ```suggestion
                       if isinstance(batch_or_indices, pa.RecordBatch):
                           batch_or_indices = pa.RecordBatch.from_arrays([
                               pa.concat_arrays([array])
                               for array in batch_or_indices
                           ], schema=batch_or_indices.schema)
                       results.append(batch_or_indices)
   ```
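
The suggestion collapses the two branches into a single append. Below is a minimal sketch of why both forms give the same result, using stand-in types rather than pyarrow. `FakeBatch` and `reallocate` are hypothetical names invented for this illustration, and the trailing plain list plays the role of the order indices that the stream yields last.

```python
class FakeBatch:
    """Hypothetical stand-in for pa.RecordBatch."""
    def __init__(self, values):
        self.values = list(values)

def reallocate(batch):
    # Stand-in for rebuilding the batch from freshly copied columns.
    return FakeBatch(batch.values)

def collect_original(stream):
    # Shape of the code under review: two branches, two appends.
    results = []
    for item in stream:
        if not isinstance(item, FakeBatch):
            results.append(item)
        else:
            results.append(reallocate(item))
    return results

def collect_suggested(stream):
    # Shape of the suggestion: rebind batches, then append once.
    results = []
    for item in stream:
        if isinstance(item, FakeBatch):
            item = reallocate(item)
        results.append(item)
    return results

stream = [FakeBatch([1, 2]), FakeBatch([3]), [1, 0]]
original = collect_original(stream)
suggested = collect_suggested(stream)
```

In both versions batches are replaced by copies and non-batch items pass through untouched.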

##########
File path: python/pyspark/sql/tests/test_arrow.py
##########
@@ -191,6 +191,36 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 4
+        cols = 4
+        expected_bytes = rows * cols * 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        # Test the self_destruct behavior by testing _collect_as_arrow directly
+        # Note the sql_conf flags here don't have any real effect as
+        # we aren't going through df.toPandas
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):

Review comment:
       I don't think this conf has any effect on the code block it encloses.



