You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Sungju Jin (Jira)" <ji...@apache.org> on 2020/07/17 04:05:00 UTC
[jira] [Updated] (HUDI-1107) Respect spark.sql.parquet.outputTimestampType when writing datasets
[ https://issues.apache.org/jira/browse/HUDI-1107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sungju Jin updated HUDI-1107:
-----------------------------
Description:
Hi team, I've noticed that `spark.sql.parquet.outputTimestampType` is ignored when writing datasets. It would be great if Hudi could support this setting, thank you!
{code:java}
val spark = SparkSession
.builder()
.master("local")
.getOrCreate()
import spark.implicits._
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
val inputs = """{"id":"2", "discount_rate_id": "202", "price": "10.00", "currency": "usd", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:17", "modified": "2018-04-19 10:57:17"}
{"id":"4", "discount_rate_id": "202", "price": "10.00", "currency": "eur", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:18", "modified": "2018-04-19 10:57:18"}"""
val df = spark
.read
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
.json(Seq(inputs).toDS)
.toDF
.withColumn("created", col("created").cast(TimestampType))
.withColumn("modified", col("modified").cast(TimestampType))
df.write
.format("org.apache.hudi")
.option(PRECOMBINE_FIELD_OPT_KEY, "id")
.option(RECORDKEY_FIELD_OPT_KEY, "created")
.option(TABLE_NAME, s"adhoc.test")
.mode("overwrite")
.save("/tmp/test")
{code}
{code:java}
$ cd /tmp/test/default/
$ parquet-tools schema 0e924c77-0e31-46c4-ade9-f9cc97ea02cd-0_0-21-12005_20200716205246.parquet
message adhoc.test_record {
optional binary _hoodie_commit_time (UTF8);
optional binary _hoodie_commit_seqno (UTF8);
optional binary _hoodie_record_key (UTF8);
optional binary _hoodie_partition_path (UTF8);
optional binary _hoodie_file_name (UTF8);
optional int64 created (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
optional binary currency (UTF8);
optional binary discount_rate_id (UTF8);
optional binary end_date (UTF8);
optional binary id (UTF8);
optional int64 modified (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
optional binary price (UTF8);
optional binary start_date (UTF8);
}
{code}
was:
Hi team, I've noticed `spark.sql.parquet.outputTimestampType` ignores somehow when writing datasets. It would be great if Hudi supports this if possible, thank you!
{code:java}
val spark = SparkSession
.builder()
.master("local")
.getOrCreate()
import spark.implicits._
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
val inputs = """{"id":"2", "discount_rate_id": "202", "price": "10.00", "currency": "usd", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:17", "modified": "2018-04-19 10:57:17"}
{"id":"4", "discount_rate_id": "202", "price": "10.00", "currency": "eur", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:18", "modified": "2018-04-19 10:57:18"}"""
val df = spark
.read
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
.json(Seq(inputs).toDS)
.toDF
.withColumn("created", col("created").cast(TimestampType))
.withColumn("modified", col("modified").cast(TimestampType))
df.write
.format("org.apache.hudi")
.option(PRECOMBINE_FIELD_OPT_KEY, "id")
.option(RECORDKEY_FIELD_OPT_KEY, "created")
.option(TABLE_NAME, s"adhoc.test")
.mode("overwrite")
.save("/tmp/test")
{code}
{code:java}
$ cd /tmp/test/default/
$ parquet-tools schema 0e924c77-0e31-46c4-ade9-f9cc97ea02cd-0_0-21-12005_20200716205246.parquet
message adhoc.test_record {
optional binary _hoodie_commit_time (UTF8);
optional binary _hoodie_commit_seqno (UTF8);
optional binary _hoodie_record_key (UTF8);
optional binary _hoodie_partition_path (UTF8);
optional binary _hoodie_file_name (UTF8);
optional int64 created (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
optional binary currency (UTF8);
optional binary discount_rate_id (UTF8);
optional binary end_date (UTF8);
optional binary id (UTF8);
optional int64 modified (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
optional binary price (UTF8);
optional binary start_date (UTF8);
}
{code}
> Respect spark.sql.parquet.outputTimestampType when writing datasets
> -------------------------------------------------------------------
>
> Key: HUDI-1107
> URL: https://issues.apache.org/jira/browse/HUDI-1107
> Project: Apache Hudi
> Issue Type: Bug
> Affects Versions: 0.5.2
> Reporter: Sungju Jin
> Priority: Major
>
> Hi team, I've noticed that `spark.sql.parquet.outputTimestampType` is ignored when writing datasets. It would be great if Hudi could support this setting, thank you!
>
> {code:java}
> val spark = SparkSession
> .builder()
> .master("local")
> .getOrCreate()
> import spark.implicits._
> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
> val inputs = """{"id":"2", "discount_rate_id": "202", "price": "10.00", "currency": "usd", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:17", "modified": "2018-04-19 10:57:17"}
> {"id":"4", "discount_rate_id": "202", "price": "10.00", "currency": "eur", "start_date": "2020-04-19", "end_date": "null", "created": "2020-04-19 10:57:18", "modified": "2018-04-19 10:57:18"}"""
> val df = spark
> .read
> .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
> .json(Seq(inputs).toDS)
> .toDF
> .withColumn("created", col("created").cast(TimestampType))
> .withColumn("modified", col("modified").cast(TimestampType))
> df.write
> .format("org.apache.hudi")
> .option(PRECOMBINE_FIELD_OPT_KEY, "id")
> .option(RECORDKEY_FIELD_OPT_KEY, "created")
> .option(TABLE_NAME, s"adhoc.test")
> .mode("overwrite")
> .save("/tmp/test")
> {code}
> {code:java}
> $ cd /tmp/test/default/
> $ parquet-tools schema 0e924c77-0e31-46c4-ade9-f9cc97ea02cd-0_0-21-12005_20200716205246.parquet
> message adhoc.test_record {
> optional binary _hoodie_commit_time (UTF8);
> optional binary _hoodie_commit_seqno (UTF8);
> optional binary _hoodie_record_key (UTF8);
> optional binary _hoodie_partition_path (UTF8);
> optional binary _hoodie_file_name (UTF8);
> optional int64 created (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
> optional binary currency (UTF8);
> optional binary discount_rate_id (UTF8);
> optional binary end_date (UTF8);
> optional binary id (UTF8);
> optional int64 modified (TIMESTAMP_MICROS); <-- This should be TIMESTAMP_MILLIS
> optional binary price (UTF8);
> optional binary start_date (UTF8);
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)