You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/11/28 08:45:27 UTC

spark git commit: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone

Repository: spark
Updated Branches:
  refs/heads/master 33d43bf1b -> 64817c423


[SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone

## What changes were proposed in this pull request?

When converting Pandas DataFrame/Series from/to Spark DataFrame using `toPandas()` or pandas udfs, timestamp values respect the Python system timezone instead of the session timezone.

For example, let's say we use `"America/Los_Angeles"` as the session timezone and have a timestamp value `"1970-01-01 00:00:01"` in that timezone. (I'm in Japan, so the Python timezone would be `"Asia/Tokyo"`.)

The timestamp value from current `toPandas()` will be the following:

```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01
```

As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects the Python timezone.
As we discussed in #18664, we consider this behavior a bug, and the value should be `"1970-01-01 00:00:01"`.

## How was this patch tested?

Added tests and existing tests.

Author: Takuya UESHIN <ue...@databricks.com>

Closes #19607 from ueshin/issues/SPARK-22395.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64817c42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64817c42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64817c42

Branch: refs/heads/master
Commit: 64817c423c0d82a805abd69a3e166e5bfd79c739
Parents: 33d43bf
Author: Takuya UESHIN <ue...@databricks.com>
Authored: Tue Nov 28 16:45:22 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Nov 28 16:45:22 2017 +0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |   2 +
 python/pyspark/serializers.py                   |  13 +-
 python/pyspark/sql/dataframe.py                 |  24 ++-
 python/pyspark/sql/session.py                   |  52 ++++-
 python/pyspark/sql/tests.py                     | 214 +++++++++++++++++--
 python/pyspark/sql/types.py                     |  87 +++++++-
 python/pyspark/worker.py                        |   3 +-
 python/setup.py                                 |   2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  11 +
 .../execution/python/ArrowEvalPythonExec.scala  |   4 +-
 .../execution/python/ArrowPythonRunner.scala    |   8 +-
 .../python/FlatMapGroupsInPandasExec.scala      |   4 +-
 12 files changed, 371 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 5f98213..983770d 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1716,6 +1716,8 @@ options.
     </table>
 
     Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
+  - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc.
+  - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
 
 ## Upgrading From Spark SQL 2.1 to 2.2
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index b95de2c..37e7cf3 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -206,11 +206,12 @@ class ArrowSerializer(FramedSerializer):
         return "ArrowSerializer"
 
 
-def _create_batch(series):
+def _create_batch(series, timezone):
     """
     Create an Arrow record batch from the given pandas.Series or list of Series, with optional type.
 
     :param series: A single pandas.Series, list of Series, or list of (series, arrow_type)
+    :param timezone: A timezone to respect when handling timestamp values
     :return: Arrow RecordBatch
     """
 
@@ -227,7 +228,7 @@ def _create_batch(series):
     def cast_series(s, t):
         if type(t) == pa.TimestampType:
             # NOTE: convert to 'us' with astype here, unit ignored in `from_pandas` see ARROW-1680
-            return _check_series_convert_timestamps_internal(s.fillna(0))\
+            return _check_series_convert_timestamps_internal(s.fillna(0), timezone)\
                 .values.astype('datetime64[us]', copy=False)
         # NOTE: can not compare None with pyarrow.DataType(), fixed with Arrow >= 0.7.1
         elif t is not None and t == pa.date32():
@@ -253,6 +254,10 @@ class ArrowStreamPandasSerializer(Serializer):
     Serializes Pandas.Series as Arrow data with Arrow streaming format.
     """
 
+    def __init__(self, timezone):
+        super(ArrowStreamPandasSerializer, self).__init__()
+        self._timezone = timezone
+
     def dump_stream(self, iterator, stream):
         """
         Make ArrowRecordBatches from Pandas Series and serialize. Input is a single series or
@@ -262,7 +267,7 @@ class ArrowStreamPandasSerializer(Serializer):
         writer = None
         try:
             for series in iterator:
