You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Xiaoming (Jira)" <ji...@apache.org> on 2022/04/26 07:10:00 UTC

[jira] [Created] (SPARK-39021) Timestamp data is converted to machine time inside UDF.

Xiaoming created SPARK-39021:
--------------------------------

             Summary: Timestamp data is converted to machine time inside UDF.
                 Key: SPARK-39021
                 URL: https://issues.apache.org/jira/browse/SPARK-39021
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.2.0, 2.4.7
         Environment: * Tested on Dataiku
(Spark 2.4.7, pyspark 3.2.0 (installed package version))
 * Tested locally
(Spark 3.2.0, pyspark 3.1.1 (installed package version))
 * Tested on Databricks
(Spark 3.2.0, Scala 2.12, pyspark 3.2.0)
            Reporter: Xiaoming


UDFs don't seem to be aware of the timezone settings a user can define in a config, but will use machine time default, which is as far as we know unexpected and undesired behaviour.

Locally (to test) and Dataiku (platform we use) the machine time is CET (Europe/Amsterdam).
That is what `spark.sql.session.timeZone` indicates.

If we start sparksession and set `spark.sql.session.timeZone` or `spark.driver.extraJavaOptions` as "-Duser.timezone=UTC", the settings don't change our datetime data (when saving or casted to string).
When the datetime value is an input for UDF it does change the time to CET, e.g. our data is "2020-04-06 10:55:01" and inside the UDF it identifies it as "2020-04-06 12:55:01".
Also if the UDF output is of TimestampType it is converted back to the settings of the user, e.g. inside the UDF has datetime "2020-04-06 12:55:01" and the TimestampType return will give "2020-04-06 10:55:01". We tested that datetimes constructed inside the UDF behave similar when the output has TimestampType, which is again unexpected and undesired in our eyes.

Problems won't be detected if the user config settings and machine time are exactly the same, which is logical.
It is also not a problem on Databricks, because machine time there is "Etc/UTC".

We don't have the knowledge to fix this bug, but we would like something of a fix.
Some of the very undesired workarounds we could use now:
* convert datetime inside the UDF to UTC, do stuff and if datetime output is needed convert it back to CET (if output is TimestampType) or simply output a String
* use input StringType and cast the string to datetime in the UDF and return a StringType.

Some code we used for testing below.
FYI, `collect()` has in this timezone difference setting inconsistent output w.r.t. `show()` and `toPandas()`.

{code:python}
spark = (
    SparkSession.builder
#   .config("spark.driver.extraJavaOptions", "-Duser.timezone=UTC")
#    .config("spark.executor.extraJavaOptions", "-Duser.timezone=UTC")
#     .config("spark.sql.session.timeZone", "UTC")
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .getOrCreate()
)
sc = spark.sparkContext

def show_config(name):
    special = ('spark.driver.extraJavaOptions', 'spark.executor.extraJavaOptions')
    context = sc.getConf().get(name)

    if (context is None) & (name in special):
        spark_default = '____'
    else:
        spark_default = spark.conf.get(name)
        
    print(f'{name:42s} (spark, context): ({spark_default}, {context})')

show_config("spark.driver.extraJavaOptions")
show_config("spark.executor.extraJavaOptions")
show_config("spark.sql.session.timeZone")

show_config("spark.sql.parquet.int96AsTimestamp")
show_config("spark.sql.parquet.int96TimestampConversion")
show_config("spark.sql.parquet.outputTimestampType")

test_udf_1 = F.udf(lambda x: x, TimestampType())
test_udf_2 = F.udf(lambda x: str(x), StringType())
test_udf_3 = F.udf(lambda x: pd.to_datetime(x), TimestampType())

sdf_synth = (
    spark
    .createDataFrame([("summer", '2020-07-08 14:00:00', datetime(2020, 7, 8, 14, 0, 0)),
                      ("winter", '2020-01-08 14:00:00', datetime(2020, 1, 8, 14, 0, 0))],
                     StructType([
                        StructField("season",StringType()),
                        StructField("dts_str_og",StringType()),     
                        StructField("dts", TimestampType())
                     ]))
    
    .withColumn("dts_udf", test_udf_2(F.col("dts")))
    .withColumn("dts_str", F.col("dts").cast('string'))
    .withColumn("dts_str_udf", test_udf_2(F.col("dts_str")))
#     .withColumn("dts_utc", F.to_utc_timestamp(F.col("dts"), "UTC"))
    .withColumn("dts_cur", F.current_timestamp())
    .withColumn("dts_cur_udf", test_udf_2(F.col("dts_cur")))
    .withColumn("dts_cur_str", F.col('dts_cur').cast('string'))
    .withColumn("dts_cur_str_udf", test_udf_2(F.col("dts_cur_str")))
    .withColumn("dts_from_str", F.to_timestamp(F.col('dts_str_og'), 'yyyy-MM-dd HH:mm:ss'))
    .withColumn("dts_from_str_udf", test_udf_2(F.col("dts_from_str")))
    .withColumn("dts_from_str2", F.col('dts_from_str').cast('string'))
    .withColumn("dts_from_str2_udf", test_udf_2(F.col("dts_from_str2")))
)

# show()
sdf_synth #.show(truncate=False)

# collect()
for row in sdf_synth.collect():
    r = row.asDict()
    
    for key, value in r.items():
        print(f'{key:20s}: {value}')

# toPandas()
df_synth = sdf_synth.toPandas()
df_synth
{code}



 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org