You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/02/11 08:31:40 UTC

spark git commit: [SPARK-23314][PYTHON] Add ambiguous=False when localizing tz-naive timestamps in Arrow codepath to deal with dst

Repository: spark
Updated Branches:
  refs/heads/master 0783876c8 -> a34fce19b


[SPARK-23314][PYTHON] Add ambiguous=False when localizing tz-naive timestamps in Arrow codepath to deal with dst

## What changes were proposed in this pull request?
When calling `tz_localize` on a tz-naive timestamp, pandas throws an exception if the timestamp falls in the ambiguous hour at the end of daylight saving time, e.g., `2015-11-01 01:30:00`. This PR fixes the issue by setting `ambiguous=False` when calling `tz_localize`, which matches the default behavior of pytz.

## How was this patch tested?
Add `test_timestamp_dst`

Author: Li Jin <ic...@gmail.com>

Closes #20537 from icexelloss/SPARK-23314.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a34fce19
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a34fce19
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a34fce19

Branch: refs/heads/master
Commit: a34fce19bc0ee5a7e36c6ecba75d2aeb70fdcbc7
Parents: 0783876
Author: Li Jin <ic...@gmail.com>
Authored: Sun Feb 11 17:31:35 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Sun Feb 11 17:31:35 2018 +0900

----------------------------------------------------------------------
 python/pyspark/sql/tests.py | 39 +++++++++++++++++++++++++++++++++++++++
 python/pyspark/sql/types.py | 37 ++++++++++++++++++++++++++++++++++---
 2 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a34fce19/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1087c3f..4bc59fd 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3670,6 +3670,21 @@ class ArrowTests(ReusedSQLTestCase):
         self.assertEqual(pdf_col_names, df.columns)
         self.assertEqual(pdf_col_names, df_arrow.columns)
 
+    # Regression test for SPARK-23314
+    def test_timestamp_dst(self):
+        import pandas as pd
+        # Daylight saving time for Los Angeles for 2015 ends on Sun, Nov 1 at 2:00 am
+        dt = [datetime.datetime(2015, 11, 1, 0, 30),
+              datetime.datetime(2015, 11, 1, 1, 30),
+              datetime.datetime(2015, 11, 1, 2, 30)]
+        pdf = pd.DataFrame({'time': dt})
+
+        df_from_python = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
+        df_from_pandas = self.spark.createDataFrame(pdf)
+
+        self.assertPandasEqual(pdf, df_from_python.toPandas())
+        self.assertPandasEqual(pdf, df_from_pandas.toPandas())
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,
@@ -4311,6 +4326,18 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(expected.collect(), res1.collect())
         self.assertEquals(expected.collect(), res2.collect())
 
+    # Regression test for SPARK-23314
+    def test_timestamp_dst(self):
+        from pyspark.sql.functions import pandas_udf
+        # Daylight saving time for Los Angeles for 2015 ends on Sun, Nov 1 at 2:00 am
+        dt = [datetime.datetime(2015, 11, 1, 0, 30),
+              datetime.datetime(2015, 11, 1, 1, 30),
+              datetime.datetime(2015, 11, 1, 2, 30)]
+        df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
+        foo_udf = pandas_udf(lambda x: x, 'timestamp')
+        result = df.withColumn('time', foo_udf(df.time))
+        self.assertEquals(df.collect(), result.collect())
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,
@@ -4482,6 +4509,18 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
             with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
                 df.groupby('id').apply(f).collect()
 
+    # Regression test for SPARK-23314
+    def test_timestamp_dst(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+        # Daylight saving time for Los Angeles for 2015 ends on Sun, Nov 1 at 2:00 am
+        dt = [datetime.datetime(2015, 11, 1, 0, 30),
+              datetime.datetime(2015, 11, 1, 1, 30),
+              datetime.datetime(2015, 11, 1, 2, 30)]
+        df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
+        foo_udf = pandas_udf(lambda pdf: pdf, 'time timestamp', PandasUDFType.GROUPED_MAP)
+        result = df.groupby('time').apply(foo_udf).sort('time')
+        self.assertPandasEqual(df.toPandas(), result.toPandas())
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,

http://git-wip-us.apache.org/repos/asf/spark/blob/a34fce19/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 2599dc5..f7141b4 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1759,8 +1759,38 @@ def _check_series_convert_timestamps_internal(s, timezone):
     from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
+        # When tz_localize is applied to a tz-naive timestamp, the result is ambiguous if the
+        # tz-naive timestamp falls in the hour when the clock is adjusted backward due to
+        # daylight saving time (dst).
+        # E.g., for America/New_York, the clock is adjusted backward on 2015-11-01 2:00 to
+        # 2015-11-01 1:00 from dst-time to standard time, and therefore, when tz_localize
+        # a tz-naive timestamp 2015-11-01 1:30 with America/New_York timezone, it can be either
+        # dst time (2015-11-01 1:30-0400) or standard time (2015-11-01 1:30-0500).
+        #
+        # Here we explicitly choose to use standard time. This matches the default behavior of
+        # pytz.
+        #
+        # Here is some code to help understand this behavior:
+        # >>> import datetime
+        # >>> import pandas as pd
+        # >>> import pytz
+        # >>>
+        # >>> t = datetime.datetime(2015, 11, 1, 1, 30)
+        # >>> ts = pd.Series([t])
+        # >>> tz = pytz.timezone('America/New_York')
+        # >>>
+        # >>> ts.dt.tz_localize(tz, ambiguous=True)
+        # 0   2015-11-01 01:30:00-04:00
+        # dtype: datetime64[ns, America/New_York]
+        # >>>
+        # >>> ts.dt.tz_localize(tz, ambiguous=False)
+        # 0   2015-11-01 01:30:00-05:00
+        # dtype: datetime64[ns, America/New_York]
+        # >>>
+        # >>> str(tz.localize(t))
+        # '2015-11-01 01:30:00-05:00'
         tz = timezone or _get_local_timezone()
-        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
+        return s.dt.tz_localize(tz, ambiguous=False).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
@@ -1788,8 +1818,9 @@ def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone):
         return s.dt.tz_convert(to_tz).dt.tz_localize(None)
     elif is_datetime64_dtype(s.dtype) and from_tz != to_tz:
         # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT.
-        return s.apply(lambda ts: ts.tz_localize(from_tz).tz_convert(to_tz).tz_localize(None)
-                       if ts is not pd.NaT else pd.NaT)
+        return s.apply(
+            lambda ts: ts.tz_localize(from_tz, ambiguous=False).tz_convert(to_tz).tz_localize(None)
+            if ts is not pd.NaT else pd.NaT)
     else:
         return s
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org