Posted to commits@spark.apache.org by gu...@apache.org on 2018/02/11 08:31:40 UTC
spark git commit: [SPARK-23314][PYTHON] Add ambiguous=False when localizing tz-naive timestamps in Arrow codepath to deal with dst
Repository: spark
Updated Branches:
refs/heads/master 0783876c8 -> a34fce19b
[SPARK-23314][PYTHON] Add ambiguous=False when localizing tz-naive timestamps in Arrow codepath to deal with dst
## What changes were proposed in this pull request?
When localizing a tz-naive timestamp with tz_localize, pandas will throw an exception if the timestamp falls in the hour that repeats when clocks are set back at the end of daylight saving time, e.g., `2015-11-01 01:30:00`. This PR fixes the issue by setting `ambiguous=False` when calling tz_localize, which matches the default behavior of pytz.
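For reference, here is a short standalone demonstration of the ambiguity and of the behavior chosen here; the same example appears in a code comment in types.py below. It assumes only pandas and pytz are installed:

    import datetime
    import pandas as pd
    import pytz

    # 2015-11-01 01:30 occurs twice in America/New_York: clocks are set back
    # from 02:00 EDT (-04:00) to 01:00 EST (-05:00) on that date.
    t = datetime.datetime(2015, 11, 1, 1, 30)
    ts = pd.Series([t])
    tz = pytz.timezone('America/New_York')

    # ts.dt.tz_localize(tz)  # default ambiguous='raise': AmbiguousTimeError

    # ambiguous=False resolves the ambiguity to standard time ...
    print(ts.dt.tz_localize(tz, ambiguous=False))  # 2015-11-01 01:30:00-05:00

    # ... which matches what pytz does by default (is_dst=False):
    print(str(tz.localize(t)))                     # 2015-11-01 01:30:00-05:00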
## How was this patch tested?
Add `test_timestamp_dst`
Author: Li Jin <ic...@gmail.com>
Closes #20537 from icexelloss/SPARK-23314.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a34fce19
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a34fce19
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a34fce19
Branch: refs/heads/master
Commit: a34fce19bc0ee5a7e36c6ecba75d2aeb70fdcbc7
Parents: 0783876
Author: Li Jin <ic...@gmail.com>
Authored: Sun Feb 11 17:31:35 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Sun Feb 11 17:31:35 2018 +0900
----------------------------------------------------------------------
python/pyspark/sql/tests.py | 39 +++++++++++++++++++++++++++++++++++++++
python/pyspark/sql/types.py | 37 ++++++++++++++++++++++++++++++++++---
2 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a34fce19/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1087c3f..4bc59fd 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3670,6 +3670,21 @@ class ArrowTests(ReusedSQLTestCase):
         self.assertEqual(pdf_col_names, df.columns)
         self.assertEqual(pdf_col_names, df_arrow.columns)
 
+    # Regression test for SPARK-23314
+    def test_timestamp_dst(self):
+        import pandas as pd
+        # Daylight saving time for Los Angeles ends Sun, Nov 1, 2015 at 2:00 am
+        dt = [datetime.datetime(2015, 11, 1, 0, 30),
+              datetime.datetime(2015, 11, 1, 1, 30),
+              datetime.datetime(2015, 11, 1, 2, 30)]
+        pdf = pd.DataFrame({'time': dt})
+
+        df_from_python = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
+        df_from_pandas = self.spark.createDataFrame(pdf)
+
+        self.assertPandasEqual(pdf, df_from_python.toPandas())
+        self.assertPandasEqual(pdf, df_from_pandas.toPandas())
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,
@@ -4311,6 +4326,18 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(expected.collect(), res1.collect())
         self.assertEquals(expected.collect(), res2.collect())
 
+    # Regression test for SPARK-23314
+    def test_timestamp_dst(self):
+        from pyspark.sql.functions import pandas_udf
+        # Daylight saving time for Los Angeles ends Sun, Nov 1, 2015 at 2:00 am
+        dt = [datetime.datetime(2015, 11, 1, 0, 30),
+              datetime.datetime(2015, 11, 1, 1, 30),
+              datetime.datetime(2015, 11, 1, 2, 30)]
+        df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
+        foo_udf = pandas_udf(lambda x: x, 'timestamp')
+        result = df.withColumn('time', foo_udf(df.time))
+        self.assertEquals(df.collect(), result.collect())
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,
@@ -4482,6 +4509,18 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
             df.groupby('id').apply(f).collect()
 
+    # Regression test for SPARK-23314
+    def test_timestamp_dst(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+        # Daylight saving time for Los Angeles ends Sun, Nov 1, 2015 at 2:00 am
+        dt = [datetime.datetime(2015, 11, 1, 0, 30),
+              datetime.datetime(2015, 11, 1, 1, 30),
+              datetime.datetime(2015, 11, 1, 2, 30)]
+        df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
+        foo_udf = pandas_udf(lambda pdf: pdf, 'time timestamp', PandasUDFType.GROUPED_MAP)
+        result = df.groupby('time').apply(foo_udf).sort('time')
+        self.assertPandasEqual(df.toPandas(), result.toPandas())
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,
http://git-wip-us.apache.org/repos/asf/spark/blob/a34fce19/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 2599dc5..f7141b4 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1759,8 +1759,38 @@ def _check_series_convert_timestamps_internal(s, timezone):
     from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
+        # When tz_localize a tz-naive timestamp, the result is ambiguous if the tz-naive
+        # timestamp is during the hour when the clock is adjusted backward due to
+        # daylight saving time (dst).
+        # E.g., for America/New_York, the clock is adjusted backward from 2015-11-01 2:00 to
+        # 2015-11-01 1:00 when switching from dst time to standard time; therefore, when we
+        # tz_localize the tz-naive timestamp 2015-11-01 1:30 with the America/New_York
+        # timezone, it can be either dst time (2015-11-01 1:30-0400) or standard time
+        # (2015-11-01 1:30-0500).
+        #
+        # Here we explicitly choose to use standard time. This matches the default behavior
+        # of pytz.
+        #
+        # Here is some code to help understand this behavior:
+        # >>> import datetime
+        # >>> import pandas as pd
+        # >>> import pytz
+        # >>>
+        # >>> t = datetime.datetime(2015, 11, 1, 1, 30)
+        # >>> ts = pd.Series([t])
+        # >>> tz = pytz.timezone('America/New_York')
+        # >>>
+        # >>> ts.dt.tz_localize(tz, ambiguous=True)
+        # 0   2015-11-01 01:30:00-04:00
+        # dtype: datetime64[ns, America/New_York]
+        # >>>
+        # >>> ts.dt.tz_localize(tz, ambiguous=False)
+        # 0   2015-11-01 01:30:00-05:00
+        # dtype: datetime64[ns, America/New_York]
+        # >>>
+        # >>> str(tz.localize(t))
+        # '2015-11-01 01:30:00-05:00'
         tz = timezone or _get_local_timezone()
-        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
+        return s.dt.tz_localize(tz, ambiguous=False).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
@@ -1788,8 +1818,9 @@ def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone):
         return s.dt.tz_convert(to_tz).dt.tz_localize(None)
     elif is_datetime64_dtype(s.dtype) and from_tz != to_tz:
         # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT.
-        return s.apply(lambda ts: ts.tz_localize(from_tz).tz_convert(to_tz).tz_localize(None)
-                       if ts is not pd.NaT else pd.NaT)
+        return s.apply(
+            lambda ts: ts.tz_localize(from_tz, ambiguous=False).tz_convert(to_tz).tz_localize(None)
+            if ts is not pd.NaT else pd.NaT)
     else:
         return s
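For context, a minimal end-to-end sketch of the failure this patch prevents. It assumes a local SparkSession whose local timezone observes dst on that date (e.g., America/Los_Angeles) and Arrow enabled via spark.sql.execution.arrow.enabled; the setup is illustrative and not part of this commit:

    import datetime
    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master('local[1]').getOrCreate()
    spark.conf.set('spark.sql.execution.arrow.enabled', 'true')

    # 2015-11-01 01:30 is ambiguous in dst-observing timezones; before this
    # patch, localizing it in the Arrow codepath could raise
    # pytz.exceptions.AmbiguousTimeError instead of converting the frame.
    pdf = pd.DataFrame({'time': [datetime.datetime(2015, 11, 1, 1, 30)]})
    df = spark.createDataFrame(pdf)
    print(df.toPandas())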