Posted to commits@spark.apache.org by gu...@apache.org on 2019/04/22 10:31:00 UTC

[spark] branch master updated: [SPARK-27276][PYTHON][SQL] Increase minimum version of pyarrow to 0.12.1 and remove prior workarounds

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d36cce1  [SPARK-27276][PYTHON][SQL] Increase minimum version of pyarrow to 0.12.1 and remove prior workarounds
d36cce1 is described below

commit d36cce18e262dc9cbd687ef42f8b67a62f0a3e22
Author: Bryan Cutler <cu...@gmail.com>
AuthorDate: Mon Apr 22 19:30:31 2019 +0900

    [SPARK-27276][PYTHON][SQL] Increase minimum version of pyarrow to 0.12.1 and remove prior workarounds
    
    ## What changes were proposed in this pull request?
    
    This increases the minimum supported version of pyarrow to 0.12.1 and removes the workarounds in pyspark that kept it compatible with prior versions. This means that users will need to have at least pyarrow 0.12.1 installed and available in the cluster, or an `ImportError` will be raised to indicate that an upgrade is needed.
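    
    A minimal usage sketch (not part of the original patch description), assuming only the
    `require_minimum_pyarrow_version()` helper from `python/pyspark/sql/utils.py` shown in the
    diff below, which after this change raises `ImportError` when pyarrow is missing or older
    than 0.12.1:
    
        # Hedged sketch: check that the installed pyarrow satisfies the new minimum
        # before relying on Arrow-based conversions such as DataFrame.toPandas().
        from pyspark.sql.utils import require_minimum_pyarrow_version
    
        try:
            require_minimum_pyarrow_version()
            arrow_available = True
        except ImportError as e:
            # Exact message text may vary; it asks for pyarrow >= 0.12.1.
            print("Arrow optimizations unavailable: %s" % e)
            arrow_available = False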
    
    ## How was this patch tested?
    
    Existing tests, run with:
    Python 2.7.15, pyarrow 0.12.1, pandas 0.24.2
    Python 3.6.7, pyarrow 0.12.1, pandas 0.24.0
    
    Closes #24298 from BryanCutler/arrow-bump-min-pyarrow-SPARK-27276.
    
    Authored-by: Bryan Cutler <cu...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 python/pyspark/serializers.py                      | 53 ++++++------------
 python/pyspark/sql/dataframe.py                    |  8 +--
 python/pyspark/sql/session.py                      |  7 +--
 python/pyspark/sql/tests/test_arrow.py             | 39 +++-----------
 python/pyspark/sql/tests/test_pandas_udf.py        | 48 ++++++-----------
 .../sql/tests/test_pandas_udf_grouped_map.py       | 28 ++++------
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 30 +++--------
 python/pyspark/sql/types.py                        | 63 ----------------------
 python/pyspark/sql/utils.py                        |  2 +-
 python/setup.py                                    |  7 +--
 10 files changed, 67 insertions(+), 218 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ed419db..6058e94 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -260,10 +260,14 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         self._safecheck = safecheck
         self._assign_cols_by_name = assign_cols_by_name
 
-    def arrow_to_pandas(self, arrow_column, data_type):
-        from pyspark.sql.types import _arrow_column_to_pandas, _check_series_localize_timestamps
+    def arrow_to_pandas(self, arrow_column):
+        from pyspark.sql.types import _check_series_localize_timestamps
+
+        # If the given column is a date type column, creates a series of datetime.date directly
+        # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
+        # datetime64[ns] type handling.
+        s = arrow_column.to_pandas(date_as_object=True)
 
-        s = _arrow_column_to_pandas(arrow_column, data_type)
         s = _check_series_localize_timestamps(s, self._timezone)
         return s
 
@@ -275,8 +279,6 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         :param series: A single pandas.Series, list of Series, or list of (series, arrow_type)
         :return: Arrow RecordBatch
         """
-        import decimal
-        from distutils.version import LooseVersion
         import pandas as pd
         import pyarrow as pa
         from pyspark.sql.types import _check_series_convert_timestamps_internal
@@ -289,24 +291,10 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         def create_array(s, t):
             mask = s.isnull()
             # Ensure timestamp series are in expected form for Spark internal representation
-            # TODO: maybe don't need None check anymore as of Arrow 0.9.1
             if t is not None and pa.types.is_timestamp(t):
                 s = _check_series_convert_timestamps_internal(s.fillna(0), self._timezone)
                 # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2
                 return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
-            elif t is not None and pa.types.is_string(t) and sys.version < '3':
-                # TODO: need decode before converting to Arrow in Python 2
-                # TODO: don't need as of Arrow 0.9.1
-                return pa.Array.from_pandas(s.apply(
-                    lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t)
-            elif t is not None and pa.types.is_decimal(t) and \
-                    LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-                # TODO: see ARROW-2432. Remove when the minimum PyArrow version becomes 0.10.0.
-                return pa.Array.from_pandas(s.apply(
-                    lambda v: decimal.Decimal('NaN') if v is None else v), mask=mask, type=t)
-            elif LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-                # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
-                return pa.Array.from_pandas(s, mask=mask, type=t)
 
             try:
                 array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
@@ -340,12 +328,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
                                   for i, field in enumerate(t)]
 
                 struct_arrs, struct_names = zip(*arrs_names)
-
-                # TODO: from_arrays args switched for v0.9.0, remove when bump min pyarrow version
-                if LooseVersion(pa.__version__) < LooseVersion("0.9.0"):
-                    arrs.append(pa.StructArray.from_arrays(struct_names, struct_arrs))
-                else:
-                    arrs.append(pa.StructArray.from_arrays(struct_arrs, struct_names))
+                arrs.append(pa.StructArray.from_arrays(struct_arrs, struct_names))
             else:
                 arrs.append(create_array(s, t))
 
@@ -365,10 +348,8 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
         """
         batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
         import pyarrow as pa
-        from pyspark.sql.types import from_arrow_type
         for batch in batches:
-            yield [self.arrow_to_pandas(c, from_arrow_type(c.type))
-                   for c in pa.Table.from_batches([batch]).itercolumns()]
+            yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
 
     def __repr__(self):
         return "ArrowStreamPandasSerializer"
@@ -384,17 +365,17 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
             .__init__(timezone, safecheck, assign_cols_by_name)
         self._df_for_struct = df_for_struct
 
-    def arrow_to_pandas(self, arrow_column, data_type):
-        from pyspark.sql.types import StructType, \
-            _arrow_column_to_pandas, _check_dataframe_localize_timestamps
+    def arrow_to_pandas(self, arrow_column):
+        import pyarrow.types as types
 
-        if self._df_for_struct and type(data_type) == StructType:
+        if self._df_for_struct and types.is_struct(arrow_column.type):
             import pandas as pd
-            series = [_arrow_column_to_pandas(column, field.dataType).rename(field.name)
-                      for column, field in zip(arrow_column.flatten(), data_type)]
-            s = _check_dataframe_localize_timestamps(pd.concat(series, axis=1), self._timezone)
+            series = [super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(column)
+                      .rename(field.name)
+                      for column, field in zip(arrow_column.flatten(), arrow_column.type)]
+            s = pd.concat(series, axis=1)
         else:
-            s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column, data_type)
+            s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column)
         return s
 
     def dump_stream(self, iterator, stream):
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 659cbc4..f8aeb62 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2138,13 +2138,15 @@ class DataFrame(object):
             # of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled.
             if use_arrow:
                 try:
-                    from pyspark.sql.types import _arrow_table_to_pandas, \
-                        _check_dataframe_localize_timestamps
+                    from pyspark.sql.types import _check_dataframe_localize_timestamps
                     import pyarrow
                     batches = self._collectAsArrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
-                        pdf = _arrow_table_to_pandas(table, self.schema)
+                        # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
+                        # values, but we should use datetime.date to match the behavior with when
+                        # Arrow optimization is disabled.
+                        pdf = table.to_pandas(date_as_object=True)
                         return _check_dataframe_localize_timestamps(pdf, timezone)
                     else:
                         return pd.DataFrame.from_records([], columns=self.columns)
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index b11e0f3..52e9037 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -530,7 +530,6 @@ class SparkSession(object):
         to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
         data types will be used to coerce the data in Pandas to Arrow conversion.
         """
-        from distutils.version import LooseVersion
         from pyspark.serializers import ArrowStreamPandasSerializer
         from pyspark.sql.types import from_arrow_type, to_arrow_type, TimestampType
         from pyspark.sql.utils import require_minimum_pandas_version, \
@@ -544,11 +543,7 @@ class SparkSession(object):
 
         # Create the Spark schema from list of names passed in with Arrow types
         if isinstance(schema, (list, tuple)):
-            if LooseVersion(pa.__version__) < LooseVersion("0.12.0"):
-                temp_batch = pa.RecordBatch.from_pandas(pdf[0:100], preserve_index=False)
-                arrow_schema = temp_batch.schema
-            else:
-                arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
+            arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
             struct = StructType()
             for name, field in zip(schema, arrow_schema):
                 struct.add(name, from_arrow_type(field.type), nullable=field.nullable)
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index a45c3fb..22578cb 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -46,7 +46,6 @@ class ArrowTests(ReusedSQLTestCase):
     def setUpClass(cls):
         from datetime import date, datetime
         from decimal import Decimal
-        from distutils.version import LooseVersion
         super(ArrowTests, cls).setUpClass()
         cls.warnings_lock = threading.Lock()
 
@@ -68,23 +67,16 @@ class ArrowTests(ReusedSQLTestCase):
             StructField("5_double_t", DoubleType(), True),
             StructField("6_decimal_t", DecimalType(38, 18), True),
             StructField("7_date_t", DateType(), True),
-            StructField("8_timestamp_t", TimestampType(), True)])
+            StructField("8_timestamp_t", TimestampType(), True),
+            StructField("9_binary_t", BinaryType(), True)])
         cls.data = [(u"a", 1, 10, 0.2, 2.0, Decimal("2.0"),
-                     date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)),
+                     date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1), bytearray(b"a")),
                     (u"b", 2, 20, 0.4, 4.0, Decimal("4.0"),
-                     date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)),
+                     date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2), bytearray(b"bb")),
                     (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"),
-                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3)),
+                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3), bytearray(b"ccc")),
                     (u"d", 4, 40, 1.0, 8.0, Decimal("8.0"),
-                     date(2262, 4, 12), datetime(2262, 3, 3, 3, 3, 3))]
-
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion("0.10.0") <= LooseVersion(pa.__version__):
-            cls.schema.add(StructField("9_binary_t", BinaryType(), True))
-            cls.data[0] = cls.data[0] + (bytearray(b"a"),)
-            cls.data[1] = cls.data[1] + (bytearray(b"bb"),)
-            cls.data[2] = cls.data[2] + (bytearray(b"ccc"),)
-            cls.data[3] = cls.data[3] + (bytearray(b"dddd"),)
+                     date(2262, 4, 12), datetime(2262, 3, 3, 3, 3, 3), bytearray(b"dddd"))]
 
     @classmethod
     def tearDownClass(cls):
@@ -123,8 +115,6 @@ class ArrowTests(ReusedSQLTestCase):
                         assert_frame_equal(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))
 
     def test_toPandas_fallback_disabled(self):
-        from distutils.version import LooseVersion
-
         schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
         df = self.spark.createDataFrame([(None,)], schema=schema)
         with QuietTest(self.sc):
@@ -132,14 +122,6 @@ class ArrowTests(ReusedSQLTestCase):
                 with self.assertRaisesRegexp(Exception, 'Unsupported type'):
                     df.toPandas()
 
-        # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            schema = StructType([StructField("binary", BinaryType(), True)])
-            df = self.spark.createDataFrame([(None,)], schema=schema)
-            with QuietTest(self.sc):
-                with self.assertRaisesRegexp(Exception, 'Unsupported type.*BinaryType'):
-                    df.toPandas()
-
     def test_null_conversion(self):
         df_null = self.spark.createDataFrame([tuple([None for _ in range(len(self.data[0]))])] +
                                              self.data)
@@ -348,20 +330,11 @@ class ArrowTests(ReusedSQLTestCase):
                     self.assertEqual(df.collect(), [Row(a={u'a': 1})])
 
     def test_createDataFrame_fallback_disabled(self):
-        from distutils.version import LooseVersion
-
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(TypeError, 'Unsupported type'):
                 self.spark.createDataFrame(
                     pd.DataFrame([[{u'a': 1}]]), "a: map<string, int>")
 
-        # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            with QuietTest(self.sc):
-                with self.assertRaisesRegexp(TypeError, 'Unsupported type.*BinaryType'):
-                    self.spark.createDataFrame(
-                        pd.DataFrame([[{'a': b'aaa'}]]), "a: binary")
-
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
diff --git a/python/pyspark/sql/tests/test_pandas_udf.py b/python/pyspark/sql/tests/test_pandas_udf.py
index fd6d4e1..72dc05b 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -198,10 +198,8 @@ class PandasUDFTests(ReusedSQLTestCase):
         )
 
     def test_pandas_udf_detect_unsafe_type_conversion(self):
-        from distutils.version import LooseVersion
         import pandas as pd
         import numpy as np
-        import pyarrow as pa
 
         values = [1.0] * 3
         pdf = pd.DataFrame({'A': values})
@@ -209,15 +207,14 @@ class PandasUDFTests(ReusedSQLTestCase):
 
         @pandas_udf(returnType="int")
         def udf(column):
-            return pd.Series(np.linspace(0, 1, 3))
+            return pd.Series(np.linspace(0, 1, len(column)))
 
         # Since 0.11.0, PyArrow supports the feature to raise an error for unsafe cast.
-        if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"):
-            with self.sql_conf({
-                    "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
-                with self.assertRaisesRegexp(Exception,
-                                             "Exception thrown when converting pandas.Series"):
-                    df.select(['A']).withColumn('udf', udf('A')).collect()
+        with self.sql_conf({
+                "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
+            with self.assertRaisesRegexp(Exception,
+                                         "Exception thrown when converting pandas.Series"):
+                df.select(['A']).withColumn('udf', udf('A')).collect()
 
         # Disabling Arrow safe type check.
         with self.sql_conf({
@@ -225,35 +222,24 @@ class PandasUDFTests(ReusedSQLTestCase):
             df.select(['A']).withColumn('udf', udf('A')).collect()
 
     def test_pandas_udf_arrow_overflow(self):
-        from distutils.version import LooseVersion
         import pandas as pd
-        import pyarrow as pa
 
         df = self.spark.range(0, 1)
 
         @pandas_udf(returnType="byte")
         def udf(column):
-            return pd.Series([128])
-
-        # Arrow 0.11.0+ allows enabling or disabling safe type check.
-        if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"):
-            # When enabling safe type check, Arrow 0.11.0+ disallows overflow cast.
-            with self.sql_conf({
-                    "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
-                with self.assertRaisesRegexp(Exception,
-                                             "Exception thrown when converting pandas.Series"):
-                    df.withColumn('udf', udf('id')).collect()
-
-            # Disabling safe type check, let Arrow do the cast anyway.
-            with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
+            return pd.Series([128] * len(column))
+
+        # When enabling safe type check, Arrow 0.11.0+ disallows overflow cast.
+        with self.sql_conf({
+                "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
+            with self.assertRaisesRegexp(Exception,
+                                         "Exception thrown when converting pandas.Series"):
                 df.withColumn('udf', udf('id')).collect()
-        else:
-            # SQL config `arrowSafeTypeConversion` no matters for older Arrow.
-            # Overflow cast causes an error.
-            with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
-                with self.assertRaisesRegexp(Exception,
-                                             "Integer value out of bounds"):
-                    df.withColumn('udf', udf('id')).collect()
+
+        # Disabling safe type check, let Arrow do the cast anyway.
+        with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
+            df.withColumn('udf', udf('id')).collect()
 
 
 if __name__ == "__main__":
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index c8bad99..1d87c63 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -21,7 +21,6 @@ import sys
 
 from collections import OrderedDict
 from decimal import Decimal
-from distutils.version import LooseVersion
 
 from pyspark.sql import Row
 from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
@@ -65,20 +64,17 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
             1, 2, 3,
             4, 5, 1.1,
             2.2, Decimal(1.123),
-            [1, 2, 2], True, 'hello'
+            [1, 2, 2], True, 'hello',
+            bytearray([0x01, 0x02])
         ]
         output_fields = [
             ('id', IntegerType()), ('byte', ByteType()), ('short', ShortType()),
             ('int', IntegerType()), ('long', LongType()), ('float', FloatType()),
             ('double', DoubleType()), ('decim', DecimalType(10, 3)),
-            ('array', ArrayType(IntegerType())), ('bool', BooleanType()), ('str', StringType())
+            ('array', ArrayType(IntegerType())), ('bool', BooleanType()), ('str', StringType()),
+            ('bin', BinaryType())
         ]
 
-        # TODO: Add BinaryType to variables above once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) >= LooseVersion("0.10.0"):
-            values.append(bytearray([0x01, 0x02]))
-            output_fields.append(('bin', BinaryType()))
-
         output_schema = StructType([StructField(*x) for x in output_fields])
         df = self.spark.createDataFrame([values], schema=output_schema)
 
@@ -95,6 +91,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
                 bool=False if pdf.bool else True,
                 str=pdf.str + 'there',
                 array=pdf.array,
+                bin=pdf.bin
             ),
             output_schema,
             PandasUDFType.GROUPED_MAP
@@ -112,6 +109,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
                 bool=False if pdf.bool else True,
                 str=pdf.str + 'there',
                 array=pdf.array,
+                bin=pdf.bin
             ),
             output_schema,
             PandasUDFType.GROUPED_MAP
@@ -130,6 +128,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
                 bool=False if pdf.bool else True,
                 str=pdf.str + 'there',
                 array=pdf.array,
+                bin=pdf.bin
             ),
             output_schema,
             PandasUDFType.GROUPED_MAP
@@ -291,10 +290,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
             StructField('struct', StructType([StructField('l', LongType())])),
         ]
 
-        # TODO: Remove this if-statement once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            unsupported_types.append(StructField('bin', BinaryType()))
-
         for unsupported_type in unsupported_types:
             schema = StructType([StructField('id', LongType(), True), unsupported_type])
             with QuietTest(self.sc):
@@ -466,13 +461,8 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
                 grouped_df.apply(column_name_typo).collect()
-            if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-                # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
-                with self.assertRaisesRegexp(Exception, "No cast implemented"):
-                    grouped_df.apply(invalid_positional_types).collect()
-            else:
-                with self.assertRaisesRegexp(Exception, "an integer is required"):
-                    grouped_df.apply(invalid_positional_types).collect()
+            with self.assertRaisesRegexp(Exception, "an integer is required"):
+                grouped_df.apply(invalid_positional_types).collect()
 
     def test_positional_assignment_conf(self):
         with self.sql_conf({
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index ebba074..b219624 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -28,7 +28,6 @@ if sys.version >= '3':
 
 from datetime import date, datetime
 from decimal import Decimal
-from distutils.version import LooseVersion
 
 from pyspark.rdd import PythonEvalType
 from pyspark.sql import Column
@@ -240,19 +239,12 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_binary(self):
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            with QuietTest(self.sc):
-                with self.assertRaisesRegexp(
-                        NotImplementedError,
-                        'Invalid returnType.*scalar Pandas UDF.*BinaryType'):
-                    pandas_udf(lambda x: x, BinaryType())
-        else:
-            data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)]
-            schema = StructType().add("binary", BinaryType())
-            df = self.spark.createDataFrame(data, schema)
-            str_f = pandas_udf(lambda x: x, BinaryType())
-            res = df.select(str_f(col('binary')))
-            self.assertEquals(df.collect(), res.collect())
+        data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)]
+        schema = StructType().add("binary", BinaryType())
+        df = self.spark.createDataFrame(data, schema)
+        str_f = pandas_udf(lambda x: x, BinaryType())
+        res = df.select(str_f(col('binary')))
+        self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_array_type(self):
         data = [([1, 2],), ([3, 4],)]
@@ -293,15 +285,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 
         struct_f = pandas_udf(lambda x: x, return_type)
         actual = df.select(struct_f(struct(col('id'), col('id').cast('string').alias('str'))))
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            with QuietTest(self.sc):
-                from py4j.protocol import Py4JJavaError
-                with self.assertRaisesRegexp(
-                        Py4JJavaError,
-                        'Unsupported type in conversion from Arrow'):
-                    self.assertEqual(expected, actual.collect())
-        else:
-            self.assertEqual(expected, actual.collect())
+        self.assertEqual(expected, actual.collect())
 
     def test_vectorized_udf_struct_complex(self):
         df = self.spark.range(10)
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 21595e5..72c437a 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1581,7 +1581,6 @@ register_input_converter(DateConverter())
 def to_arrow_type(dt):
     """ Convert Spark data type to pyarrow type
     """
-    from distutils.version import LooseVersion
     import pyarrow as pa
     if type(dt) == BooleanType:
         arrow_type = pa.bool_()
@@ -1602,10 +1601,6 @@ def to_arrow_type(dt):
     elif type(dt) == StringType:
         arrow_type = pa.string()
     elif type(dt) == BinaryType:
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            raise TypeError("Unsupported type in conversion to Arrow: " + str(dt) +
-                            "\nPlease install pyarrow >= 0.10.0 for BinaryType support.")
         arrow_type = pa.binary()
     elif type(dt) == DateType:
         arrow_type = pa.date32()
@@ -1639,8 +1634,6 @@ def to_arrow_schema(schema):
 def from_arrow_type(at):
     """ Convert pyarrow type to Spark data type.
     """
-    from distutils.version import LooseVersion
-    import pyarrow as pa
     import pyarrow.types as types
     if types.is_boolean(at):
         spark_type = BooleanType()
@@ -1661,10 +1654,6 @@ def from_arrow_type(at):
     elif types.is_string(at):
         spark_type = StringType()
     elif types.is_binary(at):
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            raise TypeError("Unsupported type in conversion from Arrow: " + str(at) +
-                            "\nPlease install pyarrow >= 0.10.0 for BinaryType support.")
         spark_type = BinaryType()
     elif types.is_date32(at):
         spark_type = DateType()
@@ -1675,10 +1664,6 @@ def from_arrow_type(at):
             raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
         spark_type = ArrayType(from_arrow_type(at.value_type))
     elif types.is_struct(at):
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            raise TypeError("Unsupported type in conversion from Arrow: " + str(at) +
-                            "\nPlease install pyarrow >= 0.10.0 for StructType support.")
         if any(types.is_struct(field.type) for field in at):
             raise TypeError("Nested StructType not supported in conversion from Arrow: " + str(at))
         return StructType(
@@ -1697,54 +1682,6 @@ def from_arrow_schema(arrow_schema):
          for field in arrow_schema])
 
 
-def _arrow_column_to_pandas(column, data_type):
-    """ Convert Arrow Column to pandas Series.
-
-    :param series: pyarrow.lib.Column
-    :param data_type: a Spark data type for the column
-    """
-    import pandas as pd
-    import pyarrow as pa
-    from distutils.version import LooseVersion
-    # If the given column is a date type column, creates a series of datetime.date directly instead
-    # of creating datetime64[ns] as intermediate data to avoid overflow caused by datetime64[ns]
-    # type handling.
-    if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-        if type(data_type) == DateType:
-            return pd.Series(column.to_pylist(), name=column.name)
-        else:
-            return column.to_pandas()
-    else:
-        # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of
-        # np.datetime64.
-        return column.to_pandas(date_as_object=True)
-
-
-def _arrow_table_to_pandas(table, schema):
-    """ Convert Arrow Table to pandas DataFrame.
-
-    Pandas DataFrame created from PyArrow uses datetime64[ns] for date type values, but we should
-    use datetime.date to match the behavior with when Arrow optimization is disabled.
-
-    :param table: pyarrow.lib.Table
-    :param schema: a Spark schema of the pyarrow.lib.Table
-    """
-    import pandas as pd
-    import pyarrow as pa
-    from distutils.version import LooseVersion
-    # If the given table contains a date type column, use `_arrow_column_to_pandas` for pyarrow<0.11
-    # or use `date_as_object` option for pyarrow>=0.11 to avoid creating datetime64[ns] as
-    # intermediate data.
-    if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-        if any(type(field.dataType) == DateType for field in schema):
-            return pd.concat([_arrow_column_to_pandas(column, field.dataType)
-                              for column, field in zip(table.itercolumns(), schema)], axis=1)
-        else:
-            return table.to_pandas()
-    else:
-        return table.to_pandas(date_as_object=True)
-
-
 def _get_local_timezone():
     """ Get local timezone using pytz with environment variable, or dateutil.
 
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index bdb3a14..709d3a0 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -136,7 +136,7 @@ def require_minimum_pyarrow_version():
     """ Raise ImportError if minimum version of pyarrow is not installed
     """
     # TODO(HyukjinKwon): Relocate and deduplicate the version specification.
-    minimum_pyarrow_version = "0.8.0"
+    minimum_pyarrow_version = "0.12.1"
 
     from distutils.version import LooseVersion
     try:
diff --git a/python/setup.py b/python/setup.py
index 3c129c9..e769bf5 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -102,10 +102,11 @@ if (in_spark):
               file=sys.stderr)
         sys.exit(-1)
 
-# If you are changing the versions here, please also change ./python/pyspark/sql/utils.py and
-# ./python/run-tests.py. In case of Arrow, you should also check ./pom.xml.
+# If you are changing the versions here, please also change ./python/pyspark/sql/utils.py
+# For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the
+# binary format protocol with the Java version, see ARROW_HOME/format/* for specifications.
 _minimum_pandas_version = "0.19.2"
-_minimum_pyarrow_version = "0.8.0"
+_minimum_pyarrow_version = "0.12.1"
 
 try:
     # We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org