Posted to commits@spark.apache.org by gu...@apache.org on 2018/12/13 05:15:14 UTC

[spark] branch master updated: [SPARK-26355][PYSPARK] Add a workaround for PyArrow 0.11.

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8edae94  [SPARK-26355][PYSPARK] Add a workaround for PyArrow 0.11.
8edae94 is described below

commit 8edae94fa7ec1a1cc2c69e0924da0da85d4aac83
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Thu Dec 13 13:14:59 2018 +0800

    [SPARK-26355][PYSPARK] Add a workaround for PyArrow 0.11.
    
    ## What changes were proposed in this pull request?
    
    In PyArrow 0.11, there is a breaking API change:
    
    - [ARROW-1949](https://issues.apache.org/jira/browse/ARROW-1949) - [Python/C++] Add option to Array.from_pandas and pyarrow.array to perform unsafe casts.
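
    For example, a minimal standalone sketch (not part of this patch) of the
    new behavior on a nullable integer column:

    ```
    import pandas as pd
    import pyarrow as pa

    # pandas represents a nullable integer column as float64 with NaN.
    s = pd.Series([1, None, 3], dtype='float64')
    mask = s.isnull()

    # With PyArrow >= 0.11 this raises ArrowInvalid ("Floating point value
    # truncated"): casts are now "safe" by default, and the NaN entries fail
    # the float -> int check even though the mask marks them as null.
    # pa.Array.from_pandas(s, mask=mask, type=pa.int8())

    # Passing safe=False restores the pre-0.11 truncating-cast behavior.
    arr = pa.Array.from_pandas(s, mask=mask, type=pa.int8(), safe=False)
    ```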
    
    This causes test failures in `ScalarPandasUDFTests.test_vectorized_udf_null_(byte|short|int|long)`:
    
    ```
      File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/worker.py", line 377, in main
        process()
      File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/worker.py", line 372, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/serializers.py", line 317, in dump_stream
        batch = _create_batch(series, self._timezone)
      File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/serializers.py", line 286, in _create_batch
        arrs = [create_array(s, t) for s, t in series]
      File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/serializers.py", line 284, in create_array
        return pa.Array.from_pandas(s, mask=mask, type=t)
      File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
        return array(obj, mask=mask, type=type, safe=safe, from_pandas=True,
      File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
        return _ndarray_to_array(values, mask, type, from_pandas, safe,
      File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
        check_status(NdarrayToArrow(pool, values, mask, from_pandas,
      File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status
        raise ArrowInvalid(message)
    ArrowInvalid: Floating point value truncated
    ```
    
    We should add a workaround to support PyArrow 0.11.
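
    The gist of the workaround, as a sketch (the real change lives in
    `_create_batch` in the diff below; the helper name here is hypothetical):

    ```
    from distutils.version import LooseVersion
    import pyarrow as pa

    def _from_pandas_compat(s, mask, t):
        # Pre-0.11 PyArrow has no default safe-cast check to relax, so the
        # plain call is kept; 0.11+ needs safe=False for the old behavior.
        # TODO: remove the branch when the minimum PyArrow version is 0.11.0.
        if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
            return pa.Array.from_pandas(s, mask=mask, type=t)
        return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
    ```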
    
    ## How was this patch tested?
    
    Tested in my local environment.
    
    Closes #23305 from ueshin/issues/SPARK-26355/pyarrow_0.11.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/serializers.py                           |  5 ++++-
 python/pyspark/sql/tests/test_pandas_udf_grouped_map.py | 11 +++++++++--
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index f3ebd37..fd46952 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -281,7 +281,10 @@ def _create_batch(series, timezone):
             # TODO: see ARROW-2432. Remove when the minimum PyArrow version becomes 0.10.0.
             return pa.Array.from_pandas(s.apply(
                 lambda v: decimal.Decimal('NaN') if v is None else v), mask=mask, type=t)
-        return pa.Array.from_pandas(s, mask=mask, type=t)
+        elif LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
+            # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
+            return pa.Array.from_pandas(s, mask=mask, type=t)
+        return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
 
     arrs = [create_array(s, t) for s, t in series]
     return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in xrange(len(arrs))])
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index bfecc07..a12c608 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -468,8 +468,15 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
                 grouped_df.apply(column_name_typo).collect()
-            with self.assertRaisesRegexp(Exception, "No cast implemented"):
-                grouped_df.apply(invalid_positional_types).collect()
+            from distutils.version import LooseVersion
+            import pyarrow as pa
+            if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
+                # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
+                with self.assertRaisesRegexp(Exception, "No cast implemented"):
+                    grouped_df.apply(invalid_positional_types).collect()
+            else:
+                with self.assertRaisesRegexp(Exception, "an integer is required"):
+                    grouped_df.apply(invalid_positional_types).collect()
 
     def test_positional_assignment_conf(self):
         import pandas as pd

