Posted to jira@arrow.apache.org by "Antoine Pitrou (Jira)" <ji...@apache.org> on 2021/06/07 16:25:00 UTC
[jira] [Resolved] (ARROW-12644) [C++][Dataset] Support reading
date/time-partitioned datasets accounting for URL encoding (Spark)
[ https://issues.apache.org/jira/browse/ARROW-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Antoine Pitrou resolved ARROW-12644.
------------------------------------
Fix Version/s: 5.0.0
Resolution: Fixed
Issue resolved by pull request 10264
[https://github.com/apache/arrow/pull/10264]
> [C++][Dataset] Support reading date/time-partitioned datasets accounting for URL encoding (Spark)
> -------------------------------------------------------------------------------------------------
>
> Key: ARROW-12644
> URL: https://issues.apache.org/jira/browse/ARROW-12644
> Project: Apache Arrow
> Issue Type: Bug
> Components: C++
> Affects Versions: 3.0.0
> Reporter: Paul Bormans
> Assignee: David Li
> Priority: Major
> Labels: dataset, datasets, delta, parquet, pull-request-available, spark
> Fix For: 5.0.0
>
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> I'm using Spark (3.1.1) to write a dataframe to a partitioned parquet dataset (using delta.io) which is partitioned by a timestamp field.
> The relevant Spark code:
> {code:java}
> from pyspark.sql import functions as sf  # alias used below
>
> (
>     df.withColumn(
>         "Date",
>         # Truncate the epoch-derived timestamp to the start of its day
>         sf.date_trunc(
>             "DAY",
>             sf.from_unixtime(sf.col("MyEpochField")),
>         ),
>     )
>     .write.format("delta")
>     .mode("append")
>     .partitionBy("Date")
>     .save("...")
> )
> {code}
> This gives a directory structure like the following:
> {code:java}
> /tip
> /tip/Date=2021-05-04 00%3A00%3A00
> /tip/Date=2021-05-04 00%3A00%3A00/Time=2021-05-04 07%3A27%3A00
> /tip/Date=2021-05-04 00%3A00%3A00/Time=2021-05-04 07%3A27%3A00/part-00000-8846eb80-a369-43f6-a715-fec9cf1adf95.c000.snappy.parquet
> {code}
> Notice that the ':' character is percent-encoded (URL-encoded) as %3A, presumably because a literal ':' is not allowed in a path by the file system protocol.
> When I try to open this dataset using delta-rs ([https://github.com/delta-io/delta-rs]), which uses Arrow under the hood, an error is raised while parsing the Date (directory) value.
> {code:java}
> pyarrow.lib.ArrowInvalid: error parsing '2021-05-03 00%3A00%3A00' as scalar of type timestamp[ns]
> {code}
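The failure can be illustrated outside Arrow. The following is a minimal sketch in plain Python, not the Arrow code path: `datetime.strptime` stands in for Arrow's timestamp parser, and `try_parse` is a hypothetical helper. It suggests that the raw directory value does not match a "YYYY-MM-DD hh:mm:ss" layout, while the percent-decoded value does.

```python
from datetime import datetime
from urllib.parse import unquote

# Partition value exactly as it appears in the directory name.
RAW_VALUE = "2021-05-03 00%3A00%3A00"
# Stand-in for the "YYYY-MM-DD hh:mm:ss" form (assumption: strptime replaces Arrow's parser).
FORMAT = "%Y-%m-%d %H:%M:%S"


def try_parse(value):
    """Return the parsed datetime, or None if the value does not match FORMAT."""
    try:
        return datetime.strptime(value, FORMAT)
    except ValueError:
        return None


# The literal "%3A" is not a ':' separator, so parsing the raw value fails.
raw_result = try_parse(RAW_VALUE)
# After percent-decoding, the value has the expected layout and parses.
decoded_result = try_parse(unquote(RAW_VALUE))
```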
> It seems this error is raised in ScalarParseImpl => ParseValue => StringConverter<TimestampType>::Convert => ParseTimestampISO8601
> The parse method mentioned above supports the following formats:
> {code:java}
> static inline bool ParseTimestampISO8601(const char* s, size_t length,
>                                          TimeUnit::type unit,
>                                          TimestampType::c_type* out) {
>   using seconds_type = std::chrono::duration<TimestampType::c_type>;
>
>   // We allow the following formats for all units:
>   // - "YYYY-MM-DD"
>   // - "YYYY-MM-DD[ T]hhZ?"
>   // - "YYYY-MM-DD[ T]hh:mmZ?"
>   // - "YYYY-MM-DD[ T]hh:mm:ssZ?"
>   <...>
> {code}
> But it does not appear to URL-decode (percent-decode) the value before parsing it.
> Questions we have:
> * Should Arrow support timestamp fields when they are used as partition fields?
> * Where should the decoding happen?
>
> Some more information from the writing side.
> The writing is initiated using FileFormatWriter.write, which eventually uses a DynamicPartitionDataWriter (passing in the partitionColumns through the job description).
> Here the actual "value" is rendered and concatenated.
> {code:java}
> /** Expression that given partition columns builds a path string like: col1=val/col2=val/... */
> private lazy val partitionPathExpression: Expression = Concat(
>   description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
>     val partitionName = ScalaUDF(
>       ExternalCatalogUtils.getPartitionPathString _,
>       StringType,
>       Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))))
>     if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
>   })
> {code}
> Where the encoding is done in:
> [https://github.com/apache/spark/blob/v3.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala#L66]
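For illustration, the writing side can be approximated in a few lines of Python. This is a sketch under stated assumptions, not Spark's implementation: `escape_path_name`, `partition_dir` and `ASSUMED_ESCAPED_CHARS` are hypothetical names, and the character set is a small illustrative subset rather than the set Spark actually escapes.

```python
# Illustrative subset only (assumption): the real set of escaped characters
# is defined by Spark and is larger than this.
ASSUMED_ESCAPED_CHARS = set(":%/=")


def escape_path_name(value):
    """Replace each character in the assumed set with '%' plus its two-digit hex code."""
    return "".join(
        f"%{ord(ch):02X}" if ch in ASSUMED_ESCAPED_CHARS else ch for ch in value
    )


def partition_dir(column, value):
    """Build one 'column=value' path segment with both sides escaped."""
    return f"{escape_path_name(column)}={escape_path_name(value)}"


# The ':' separators of the timestamp become "%3A"; the space is left as-is.
encoded_dir = partition_dir("Date", "2021-05-04 00:00:00")
```

Under these assumptions, the resulting segment has the same shape as the directory names listed earlier in the report.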
> If I understand correctly, Arrow should provide an equivalent of unescapePathName for fields used as partition columns.
>
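What an equivalent of unescapePathName might look like on the reading side can be sketched in Python. The names `unescape_path_name` and `parse_partition_path` are hypothetical, the decoder is only loosely modelled on the behaviour described above, and none of this is the code from the pull request: each `%XX` escape is decoded before the value is handed to a timestamp parser, and anything that is not a valid escape is passed through unchanged.

```python
from datetime import datetime

HEX_DIGITS = set("0123456789abcdefABCDEF")


def unescape_path_name(name):
    """Replace each %XX escape (XX = two hex digits) with the character it encodes."""
    out = []
    i = 0
    while i < len(name):
        ch = name[i]
        # Decode only when two hex digits follow; otherwise keep '%' literally.
        if ch == "%" and i + 2 < len(name) and set(name[i + 1:i + 3]) <= HEX_DIGITS:
            out.append(chr(int(name[i + 1:i + 3], 16)))
            i += 3
        else:
            out.append(ch)
            i += 1
    return "".join(out)


def parse_partition_path(path):
    """Split 'key=value/key=value/...' into a dict of decoded string values."""
    partitions = {}
    for segment in path.strip("/").split("/"):
        key, sep, value = segment.partition("=")
        if sep:  # skip segments that are not key=value, such as file names
            partitions[unescape_path_name(key)] = unescape_path_name(value)
    return partitions


parts = parse_partition_path(
    "Date=2021-05-04 00%3A00%3A00/Time=2021-05-04 07%3A27%3A00"
)
# Only after decoding does the value match a "YYYY-MM-DD hh:mm:ss" layout.
date_value = datetime.strptime(parts["Date"], "%Y-%m-%d %H:%M:%S")
```

Decoding each segment after splitting on `/` and `=` keeps an escaped `%2F` or `%3D` inside a value from being mistaken for a separator.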
--
This message was sent by Atlassian Jira
(v8.3.4#803005)