You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/12/13 05:15:14 UTC
[spark] branch master updated: [SPARK-26355][PYSPARK] Add a
workaround for PyArrow 0.11.
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8edae94 [SPARK-26355][PYSPARK] Add a workaround for PyArrow 0.11.
8edae94 is described below
commit 8edae94fa7ec1a1cc2c69e0924da0da85d4aac83
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Thu Dec 13 13:14:59 2018 +0800
[SPARK-26355][PYSPARK] Add a workaround for PyArrow 0.11.
## What changes were proposed in this pull request?
PyArrow 0.11 introduces an API-breaking change:
- [ARROW-1949](https://issues.apache.org/jira/browse/ARROW-1949) - [Python/C++] Add option to Array.from_pandas and pyarrow.array to perform unsafe casts.
This causes test failures in `ScalarPandasUDFTests.test_vectorized_udf_null_(byte|short|int|long)`:
```
File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/worker.py", line 377, in main
process()
File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/serializers.py", line 317, in dump_stream
batch = _create_batch(series, self._timezone)
File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/serializers.py", line 286, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/Users/ueshin/workspace/apache-spark/spark/python/pyspark/serializers.py", line 284, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t)
File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
return array(obj, mask=mask, type=type, safe=safe, from_pandas=True,
File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
return _ndarray_to_array(values, mask, type, from_pandas, safe,
File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
check_status(NdarrayToArrow(pool, values, mask, from_pandas,
File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status
raise ArrowInvalid(message)
ArrowInvalid: Floating point value truncated
```
We should add a workaround to support PyArrow 0.11.
## How was this patch tested?
Tested in my local environment.
Closes #23305 from ueshin/issues/SPARK-26355/pyarrow_0.11.
Authored-by: Takuya UESHIN <ue...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/serializers.py | 5 ++++-
python/pyspark/sql/tests/test_pandas_udf_grouped_map.py | 11 +++++++++--
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index f3ebd37..fd46952 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -281,7 +281,10 @@ def _create_batch(series, timezone):
# TODO: see ARROW-2432. Remove when the minimum PyArrow version becomes 0.10.0.
return pa.Array.from_pandas(s.apply(
lambda v: decimal.Decimal('NaN') if v is None else v), mask=mask, type=t)
- return pa.Array.from_pandas(s, mask=mask, type=t)
+ elif LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
+ # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
+ return pa.Array.from_pandas(s, mask=mask, type=t)
+ return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
arrs = [create_array(s, t) for s, t in series]
return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in xrange(len(arrs))])
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index bfecc07..a12c608 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -468,8 +468,15 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
with QuietTest(self.sc):
with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
grouped_df.apply(column_name_typo).collect()
- with self.assertRaisesRegexp(Exception, "No cast implemented"):
- grouped_df.apply(invalid_positional_types).collect()
+ from distutils.version import LooseVersion
+ import pyarrow as pa
+ if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
+ # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
+ with self.assertRaisesRegexp(Exception, "No cast implemented"):
+ grouped_df.apply(invalid_positional_types).collect()
+ else:
+ with self.assertRaisesRegexp(Exception, "an integer is required"):
+ grouped_df.apply(invalid_positional_types).collect()
def test_positional_assignment_conf(self):
import pandas as pd
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org