Posted to issues@spark.apache.org by "Dieter De Paepe (Jira)" <ji...@apache.org> on 2020/10/14 15:21:00 UTC

[jira] [Created] (SPARK-33150) Groupby key may not be unique when using window

Dieter De Paepe created SPARK-33150:
---------------------------------------

             Summary: Groupby key may not be unique when using window
                 Key: SPARK-33150
                 URL: https://issues.apache.org/jira/browse/SPARK-33150
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.0.0, 2.3.3
            Reporter: Dieter De Paepe


 

Due to the way Spark converts timestamps to local times, it may lose the detail needed to differentiate instants whose local times fall in the daylight saving time transition. Setting the Spark timezone to UTC does not resolve the issue.

This issue is somewhat related to SPARK-32123, but seems independent enough to consider this a separate issue.

A minimal example is below. I tested it on Spark 3.0.0 and 2.3.3 (I could not get 2.4.x to work on my system). My machine is located in the timezone "Europe/Brussels".

 
{code:python}
import pyspark
import pyspark.sql.functions as f

# Session timezone and driver/executor JVM timezones are all forced to UTC.
spark = (pyspark
 .sql
 .SparkSession
 .builder
 .master('local[1]')
 .config("spark.sql.session.timeZone", "UTC")
 .config('spark.driver.extraJavaOptions', '-Duser.timezone=UTC')
 .config('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
 .getOrCreate()
)

# Two pairs of rows; the epoch times are one hour apart and both fall in the
# 2019-10-27 DST fallback hour in Europe/Brussels.
debug_df = spark.createDataFrame([
 (1572137640, 1),
 (1572137640, 2),
 (1572141240, 3),
 (1572141240, 4)
], ['epochtime', 'value'])

debug_df \
 .withColumn('time', f.from_unixtime('epochtime')) \
 .withColumn('window', f.window('time', '1 minute').start) \
 .collect()
{code}
 

Output: here we see that the window function internally transforms the times to local time, and as such has to disambiguate the repeated hour at the Belgian summer-to-winter time transition by setting the "fold" attribute:

 
{code:java}
[Row(epochtime=1572137640, value=1, time='2019-10-27 00:54:00', window=datetime.datetime(2019, 10, 27, 2, 54)),
 Row(epochtime=1572137640, value=2, time='2019-10-27 00:54:00', window=datetime.datetime(2019, 10, 27, 2, 54)),
 Row(epochtime=1572141240, value=3, time='2019-10-27 01:54:00', window=datetime.datetime(2019, 10, 27, 2, 54, fold=1)),
 Row(epochtime=1572141240, value=4, time='2019-10-27 01:54:00', window=datetime.datetime(2019, 10, 27, 2, 54, fold=1))]{code}
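
As a side note, inspecting the window start as epoch seconds (a minimal sketch reusing the debug_df and spark session from above) suggests the two instants are still distinct inside Spark, and that the ambiguity only appears once the values are converted to (local-time) Python datetime objects:

{code:python}
# Sketch: look at the window start as epoch seconds instead of a datetime.
# The two groups keep distinct values here, so the information loss seems to
# happen in the conversion to Python datetimes on the driver.
debug_df \
 .withColumn('time', f.from_unixtime('epochtime')) \
 .withColumn('window_epoch', f.window('time', '1 minute').start.cast('long')) \
 .select('epochtime', 'value', 'window_epoch') \
 .collect()
{code}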
 

Now, this has severe implications when we use the window function for a groupby operation:

 
{code:python}
# Group on the window start; the two distinct instants end up with keys that
# differ only in their "fold" attribute.
output = debug_df \
 .withColumn('time', f.from_unixtime('epochtime')) \
 .groupby(f.window('time', '1 minute').start.alias('window')).agg(
   f.min('value').alias('min_value')
 )
output_collect = output.collect()
output_pandas = output.toPandas()
print(output_collect)
print(output_pandas)
{code}
Output:

 
{code:java}
[Row(window=datetime.datetime(2019, 10, 27, 2, 54), min_value=1), Row(window=datetime.datetime(2019, 10, 27, 2, 54, fold=1), min_value=3)]

  window              min_value
0 2019-10-27 00:54:00 1
1 2019-10-27 00:54:00 3
{code}
 

The collect() output shows Belgian local time, and the fold attribute lets us tell the two keys apart visually. However, due to the way the fold attribute is defined, [it is ignored for|https://www.python.org/dev/peps/pep-0495/#the-fold-attribute] equality comparisons.
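
This can be seen with plain Python datetimes as well (a standalone sketch, independent of Spark; it assumes Python 3.9+ for the zoneinfo module): two datetimes that differ only in fold compare equal, even though they refer to different UTC instants.

{code:python}
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

brussels = ZoneInfo("Europe/Brussels")
# 02:54 local time occurs twice on 2019-10-27 due to the DST fallback.
first = datetime(2019, 10, 27, 2, 54, tzinfo=brussels)           # fold=0 (CEST, UTC+2)
second = datetime(2019, 10, 27, 2, 54, fold=1, tzinfo=brussels)  # fold=1 (CET, UTC+1)

print(first == second)                  # True: fold is ignored in comparisons (PEP 495)
print(first.astimezone(timezone.utc))   # 2019-10-27 00:54:00+00:00
print(second.astimezone(timezone.utc))  # 2019-10-27 01:54:00+00:00
{code}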

On the other hand, the pandas output uses UTC times (due to the spark.sql.session.timeZone setting), but it has lost the disambiguating fold attribute in the conversion to pandas datatypes.

In both cases, the column that was grouped on no longer contains unique values.

 
{code:python}
print(output_collect[0].window == output_collect[1].window)  # True
print(output_collect[0].window.fold == output_collect[1].window.fold)  # False
print(output_pandas.window[0] == output_pandas.window[1])  # True
print(output_pandas.window[0].fold == output_pandas.window[1].fold)  # True
{code}
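
For completeness, a possible workaround sketch (an assumption on my side, not a fix for the conversion itself): keep the grouping key in a representation that never goes through the local-datetime conversion, for example epoch seconds.

{code:python}
# Sketch of a workaround: group on the window start cast to epoch seconds, so
# the key never has to be represented as a (possibly ambiguous) local datetime.
workaround = debug_df \
 .withColumn('time', f.from_unixtime('epochtime')) \
 .groupby(f.window('time', '1 minute').start.cast('long').alias('window_epoch')) \
 .agg(f.min('value').alias('min_value'))
print(workaround.collect())  # the two windows now map to two distinct epoch values
{code}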
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org