Posted to issues@spark.apache.org by "Imran Rashid (JIRA)" <ji...@apache.org> on 2018/01/16 18:37:01 UTC

[jira] [Assigned] (SPARK-12297) Add work-around for Parquet/Hive int96 timestamp bug.

     [ https://issues.apache.org/jira/browse/SPARK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran Rashid reassigned SPARK-12297:
------------------------------------

    Assignee: Marcelo Vanzin  (was: Imran Rashid)

> Add work-around for Parquet/Hive int96 timestamp bug.
> -----------------------------------------------------
>
>                 Key: SPARK-12297
>                 URL: https://issues.apache.org/jira/browse/SPARK-12297
>             Project: Spark
>          Issue Type: Task
>          Components: Spark Core
>            Reporter: Ryan Blue
>            Assignee: Marcelo Vanzin
>            Priority: Major
>             Fix For: 2.3.0
>
>
> Spark copied Hive's behavior for Parquet, but this is inconsistent with other file formats and with Impala (which, I believe, originated the practice of storing timestamps as int96 in Parquet).  It makes timestamps in Parquet behave like timestamps with a time zone, whereas in other file formats timestamps have no time zone; they are a "floating time".
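> As a side note, here is a minimal sketch of that semantic difference (plain java.text/java.util, no Spark or Parquet involved): the same wall-clock string maps to a different instant depending on which timezone is used to interpret it, which is exactly the kind of conversion int96 normalization applies, while a floating time would read back unchanged everywhere.
> {code}
> import java.text.SimpleDateFormat
> import java.util.TimeZone
> 
> val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
> 
> // Interpret the same wall-clock string in two different zones.
> fmt.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"))
> val laMillis = fmt.parse("2015-12-31 23:50:59.123").getTime
> 
> fmt.setTimeZone(TimeZone.getTimeZone("America/New_York"))
> val nyMillis = fmt.parse("2015-12-31 23:50:59.123").getTime
> 
> // Prints 3.0: once a zone is attached, the "same" timestamp is a different
> // instant in each zone (LA is UTC-8 in December, NY is UTC-5).
> println((laMillis - nyMillis) / 3600000.0)
> {code}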
> The easiest way to see the issue is to write out the same timestamps in several formats from one timezone and then read them back in another.  E.g., here I write a few timestamps to Parquet and textfile Hive tables, and also to a plain JSON file, all in the "America/Los_Angeles" timezone:
> {code}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val tblPrefix = args(0)
> val schema = new StructType().add("ts", TimestampType)
> val rows = sc.parallelize(Seq(
>   "2015-12-31 23:50:59.123",
>   "2015-12-31 22:49:59.123",
>   "2016-01-01 00:39:59.123",
>   "2016-01-01 01:29:59.123"
> ).map { x => Row(java.sql.Timestamp.valueOf(x)) })
> val rawData = spark.createDataFrame(rows, schema).toDF()
> rawData.show()
> Seq("parquet", "textfile").foreach { format =>
>   val tblName = s"${tblPrefix}_$format"
>   spark.sql(s"DROP TABLE IF EXISTS $tblName")
>   spark.sql(
>     raw"""CREATE TABLE $tblName (
>           |  ts timestamp
>           | )
>           | STORED AS $format
>      """.stripMargin)
>   rawData.write.insertInto(tblName)
> }
> rawData.write.json(s"${tblPrefix}_json")
> {code}
> Then I start a spark-shell in the "America/New_York" timezone and read the data back from each table:
> {code}
> scala> spark.sql("select * from la_parquet").collect().foreach{println}
> [2016-01-01 02:50:59.123]
> [2016-01-01 01:49:59.123]
> [2016-01-01 03:39:59.123]
> [2016-01-01 04:29:59.123]
> scala> spark.sql("select * from la_textfile").collect().foreach{println}
> [2015-12-31 23:50:59.123]
> [2015-12-31 22:49:59.123]
> [2016-01-01 00:39:59.123]
> [2016-01-01 01:29:59.123]
> scala> spark.read.json("la_json").collect().foreach{println}
> [2015-12-31 23:50:59.123]
> [2015-12-31 22:49:59.123]
> [2016-01-01 00:39:59.123]
> [2016-01-01 01:29:59.123]
> scala> spark.read.json("la_json").join(spark.sql("select * from la_textfile"), "ts").show()
> +--------------------+
> |                  ts|
> +--------------------+
> |2015-12-31 23:50:...|
> |2015-12-31 22:49:...|
> |2016-01-01 00:39:...|
> |2016-01-01 01:29:...|
> +--------------------+
> scala> spark.read.json("la_json").join(spark.sql("select * from la_parquet"), "ts").show()
> +---+
> | ts|
> +---+
> +---+
> {code}
> The textfile- and JSON-based data show the same times and can be joined against each other, while the times read back from Parquet have shifted by three hours (the offset between Los Angeles and New York), so the join against them returns nothing.
> This is a big problem for any organization that tries to read the same data (say, in S3) with clusters in multiple timezones.  It can also be a nasty surprise when an organization migrates file formats.  Finally, it's a source of incompatibility between Hive, Impala, and Spark.
> HIVE-12767 aims to fix this by introducing a table property that indicates the "storage timezone" for the table.  Spark should add the same property to ensure consistency across file formats and with Hive and Impala.
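> As a rough sketch of what that could look like (the property name below is only illustrative of the idea of a per-table storage timezone, not a committed Spark or Hive API):
> {code}
> // Hypothetical: declare the timezone the int96 values in this table were
> // written in, so readers in other zones can adjust them correctly.
> spark.sql(
>   """CREATE TABLE la_parquet_tz (
>     |  ts timestamp
>     | )
>     | STORED AS parquet
>     | TBLPROPERTIES ('parquet.mr.int96.write.zone' = 'America/Los_Angeles')
>   """.stripMargin)
> {code}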



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org