You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sebastian Eckweiler (Jira)" <ji...@apache.org> on 2021/05/11 06:57:00 UTC

[jira] [Updated] (SPARK-35367) Window group-key and pandas inconsistent

     [ https://issues.apache.org/jira/browse/SPARK-35367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sebastian Eckweiler updated SPARK-35367:
----------------------------------------
    Description: 
Not completely sure whether this is a bug or a configuration/usage issue:

We are seeing inconsistent timezone-treatment when using a windowed group by aggregation in combiation with a pandas udf.

A minimal example:

 
{code:java}
// code placeholder

def a_udf(group_key, pdf: pd.DataFrame) -> pd.DataFrame:
    w_start = group_key[0]["start"]
    w_end = group_key[0]["end"]

    print(f"Pandas   : {pdf['window_start'].iloc[0]} to {pdf['window_end'].iloc[0]}")
    print(f"Group key: {w_start} to {w_end}")
    print(f"Data     : {pdf['time'].min()} to {pdf['time'].max()}")

    assert (pdf["time"] >= w_start).all()
    assert (pdf["time"] < w_end).all()        

    # some result

    return pd.DataFrame.from_records([{"result": 1}])


df = spark.createDataFrame([(datetime.datetime(2020, 1, 1, 12, 30, 0),)], schema=["time"])

w = window("time", "60 minutes")
df.withColumn("window_start", w.start)\
  .withColumn("window_end", w.end)\
  .groupby(w).applyInPandas(a_udf, schema="result int")\
  .show()
{code}
 

Produces:
{code:java}
Pandas   : 2020-01-01 12:00:00 to 2020-01-01 13:00:00
Group key: 2020-01-01 11:00:00 to 2020-01-01 12:00:00
Data     : 2020-01-01 12:30:00 to 2020-01-01 12:30:00

{code}
And the assertions fail. It seems the group-key goes through same timezone- (and dst?) conversion that ends up being one hour off.
 This is without any specific timezone configuration and with CEST as the local timezone.

Is this working as expected?

Setting
{code:java}
"spark.sql.session.timeZone": "UTC"
"spark.driver.extraJavaOptions": "-Duser.timezone=UTC"
"spark.executor.extraJavaOptions": "-Duser.timezone=UTC"
{code}
seems to be workaround.
 Using "Europe/Berlin" for all three timezone settings however reproduces the inconsistent behaviour.

I would assume though, that running this in a non-UTC timezone should generally be possible?

 

  was:
Not completely sure whether this is a bug or a configuration/usage issue:

We are seeing inconsistent timezone-treatment when using a windowed group by aggregation in combiation with a pandas udf.

A minimal example:

 
{code:java}
// code placeholder

def a_udf(group_key, pdf: pd.DataFrame) -> pd.DataFrame:
    w_start = group_key[0]["start"]
    w_end = group_key[0]["end"]

    print(f"Pandas   : {pdf['window_start'].iloc[0]} to {pdf['window_end'].iloc[0]}")
    print(f"Group key: {w_start} to {w_end}")
    print(f"Data     : {pdf['time'].min()} to {pdf['time'].max()}")

    assert (pdf["time"] >= w_start).all()
    assert (pdf["time"] < w_end).all()        

    # some result

    return pd.DataFrame.from_records([{"result": 1}])

    df = spark.createDataFrame([(datetime.datetime(2020, 1, 1, 12, 30, 0),)], schema=["time"])

    w = window("time", "60 minutes")
    df.withColumn("window_start", w.start).withColumn("window_end", w.end).groupby(w).applyInPandas(a_udf, schema="result int").show()
{code}
 

Produces:
{code:java}
Pandas   : 2020-01-01 12:00:00 to 2020-01-01 13:00:00
Group key: 2020-01-01 11:00:00 to 2020-01-01 12:00:00
Data     : 2020-01-01 12:30:00 to 2020-01-01 12:30:00

{code}
And the assertions fail. It seems the group-key goes through same timezone- (and dst?) conversion that ends up being one hour off.
 This is without any specific timezone configuration and with CEST as the local timezone.

Is this working as expected?

Setting
{code:java}
"spark.sql.session.timeZone": "UTC"
"spark.driver.extraJavaOptions": "-Duser.timezone=UTC"
"spark.executor.extraJavaOptions": "-Duser.timezone=UTC"
{code}
seems to be workaround.
 Using "Europe/Berlin" for all three timezone settings however reproduces the inconsistent behaviour.

I would assume though, that running this in a non-UTC timezone should generally be possible?

 


> Window group-key and pandas inconsistent
> ----------------------------------------
>
>                 Key: SPARK-35367
>                 URL: https://issues.apache.org/jira/browse/SPARK-35367
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.1
>            Reporter: Sebastian Eckweiler
>            Priority: Major
>
> Not completely sure whether this is a bug or a configuration/usage issue:
> We are seeing inconsistent timezone-treatment when using a windowed group by aggregation in combiation with a pandas udf.
> A minimal example:
>  
> {code:java}
> // code placeholder
> def a_udf(group_key, pdf: pd.DataFrame) -> pd.DataFrame:
>     w_start = group_key[0]["start"]
>     w_end = group_key[0]["end"]
>     print(f"Pandas   : {pdf['window_start'].iloc[0]} to {pdf['window_end'].iloc[0]}")
>     print(f"Group key: {w_start} to {w_end}")
>     print(f"Data     : {pdf['time'].min()} to {pdf['time'].max()}")
>     assert (pdf["time"] >= w_start).all()
>     assert (pdf["time"] < w_end).all()        
>     # some result
>     return pd.DataFrame.from_records([{"result": 1}])
> df = spark.createDataFrame([(datetime.datetime(2020, 1, 1, 12, 30, 0),)], schema=["time"])
> w = window("time", "60 minutes")
> df.withColumn("window_start", w.start)\
>   .withColumn("window_end", w.end)\
>   .groupby(w).applyInPandas(a_udf, schema="result int")\
>   .show()
> {code}
>  
> Produces:
> {code:java}
> Pandas   : 2020-01-01 12:00:00 to 2020-01-01 13:00:00
> Group key: 2020-01-01 11:00:00 to 2020-01-01 12:00:00
> Data     : 2020-01-01 12:30:00 to 2020-01-01 12:30:00
> {code}
> And the assertions fail. It seems the group-key goes through same timezone- (and dst?) conversion that ends up being one hour off.
>  This is without any specific timezone configuration and with CEST as the local timezone.
> Is this working as expected?
> Setting
> {code:java}
> "spark.sql.session.timeZone": "UTC"
> "spark.driver.extraJavaOptions": "-Duser.timezone=UTC"
> "spark.executor.extraJavaOptions": "-Duser.timezone=UTC"
> {code}
> seems to be workaround.
>  Using "Europe/Berlin" for all three timezone settings however reproduces the inconsistent behaviour.
> I would assume though, that running this in a non-UTC timezone should generally be possible?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org