-                batch = _create_batch(series)
+                batch = _create_batch(series, self._timezone)
                 if writer is None:
                     write_int(SpecialLengths.START_ARROW_STREAM, stream)
                     writer = pa.RecordBatchStreamWriter(stream, batch.schema)
@@ -280,7 +285,7 @@ class ArrowStreamPandasSerializer(Serializer):
         reader = pa.open_stream(stream)
         for batch in reader:
             # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
-            pdf = _check_dataframe_localize_timestamps(batch.to_pandas())
+            pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
             yield [c for _, c in pdf.iteritems()]
 
     def __repr__(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 406686e..9864dc9 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -39,6 +39,7 @@ from pyspark.sql.readwriter import DataFrameWriter
 from pyspark.sql.streaming import DataStreamWriter
 from pyspark.sql.types import IntegralType
 from pyspark.sql.types import *
+from pyspark.util import _exception_message
 
 __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
 
@@ -1881,6 +1882,13 @@ class DataFrame(object):
         1    5    Bob
         """
         import pandas as pd
+
+        if self.sql_ctx.getConf("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
+           == "true":
+            timezone = self.sql_ctx.getConf("spark.sql.session.timeZone")
+        else:
+            timezone = None
+
         if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
             try:
                 from pyspark.sql.types import _check_dataframe_localize_timestamps
@@ -1889,13 +1897,13 @@ class DataFrame(object):
                 if tables:
                     table = pyarrow.concat_tables(tables)
                     pdf = table.to_pandas()
-                    return _check_dataframe_localize_timestamps(pdf)
+                    return _check_dataframe_localize_timestamps(pdf, timezone)
                 else:
                     return pd.DataFrame.from_records([], columns=self.columns)
             except ImportError as e:
                 msg = "note: pyarrow must be installed and available on calling Python process " \
                       "if using spark.sql.execution.arrow.enabled=true"
-                raise ImportError("%s\n%s" % (e.message, msg))
+                raise ImportError("%s\n%s" % (_exception_message(e), msg))
         else:
             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
 
@@ -1913,7 +1921,17 @@ class DataFrame(object):
 
             for f, t in dtype.items():
                 pdf[f] = pdf[f].astype(t, copy=False)
-            return pdf
+
+            if timezone is None:
+                return pdf
+            else:
+                from pyspark.sql.types import _check_series_convert_timestamps_local_tz
+                for field in self.schema:
+                    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+                    if isinstance(field.dataType, TimestampType):
+                        pdf[field.name] = \
+                            _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
+                return pdf
 
     def _collectAsArrow(self):
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 47c58bb..e2435e0 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -34,8 +34,9 @@ from pyspark.sql.conf import RuntimeConfig
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.readwriter import DataFrameReader
 from pyspark.sql.streaming import DataStreamReader
-from pyspark.sql.types import Row, DataType, StringType, StructType, _make_type_verifier, \
-    _infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string
+from pyspark.sql.types import Row, DataType, StringType, StructType, TimestampType, \
+    _make_type_verifier, _infer_schema, _has_nulltype, _merge_type, _create_converter, \
+    _parse_datatype_string
 from pyspark.sql.utils import install_exception_handler
 
 __all__ = ["SparkSession"]
@@ -444,11 +445,34 @@ class SparkSession(object):
             record_type_list.append((str(col_names[i]), curr_type))
         return np.dtype(record_type_list) if has_rec_fix else None
 
-    def _convert_from_pandas(self, pdf):
+    def _convert_from_pandas(self, pdf, schema, timezone):
         """
          Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
          :return list of records
         """
+        if timezone is not None:
+            from pyspark.sql.types import _check_series_convert_timestamps_tz_local
+            copied = False
+            if isinstance(schema, StructType):
+                for field in schema:
+                    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+                    if isinstance(field.dataType, TimestampType):
+                        s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
+                        if not copied and s is not pdf[field.name]:
+                            # Copy once if the series is modified to prevent the original Pandas
+                            # DataFrame from being updated
+                            pdf = pdf.copy()
+                            copied = True
+                        pdf[field.name] = s
+            else:
+                for column, series in pdf.iteritems():
+                    s = _check_series_convert_timestamps_tz_local(pdf[column], timezone)
+                    if not copied and s is not pdf[column]:
+                        # Copy once if the series is modified to prevent the original Pandas
+                        # DataFrame from being updated
+                        pdf = pdf.copy()
+                        copied = True
+                    pdf[column] = s
 
         # Convert pandas.DataFrame to list of numpy records
         np_records = pdf.to_records(index=False)
@@ -462,15 +486,19 @@ class SparkSession(object):
         # Convert list of numpy records to python lists
         return [r.tolist() for r in np_records]
 
-    def _create_from_pandas_with_arrow(self, pdf, schema):
+    def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         """
         Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
         to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
         data types will be used to coerce the data in Pandas to Arrow conversion.
         """
         from pyspark.serializers import ArrowSerializer, _create_batch
-        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
-        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+        from pyspark.sql.types import from_arrow_schema, to_arrow_type, \
+            _old_pandas_exception_message, TimestampType
+        try:
+            from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+        except ImportError as e:
+            raise ImportError(_old_pandas_exception_message(e))
 
         # Determine arrow types to coerce data when creating batches
         if isinstance(schema, StructType):
@@ -488,7 +516,8 @@ class SparkSession(object):
         pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
 
         # Create Arrow record batches
-        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
+        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)],
+                                 timezone)
                    for pdf_slice in pdf_slices]
 
         # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
@@ -606,6 +635,11 @@ class SparkSession(object):
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            if self.conf.get("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
+               == "true":
+                timezone = self.conf.get("spark.sql.session.timeZone")
+            else:
+                timezone = None
 
             # If no schema supplied by user then get the names of columns only
             if schema is None:
@@ -614,11 +648,11 @@ class SparkSession(object):
             if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
                     and len(data) > 0:
                 try:
-                    return self._create_from_pandas_with_arrow(data, schema)
+                    return self._create_from_pandas_with_arrow(data, schema, timezone)
                 except Exception as e:
                     warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
                     # Fallback to create DataFrame without arrow if raise some exception
-            data = self._convert_from_pandas(data)
+            data = self._convert_from_pandas(data, schema, timezone)
 
         if isinstance(schema, StructType):
             verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 762afe0..b4d32d8 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -49,9 +49,14 @@ else:
     import unittest
 
 _have_pandas = False
+_have_old_pandas = False
 try:
     import pandas
-    _have_pandas = True
+    try:
+        import pandas.api
+        _have_pandas = True
+    except:
+        _have_old_pandas = True
 except:
     # No Pandas, but that's okay, we'll skip those tests
     pass
@@ -2565,21 +2570,38 @@ class SQLTests(ReusedSQLTestCase):
             .mode("overwrite").saveAsTable("pyspark_bucket"))
         self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect()))
 
-    @unittest.skipIf(not _have_pandas, "Pandas not installed")
-    def test_to_pandas(self):
+    def _to_pandas(self):
+        from datetime import datetime, date
         import numpy as np
         schema = StructType().add("a", IntegerType()).add("b", StringType())\
-                             .add("c", BooleanType()).add("d", FloatType())
+                             .add("c", BooleanType()).add("d", FloatType())\
+                             .add("dt", DateType()).add("ts", TimestampType())
         data = [
-            (1, "foo", True, 3.0), (2, "foo", True, 5.0),
-            (3, "bar", False, -1.0), (4, "bar", False, 6.0),
+            (1, "foo", True, 3.0, date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)),
+            (2, "foo", True, 5.0, None, None),
+            (3, "bar", False, -1.0, date(2012, 3, 3), datetime(2012, 3, 3, 3, 3, 3)),
+            (4, "bar", False, 6.0, date(2100, 4, 4), datetime(2100, 4, 4, 4, 4, 4)),
         ]
         df = self.spark.createDataFrame(data, schema)
-        types = df.toPandas().dtypes
+        return df.toPandas()
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_to_pandas(self):
+        import numpy as np
+        pdf = self._to_pandas()
+        types = pdf.dtypes
         self.assertEquals(types[0], np.int32)
         self.assertEquals(types[1], np.object)
         self.assertEquals(types[2], np.bool)
         self.assertEquals(types[3], np.float32)
+        self.assertEquals(types[4], 'datetime64[ns]')
+        self.assertEquals(types[5], 'datetime64[ns]')
+
+    @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed")
+    def test_to_pandas_old(self):
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(ImportError, 'Pandas \(.*\) must be installed'):
+                self._to_pandas()
 
     @unittest.skipIf(not _have_pandas, "Pandas not installed")
     def test_to_pandas_avoid_astype(self):
@@ -2614,6 +2636,16 @@ class SQLTests(ReusedSQLTestCase):
         self.assertTrue(isinstance(df.schema['ts'].dataType, TimestampType))
         self.assertTrue(isinstance(df.schema['d'].dataType, DateType))
 
+    @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed")
+    def test_create_dataframe_from_old_pandas(self):
+        import pandas as pd
+        from datetime import datetime
+        pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
+                            "d": [pd.Timestamp.now().date()]})
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(ImportError, 'Pandas \(.*\) must be installed'):
+                self.spark.createDataFrame(pdf)
+
 
 class HiveSparkSubmitTests(SparkSubmitTests):
 
@@ -3103,7 +3135,7 @@ class DataTypeVerificationTests(unittest.TestCase):
                 _make_type_verifier(data_type, nullable=False)(obj)
 
 
-@unittest.skipIf(not _have_arrow, "Arrow not installed")
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
 class ArrowTests(ReusedSQLTestCase):
 
     @classmethod
@@ -3169,16 +3201,47 @@ class ArrowTests(ReusedSQLTestCase):
         null_counts = pdf.isnull().sum().tolist()
         self.assertTrue(all([c == 1 for c in null_counts]))
 
-    def test_toPandas_arrow_toggle(self):
-        df = self.spark.createDataFrame(self.data, schema=self.schema)
+    def _toPandas_arrow_toggle(self, df):
         self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
         try:
             pdf = df.toPandas()
         finally:
             self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
         pdf_arrow = df.toPandas()
+        return pdf, pdf_arrow
+
+    def test_toPandas_arrow_toggle(self):
+        df = self.spark.createDataFrame(self.data, schema=self.schema)
+        pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
         self.assertFramesEqual(pdf_arrow, pdf)
 
+    def test_toPandas_respect_session_timezone(self):
+        df = self.spark.createDataFrame(self.data, schema=self.schema)
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                pdf_la, pdf_arrow_la = self._toPandas_arrow_toggle(df)
+                self.assertFramesEqual(pdf_arrow_la, pdf_la)
+            finally:
+                self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")
+            pdf_ny, pdf_arrow_ny = self._toPandas_arrow_toggle(df)
+            self.assertFramesEqual(pdf_arrow_ny, pdf_ny)
+
+            self.assertFalse(pdf_ny.equals(pdf_la))
+
+            from pyspark.sql.types import _check_series_convert_timestamps_local_tz
+            pdf_la_corrected = pdf_la.copy()
+            for field in self.schema:
+                if isinstance(field.dataType, TimestampType):
+                    pdf_la_corrected[field.name] = _check_series_convert_timestamps_local_tz(
+                        pdf_la_corrected[field.name], timezone)
+            self.assertFramesEqual(pdf_ny, pdf_la_corrected)
+        finally:
+            self.spark.conf.set("spark.sql.session.timeZone", orig_tz)
+
     def test_pandas_round_trip(self):
         pdf = self.create_pandas_data_frame()
         df = self.spark.createDataFrame(self.data, schema=self.schema)
@@ -3192,16 +3255,50 @@ class ArrowTests(ReusedSQLTestCase):
         self.assertEqual(pdf.columns[0], "i")
         self.assertTrue(pdf.empty)
 
-    def test_createDataFrame_toggle(self):
-        pdf = self.create_pandas_data_frame()
+    def _createDataFrame_toggle(self, pdf, schema=None):
         self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
         try:
-            df_no_arrow = self.spark.createDataFrame(pdf)
+            df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)
         finally:
             self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
-        df_arrow = self.spark.createDataFrame(pdf)
+        df_arrow = self.spark.createDataFrame(pdf, schema=schema)
+        return df_no_arrow, df_arrow
+
+    def test_createDataFrame_toggle(self):
+        pdf = self.create_pandas_data_frame()
+        df_no_arrow, df_arrow = self._createDataFrame_toggle(pdf, schema=self.schema)
         self.assertEquals(df_no_arrow.collect(), df_arrow.collect())
 
+    def test_createDataFrame_respect_session_timezone(self):
+        from datetime import timedelta
+        pdf = self.create_pandas_data_frame()
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_no_arrow_la, df_arrow_la = self._createDataFrame_toggle(pdf, schema=self.schema)
+                result_la = df_no_arrow_la.collect()
+                result_arrow_la = df_arrow_la.collect()
+                self.assertEqual(result_la, result_arrow_la)
+            finally:
+                self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")
+            df_no_arrow_ny, df_arrow_ny = self._createDataFrame_toggle(pdf, schema=self.schema)
+            result_ny = df_no_arrow_ny.collect()
+            result_arrow_ny = df_arrow_ny.collect()
+            self.assertEqual(result_ny, result_arrow_ny)
+
+            self.assertNotEqual(result_ny, result_la)
+
+            # Correct result_la by adjusting 3 hours difference between Los Angeles and New York
+            result_la_corrected = [Row(**{k: v - timedelta(hours=3) if k == '7_timestamp_t' else v
+                                          for k, v in row.asDict().items()})
+                                   for row in result_la]
+            self.assertEqual(result_ny, result_la_corrected)
+        finally:
+            self.spark.conf.set("spark.sql.session.timeZone", orig_tz)
+
     def test_createDataFrame_with_schema(self):
         pdf = self.create_pandas_data_frame()
         df = self.spark.createDataFrame(pdf, schema=self.schema)
@@ -3385,6 +3482,27 @@ class PandasUDFTests(ReusedSQLTestCase):
 @unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
 class VectorizedUDFTests(ReusedSQLTestCase):
 
+    @classmethod
+    def setUpClass(cls):
+        ReusedSQLTestCase.setUpClass()
+
+        # Synchronize default timezone between Python and Java
+        cls.tz_prev = os.environ.get("TZ", None)  # save current tz if set
+        tz = "America/Los_Angeles"
+        os.environ["TZ"] = tz
+        time.tzset()
+
+        cls.sc.environment["TZ"] = tz
+        cls.spark.conf.set("spark.sql.session.timeZone", tz)
+
+    @classmethod
+    def tearDownClass(cls):
+        del os.environ["TZ"]
+        if cls.tz_prev is not None:
+            os.environ["TZ"] = cls.tz_prev
+        time.tzset()
+        ReusedSQLTestCase.tearDownClass()
+
     def test_vectorized_udf_basic(self):
         from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10).select(
@@ -3621,29 +3739,37 @@ class VectorizedUDFTests(ReusedSQLTestCase):
         data = [(0, datetime(1969, 1, 1, 1, 1, 1)),
                 (1, datetime(2012, 2, 2, 2, 2, 2)),
                 (2, None),
-                (3, datetime(2100, 4, 4, 4, 4, 4))]
+                (3, datetime(2100, 3, 3, 3, 3, 3))]
+
         df = self.spark.createDataFrame(data, schema=schema)
 
         # Check that a timestamp passed through a pandas_udf will not be altered by timezone calc
         f_timestamp_copy = pandas_udf(lambda t: t, returnType=TimestampType())
         df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp")))
 
-        @pandas_udf(returnType=BooleanType())
+        @pandas_udf(returnType=StringType())
         def check_data(idx, timestamp, timestamp_copy):
+            import pandas as pd
+            msgs = []
             is_equal = timestamp.isnull()  # use this array to check values are equal
             for i in range(len(idx)):
                 # Check that timestamps are as expected in the UDF
-                is_equal[i] = (is_equal[i] and data[idx[i]][1] is None) or \
-                    timestamp[i].to_pydatetime() == data[idx[i]][1]
-            return is_equal
-
-        result = df.withColumn("is_equal", check_data(col("idx"), col("timestamp"),
-                                                      col("timestamp_copy"))).collect()
+                if (is_equal[i] and data[idx[i]][1] is None) or \
+                        timestamp[i].to_pydatetime() == data[idx[i]][1]:
+                    msgs.append(None)
+                else:
+                    msgs.append(
+                        "timestamp values are not equal (timestamp='%s': data[%d][1]='%s')"
+                        % (timestamp[i], idx[i], data[idx[i]][1]))
+            return pd.Series(msgs)
+
+        result = df.withColumn("check_data", check_data(col("idx"), col("timestamp"),
+                                                        col("timestamp_copy"))).collect()
         # Check that collection values are correct
         self.assertEquals(len(data), len(result))
         for i in range(len(result)):
             self.assertEquals(data[i][1], result[i][1])  # "timestamp" col
-            self.assertTrue(result[i][3])  # "is_equal" data in udf was as expected
+            self.assertIsNone(result[i][3])  # "check_data" col
 
     def test_vectorized_udf_return_timestamp_tz(self):
         from pyspark.sql.functions import pandas_udf, col
@@ -3683,6 +3809,48 @@ class VectorizedUDFTests(ReusedSQLTestCase):
             else:
                 self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", orig_value)
 
+    def test_vectorized_udf_timestamps_respect_session_timezone(self):
+        from pyspark.sql.functions import pandas_udf, col
+        from datetime import datetime
+        import pandas as pd
+        schema = StructType([
+            StructField("idx", LongType(), True),
+            StructField("timestamp", TimestampType(), True)])
+        data = [(1, datetime(1969, 1, 1, 1, 1, 1)),
+                (2, datetime(2012, 2, 2, 2, 2, 2)),
+                (3, None),
+                (4, datetime(2100, 3, 3, 3, 3, 3))]
+        df = self.spark.createDataFrame(data, schema=schema)
+
+        f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType())
+        internal_value = pandas_udf(
+            lambda ts: ts.apply(lambda ts: ts.value if ts is not pd.NaT else None), LongType())
+
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
+                    .withColumn("internal_value", internal_value(col("timestamp")))
+                result_la = df_la.select(col("idx"), col("internal_value")).collect()
+                # Correct result_la by adjusting 3 hours difference between Los Angeles and New York
+                diff = 3 * 60 * 60 * 1000 * 1000 * 1000
+                result_la_corrected = \
+                    df_la.select(col("idx"), col("tscopy"), col("internal_value") + diff).collect()
+            finally:
+                self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")
+
+            df_ny = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
+                .withColumn("internal_value", internal_value(col("timestamp")))
+            result_ny = df_ny.select(col("idx"), col("tscopy"), col("internal_value")).collect()
+
+            self.assertNotEqual(result_ny, result_la)
+            self.assertEqual(result_ny, result_la_corrected)
+        finally:
+            self.spark.conf.set("spark.sql.session.timeZone", orig_tz)
+
 
 @unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
 class GroupbyApplyTests(ReusedSQLTestCase):

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index fe62f60..78abc32 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -35,6 +35,7 @@ from py4j.java_gateway import JavaClass
 
 from pyspark import SparkContext
 from pyspark.serializers import CloudPickleSerializer
+from pyspark.util import _exception_message
 
 __all__ = [
     "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType",
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
          for field in arrow_schema])
 
 
-def _check_dataframe_localize_timestamps(pdf):
+def _old_pandas_exception_message(e):
+    """ Create an error message for importing old Pandas.
     """
-    Convert timezone aware timestamps to timezone-naive in local time
+    msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process"
+    return "%s\n%s" % (_exception_message(e), msg)
+
+
+def _check_dataframe_localize_timestamps(pdf, timezone):
+    """
+    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
 
     :param pdf: pandas.DataFrame
-    :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive
+    :param timezone: the timezone to convert to. if None then use local timezone
+    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
     """
-    from pandas.api.types import is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    tz = timezone or 'tzlocal()'
     for column, series in pdf.iteritems():
         # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
         if is_datetime64tz_dtype(series.dtype):
-            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+            pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None)
     return pdf
 
 
-def _check_series_convert_timestamps_internal(s):
+def _check_series_convert_timestamps_internal(s, timezone):
     """
-    Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
+    Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for
+    Spark internal storage
+
     :param s: a pandas.Series
+    :param timezone: the timezone to convert from. if None then use local timezone
     :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone
     """
-    from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
-        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+        tz = timezone or 'tzlocal()'
+        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
         return s
 
 
+def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone):
+    """
+    Convert timestamp to timezone-naive in the specified timezone or local timezone
+
+    :param s: a pandas.Series
+    :param from_timezone: the timezone to convert from. if None then use local timezone
+    :param to_timezone: the timezone to convert to. if None then use local timezone
+    :return pandas.Series where if it is a timestamp, has been converted to tz-naive
+    """
+    try:
+        import pandas as pd
+        from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    from_tz = from_timezone or 'tzlocal()'
+    to_tz = to_timezone or 'tzlocal()'
+    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+    if is_datetime64tz_dtype(s.dtype):
+        return s.dt.tz_convert(to_tz).dt.tz_localize(None)
+    elif is_datetime64_dtype(s.dtype) and from_tz != to_tz:
+        # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT.
+        return s.apply(lambda ts: ts.tz_localize(from_tz).tz_convert(to_tz).tz_localize(None)
+                       if ts is not pd.NaT else pd.NaT)
+    else:
+        return s
+
+
+def _check_series_convert_timestamps_local_tz(s, timezone):
+    """
+    Convert timestamp from local timezone to timezone-naive in the specified timezone
+
+    :param s: a pandas.Series
+    :param timezone: the timezone to convert to. if None then use local timezone
+    :return pandas.Series where if it is a timestamp, has been converted to tz-naive
+    """
+    return _check_series_convert_timestamps_localize(s, None, timezone)
+
+
+def _check_series_convert_timestamps_tz_local(s, timezone):
+    """
+    Convert timestamp from the specified timezone to timezone-naive in local timezone
+
+    :param s: a pandas.Series
+    :param timezone: the timezone to convert from. if None then use local timezone
+    :return pandas.Series where if it is a timestamp, has been converted to tz-naive
+    """
+    return _check_series_convert_timestamps_localize(s, timezone, None)
+
+
 def _test():
     import doctest
     from pyspark.context import SparkContext

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 9396430..e6737ae 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -150,7 +150,8 @@ def read_udfs(pickleSer, infile, eval_type):
 
     if eval_type == PythonEvalType.SQL_PANDAS_SCALAR_UDF \
        or eval_type == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
-        ser = ArrowStreamPandasSerializer()
+        timezone = utf8_deserializer.loads(infile)
+        ser = ArrowStreamPandasSerializer(timezone)
     else:
         ser = BatchedSerializer(PickleSerializer(), 100)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 02612ff..310670e 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -201,7 +201,7 @@ try:
         extras_require={
             'ml': ['numpy>=1.7'],
             'mllib': ['numpy>=1.7'],
-            'sql': ['pandas>=0.13.0']
+            'sql': ['pandas>=0.19.2']
         },
         classifiers=[
             'Development Status :: 5 - Production/Stable',

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ce68dbb..8abb426 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -998,6 +998,15 @@ object SQLConf {
       .intConf
       .createWithDefault(10000)
 
+  val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE =
+    buildConf("spark.sql.execution.pandas.respectSessionTimeZone")
+      .internal()
+      .doc("When true, make Pandas DataFrame with timestamp type respecting session local " +
+        "timezone when converting to/from Pandas DataFrame. This configuration will be " +
+        "deprecated in the future releases.")
+      .booleanConf
+      .createWithDefault(true)
+
   val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
     .internal()
     .doc("When true, the apply function of the rule verifies whether the right node of the" +
@@ -1316,6 +1325,8 @@ class SQLConf extends Serializable with Logging {
 
   def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)
 
+  def pandasRespectSessionTimeZone: Boolean = getConf(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE)
+
   def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)
 
   /** ********************** SQLConf functionality methods ************ */

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index e272101..c06bc7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -63,6 +63,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 
   private val batchSize = conf.arrowMaxRecordsPerBatch
   private val sessionLocalTimeZone = conf.sessionLocalTimeZone
+  private val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
 
   protected override def evaluate(
       funcs: Seq[ChainedPythonFunctions],
@@ -81,7 +82,8 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 
     val columnarBatchIter = new ArrowPythonRunner(
         funcs, bufferSize, reuseWorker,
-        PythonEvalType.SQL_PANDAS_SCALAR_UDF, argOffsets, schema, sessionLocalTimeZone)
+        PythonEvalType.SQL_PANDAS_SCALAR_UDF, argOffsets, schema,
+        sessionLocalTimeZone, pandasRespectSessionTimeZone)
       .compute(batchIter, context.partitionId(), context)
 
     new Iterator[InternalRow] {

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
index 94c05b9..9a94d77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala
@@ -44,7 +44,8 @@ class ArrowPythonRunner(
     evalType: Int,
     argOffsets: Array[Array[Int]],
     schema: StructType,
-    timeZoneId: String)
+    timeZoneId: String,
+    respectTimeZone: Boolean)
   extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch](
     funcs, bufferSize, reuseWorker, evalType, argOffsets) {
 
@@ -58,6 +59,11 @@ class ArrowPythonRunner(
 
       protected override def writeCommand(dataOut: DataOutputStream): Unit = {
         PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+        if (respectTimeZone) {
+          PythonRDD.writeUTF(timeZoneId, dataOut)
+        } else {
+          dataOut.writeInt(SpecialLengths.NULL)
+        }
       }
 
       protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/64817c42/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index ee49581..59db66b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -78,6 +78,7 @@ case class FlatMapGroupsInPandasExec(
     val argOffsets = Array((0 until (child.output.length - groupingAttributes.length)).toArray)
     val schema = StructType(child.schema.drop(groupingAttributes.length))
     val sessionLocalTimeZone = conf.sessionLocalTimeZone
+    val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
 
     inputRDD.mapPartitionsInternal { iter =>
       val grouped = if (groupingAttributes.isEmpty) {
@@ -95,7 +96,8 @@ case class FlatMapGroupsInPandasExec(
 
       val columnarBatchIter = new ArrowPythonRunner(
         chainedFunc, bufferSize, reuseWorker,
-        PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema, sessionLocalTimeZone)
+        PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
+        sessionLocalTimeZone, pandasRespectSessionTimeZone)
           .compute(grouped, context.partitionId(), context)
 
       columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org