Posted to user@spark.apache.org by Don Drake <do...@gmail.com> on 2017/02/01 03:31:10 UTC

Re: Converting timezones in Spark

So, to follow up on this.

A few lessons learned: when you print a timestamp, it will only show the
date/time in your current (JVM default) timezone, regardless of any
conversions you applied to it.
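
For instance, outside of Spark entirely (a minimal sketch; java.sql.Timestamp
is the type Spark hands back for timestamp columns, and its toString formats
in the JVM default timezone at the moment you print):

import java.sql.Timestamp
import java.util.TimeZone

val ts = new Timestamp(1473871592000L)  // 2016-09-14 16:46:32 UTC

TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
println(ts)  // 2016-09-14 16:46:32.0

TimeZone.setDefault(TimeZone.getTimeZone("America/Chicago"))
println(ts)  // 2016-09-14 11:46:32.0 (same instant, different rendering)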

The trick is to cast the timestamp to a Long (epoch seconds); from there, the
Java 8 java.time.* classes can translate it to any timezone and generate a
string representation of the timestamp.
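
In isolation, that java.time step looks like this (a quick sketch using the
epoch value from row 1 of the output below, with America/New_York as the
target zone):

import java.time.{Instant, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z")
val zdt = ZonedDateTime.ofInstant(Instant.ofEpochSecond(1473871592L),
  ZoneId.of("America/New_York"))
println(zdt.format(fmt))  // 2016-09-14 12:46:32 EDT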

Here's a working example:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf

// Epoch seconds -> formatted timestamp string in the requested timezone.
def convertToTZ(col: Long, zone: String, formatter: DateTimeFormatter): String = {
  val i = Instant.ofEpochSecond(col)
  val z = ZonedDateTime.ofInstant(i, ZoneId.of(zone))
  z.format(formatter)
}

def convertToTZFullTimestamp = udf((col: Long, zone: String) =>
  convertToTZ(col, zone, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss z")))

val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"),
  (3L, "2016-09-14 16:59:57 UTC"), (4L, "2016-11-30 12:00:01 UTC")).toDF("id", "dts")

// unix_timestamp, lit, and $ are available because spark-shell imports
// org.apache.spark.sql.functions._ and spark.implicits._ by default.
val df2 = df
  .withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z").cast("timestamp"))
  .withColumn("EST_tz", convertToTZFullTimestamp($"created_at".cast("long"), lit("America/New_York")))

df2.show(4, false)

// Exiting paste mode, now interpreting.

+---+-----------------------+---------------------+-----------------------+
|id |dts                    |created_at           |EST_tz                 |
+---+-----------------------+---------------------+-----------------------+
|1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|2016-09-14 12:46:32 EDT|
|2  |not a timestamp        |null                 |null                   |
|3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|2016-09-14 12:59:57 EDT|
|4  |2016-11-30 12:00:01 UTC|2016-11-30 06:00:01.0|2016-11-30 07:00:01 EST|
+---+-----------------------+---------------------+-----------------------+

import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf
convertToTZ: (col: Long, zone: String, formatter: java.time.format.DateTimeFormatter)String
convertToTZFullTimestamp: org.apache.spark.sql.expressions.UserDefinedFunction
df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]
df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 2 more fields]

scala>
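
One more note on the question below: rows 1 and 3 matched in est_ts/cst_ts
because Java's "EST" is a fixed UTC-5 offset with no daylight saving, while
"CST" maps to America/Chicago, which does observe it; the two therefore agree
in September (when Chicago is at UTC-5) and disagree in November (UTC-6).
Note also that the zone arguments for est_ts and cst_ts were swapped relative
to their column names. You can verify the offsets with plain java.time (epoch
values corresponding to rows 1 and 4 above):

import java.time.{Instant, ZoneId}

val sept = Instant.ofEpochSecond(1473871592L)  // 2016-09-14 16:46:32 UTC
val nov  = Instant.ofEpochSecond(1480507201L)  // 2016-11-30 12:00:01 UTC

println(ZoneId.of("America/Chicago").getRules.getOffset(sept))   // -05:00 (CDT)
println(ZoneId.of("America/Chicago").getRules.getOffset(nov))    // -06:00 (CST)
println(ZoneId.of("America/New_York").getRules.getOffset(sept))  // -04:00 (EDT)
println(ZoneId.of("America/New_York").getRules.getOffset(nov))   // -05:00 (EST)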



On Fri, Jan 27, 2017 at 12:01 PM, Don Drake <do...@gmail.com> wrote:

> I'm reading CSV with a timestamp clearly identified in the UTC timezone,
> and I need to store this in a parquet format and eventually read it back
> and convert to different timezones as needed.
>
> Sounds straightforward, but this involves some crazy function calls and
> I'm seeing strange results as I build a test case.
>
> See my example below.  Why are the values for est_ts and cst_ts the same
> in rows 1 and 3 (wrong), but different and correct in row 4?  I have a
> feeling it has to do with daylight savings time, but I'm not sure where to
> resolve it.
>
> Please note that I'm in the Central timezone.
>
> Is there a better method to do this?
>
>
> Spark context available as 'sc' (master = local[*], app id = local-1485539128193).
>
> Spark session available as 'spark'.
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
>       /_/
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
>
> import org.apache.spark.sql.Column
>
> def stringts_to_tz(col: Column, tz: String) = {
>   from_utc_timestamp(to_utc_timestamp(from_unixtime(unix_timestamp(col, "yyyy-MM-dd HH:mm:ss Z")), "CST"), tz)
> }
>
>
> val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"),
>   (3L, "2016-09-14 16:59:57 UTC"), (4L, "2016-11-31 12:00:01 UTC")).toDF("id", "dts")
>
> val df2 = df.withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z").cast("timestamp"))
>   .withColumn("unix_ts", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z"))
>   .withColumn("local_hour", hour($"created_at"))
>   .withColumn("s2", from_unixtime($"unix_ts"))
>   .withColumn("s3", to_utc_timestamp($"s2", "CST"))
>   .withColumn("s4", from_utc_timestamp($"s3", "EST"))
>   .withColumn("utc_ts", stringts_to_tz($"dts", "UTC"))
>   .withColumn("est_ts", stringts_to_tz($"dts", "CST"))
>   .withColumn("cst_ts", stringts_to_tz($"dts", "EST"))
>
> df2.show(4,false)
>
> df2.printSchema
>
>
>
> // Exiting paste mode, now interpreting.
>
>
> +---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
> |id |dts                    |created_at           |unix_ts   |local_hour|s2                 |s3                   |s4                   |utc_ts               |est_ts               |cst_ts               |
> +---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
> |1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|1473871592|11        |2016-09-14 11:46:32|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14 11:46:32.0|
> |2  |not a timestamp        |null                 |null      |null      |null               |null                 |null                 |null                 |null                 |null                 |
> |3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|1473872397|11        |2016-09-14 11:59:57|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14 11:59:57.0|
> |4  |2016-11-31 12:00:01 UTC|2016-12-01 06:00:01.0|1480593601|6         |2016-12-01 06:00:01|2016-12-01 12:00:01.0|2016-12-01 07:00:01.0|2016-12-01 12:00:01.0|2016-12-01 06:00:01.0|2016-12-01 07:00:01.0|
> +---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
>
>
> root
>  |-- id: long (nullable = false)
>  |-- dts: string (nullable = true)
>  |-- created_at: timestamp (nullable = true)
>  |-- unix_ts: long (nullable = true)
>  |-- local_hour: integer (nullable = true)
>  |-- s2: string (nullable = true)
>  |-- s3: timestamp (nullable = true)
>  |-- s4: timestamp (nullable = true)
>  |-- utc_ts: timestamp (nullable = true)
>  |-- est_ts: timestamp (nullable = true)
>  |-- cst_ts: timestamp (nullable = true)
>
> import org.apache.spark.sql.Column
> stringts_to_tz: (col: org.apache.spark.sql.Column, tz: String)org.apache.spark.sql.Column
> df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]
> df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 9 more fields]
>
>
> scala>
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake
> 800-733-2143
>



-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143