You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Sungju Jin (Jira)" <ji...@apache.org> on 2020/07/17 04:05:00 UTC

[jira] [Updated] (HUDI-1107) Respect spark.sql.parquet.outputTimestampType when writing datasets

     [ https://issues.apache.org/jira/browse/HUDI-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sungju Jin updated HUDI-1107:
-----------------------------
    Description: 
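Hi team, I've noticed that `spark.sql.parquet.outputTimestampType` is ignored when writing datasets with Hudi: timestamp columns are still written as TIMESTAMP_MICROS even when the setting is TIMESTAMP_MILLIS (see the schema below). It would be great if Hudi could respect this setting. Thank you!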

 
{code:java}
val spark = SparkSession
  .builder()
  .master("local")
  .getOrCreate()
import spark.implicits._
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
val inputs = """{"id":"2", "discount_rate_id": "202", "price": "10.00", "currency": "usd", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:17", "modified": "2018-04-19 10:57:17"}
{"id":"4", "discount_rate_id": "202", "price": "10.00", "currency": "eur", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:18", "modified": "2018-04-19 10:57:18"}"""
val df = spark
  .read
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .json(Seq(inputs).toDS)
  .toDF
  .withColumn("created", col("created").cast(TimestampType))
  .withColumn("modified", col("modified").cast(TimestampType))

df.write
  .format("org.apache.hudi")
  .option(PRECOMBINE_FIELD_OPT_KEY, "id")
  .option(RECORDKEY_FIELD_OPT_KEY, "created")
  .option(TABLE_NAME, s"adhoc.test")
  .mode("overwrite")
  .save("/tmp/test")
{code}
{code:java}
$ cd /tmp/test/default/
$ parquet-tools schema 0e924c77-0e31-46c4-ade9-f9cc97ea02cd-0_0-21-12005_20200716205246.parquet

message adhoc.test_record {
  optional binary _hoodie_commit_time (UTF8);
  optional binary _hoodie_commit_seqno (UTF8);
  optional binary _hoodie_record_key (UTF8);
  optional binary _hoodie_partition_path (UTF8);
  optional binary _hoodie_file_name (UTF8);
  optional int64 created (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
  optional binary currency (UTF8);
  optional binary discount_rate_id (UTF8);
  optional binary end_date (UTF8);
  optional binary id (UTF8);
  optional int64 modified (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
  optional binary price (UTF8);
  optional binary start_date (UTF8);
}
{code}

  was:
Hi team, I've noticed `spark.sql.parquet.outputTimestampType` ignores somehow when writing datasets. It would be great if Hudi supports this if possible, thank you!

 
{code:java}
val spark = SparkSession
  .builder()
  .master("local")
  .getOrCreate()
import spark.implicits._
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
val inputs = """{"id":"2", "discount_rate_id": "202", "price": "10.00", "currency": "usd", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:17", "modified": "2018-04-19 10:57:17"}
{"id":"4", "discount_rate_id": "202", "price": "10.00", "currency": "eur", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:18", "modified": "2018-04-19 10:57:18"}"""
val df = spark
  .read
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .json(Seq(inputs).toDS)
  .toDF
  .withColumn("created", col("created").cast(TimestampType))
  .withColumn("modified", col("modified").cast(TimestampType))

df.write
  .format("org.apache.hudi")
  .option(PRECOMBINE_FIELD_OPT_KEY, "id")
  .option(RECORDKEY_FIELD_OPT_KEY, "created")
  .option(TABLE_NAME, s"adhoc.test")
  .mode("overwrite")
  .save("/tmp/test")
{code}
{code:java}
$ cd /tmp/test/default/
$ parquet-tools schema 0e924c77-0e31-46c4-ade9-f9cc97ea02cd-0_0-21-12005_20200716205246.parquet

message adhoc.test_record {
  optional binary _hoodie_commit_time (UTF8);
  optional binary _hoodie_commit_seqno (UTF8);
  optional binary _hoodie_record_key (UTF8);
  optional binary _hoodie_partition_path (UTF8);
  optional binary _hoodie_file_name (UTF8);
  optional int64 created (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
  optional binary currency (UTF8);
  optional binary discount_rate_id (UTF8);
  optional binary end_date (UTF8);
  optional binary id (UTF8);
  optional int64 modified (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
  optional binary price (UTF8);
  optional binary start_date (UTF8);
}
{code}


> Respect spark.sql.parquet.outputTimestampType when writing datasets
> -------------------------------------------------------------------
>
>                 Key: HUDI-1107
>                 URL: https://issues.apache.org/jira/browse/HUDI-1107
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.5.2
>            Reporter: Sungju Jin
>            Priority: Major
>
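> Hi team, I've noticed that `spark.sql.parquet.outputTimestampType` is ignored when writing datasets with Hudi: timestamp columns are still written as TIMESTAMP_MICROS even when the setting is TIMESTAMP_MILLIS (see the schema below). It would be great if Hudi could respect this setting. Thank you!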
>  
> {code:java}
> val spark = SparkSession
>   .builder()
>   .master("local")
>   .getOrCreate()
> import spark.implicits._
> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
> val inputs = """{"id":"2", "discount_rate_id": "202", "price": "10.00", "currency": "usd", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:17", "modified": "2018-04-19 10:57:17"}
> {"id":"4", "discount_rate_id": "202", "price": "10.00", "currency": "eur", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:18", "modified": "2018-04-19 10:57:18"}"""
> val df = spark
>   .read
>   .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
>   .json(Seq(inputs).toDS)
>   .toDF
>   .withColumn("created", col("created").cast(TimestampType))
>   .withColumn("modified", col("modified").cast(TimestampType))
> df.write
>   .format("org.apache.hudi")
>   .option(PRECOMBINE_FIELD_OPT_KEY, "id")
>   .option(RECORDKEY_FIELD_OPT_KEY, "created")
>   .option(TABLE_NAME, s"adhoc.test")
>   .mode("overwrite")
>   .save("/tmp/test")
> {code}
> {code:java}
> $ cd /tmp/test/default/
> $ parquet-tools schema 0e924c77-0e31-46c4-ade9-f9cc97ea02cd-0_0-21-12005_20200716205246.parquet
> message adhoc.test_record {
>   optional binary _hoodie_commit_time (UTF8);
>   optional binary _hoodie_commit_seqno (UTF8);
>   optional binary _hoodie_record_key (UTF8);
>   optional binary _hoodie_partition_path (UTF8);
>   optional binary _hoodie_file_name (UTF8);
>   optional int64 created (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
>   optional binary currency (UTF8);
>   optional binary discount_rate_id (UTF8);
>   optional binary end_date (UTF8);
>   optional binary id (UTF8);
>   optional int64 modified (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
>   optional binary price (UTF8);
>   optional binary start_date (UTF8);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)