Posted to jira@arrow.apache.org by "Antoine Pitrou (Jira)" <ji...@apache.org> on 2021/06/07 16:25:00 UTC

[jira] [Resolved] (ARROW-12644) [C++][Dataset] Support reading date/time-partitioned datasets accounting for URL encoding (Spark)

     [ https://issues.apache.org/jira/browse/ARROW-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Antoine Pitrou resolved ARROW-12644.
------------------------------------
    Fix Version/s: 5.0.0
       Resolution: Fixed

Issue resolved by pull request 10264
[https://github.com/apache/arrow/pull/10264]

> [C++][Dataset] Support reading date/time-partitioned datasets accounting for URL encoding (Spark)
> -------------------------------------------------------------------------------------------------
>
>                 Key: ARROW-12644
>                 URL: https://issues.apache.org/jira/browse/ARROW-12644
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: C++
>    Affects Versions: 3.0.0
>            Reporter: Paul Bormans
>            Assignee: David Li
>            Priority: Major
>              Labels: dataset, datasets, delta, parquet, pull-request-available, spark
>             Fix For: 5.0.0
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> I'm using Spark (3.1.1) to write a dataframe to a Parquet dataset (via delta.io) that is partitioned by a timestamp field.
> The relevant Spark code:
> {code:python}
> import pyspark.sql.functions as sf
>
> (
>     df.withColumn(
>         "Date",
>         sf.date_trunc(
>             "DAY",
>             sf.from_unixtime(sf.col("MyEpochField")),
>         ),
>     )
>     .write.format("delta")
>     .mode("append")
>     .partitionBy("Date")
>     .save("...")
> )
> {code}
> This gives a directory structure like the following:
> {code}
> /tip
> /tip/Date=2021-05-04 00%3A00%3A00
> /tip/Date=2021-05-04 00%3A00%3A00/Time=2021-05-04 07%3A27%3A00
> /tip/Date=2021-05-04 00%3A00%3A00/Time=2021-05-04 07%3A27%3A00/part-00000-8846eb80-a369-43f6-a715-fec9cf1adf95.c000.snappy.parquet
> {code}
> Notice that the ':' character is URL-encoded (as %3A) because it would otherwise violate filesystem path restrictions.
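> For illustration, the escaping amounts to percent-encoding; a minimal Python sketch (an approximation; the exact character set Spark escapes is defined in ExternalCatalogUtils.escapePathName):
> {code:python}
> from urllib.parse import quote, unquote
>
> raw = "2021-05-04 00:00:00"
> # quote() percent-encodes ':' as %3A; keeping ' ' in the safe set
> # mirrors the directory names shown above.
> encoded = quote(raw, safe=" ")
> print(encoded)           # 2021-05-04 00%3A00%3A00
> print(unquote(encoded))  # 2021-05-04 00:00:00
> {code}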
> When I try to open this dataset using delta-rs ([https://github.com/delta-io/delta-rs]), which uses Arrow under the hood, an error is raised while parsing the Date (folder) value:
> {code}
> pyarrow.lib.ArrowInvalid: error parsing '2021-05-03 00%3A00%3A00' as scalar of type timestamp[ns]
> {code}
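> For reference, a minimal reproduction sketch against pyarrow directly (this assumes the string-to-timestamp cast goes through the same parser as partition discovery, per the call chain below):
> {code:python}
> import pyarrow as pa
>
> # Casting the percent-encoded partition value fails just like the
> # dataset discovery does, because ':' is still escaped as %3A.
> pa.array(["2021-05-03 00%3A00%3A00"]).cast(pa.timestamp("ns"))
> # -> pyarrow.lib.ArrowInvalid
> {code}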
> It seems this error is raised in ScalarParseImpl => ParseValue => StringConverter<TimestampType>::Convert => ParseTimestampISO8601.
> The mentioned parse method supports the following formats:
> {code:cpp}
> static inline bool ParseTimestampISO8601(const char* s, size_t length,
>                                          TimeUnit::type unit,
>                                          TimestampType::c_type* out) {
>   using seconds_type = std::chrono::duration<TimestampType::c_type>;
>
>   // We allow the following formats for all units:
>   // - "YYYY-MM-DD"
>   // - "YYYY-MM-DD[ T]hhZ?"
>   // - "YYYY-MM-DD[ T]hh:mmZ?"
>   // - "YYYY-MM-DD[ T]hh:mm:ssZ?"
> <...>{code}
> But it does not appear to URL-decode the value up front.
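> Decoding the segment first makes the very same value parse; a minimal sketch:
> {code:python}
> import pyarrow as pa
> from urllib.parse import unquote
>
> value = unquote("2021-05-03 00%3A00%3A00")  # "2021-05-03 00:00:00"
> # With the %3A escapes decoded, the ISO 8601 parser accepts the value.
> print(pa.array([value]).cast(pa.timestamp("ns")))
> {code}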
> Questions we have:
>  * Should Arrow support timestamp fields when used as partition fields?
>  * Where should the decoding happen? (See the sketch after this list.)
>  
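> For completeness, given the resolution above: pull request 10264 (Fix For 5.0.0) adds a segment_encoding option to dataset partitioning. A sketch of what reading could then look like from Python, assuming the option is exposed on HivePartitioning.discover:
> {code:python}
> import pyarrow.dataset as ds
>
> # segment_encoding="uri" (assumed default) URL-decodes each path
> # segment before it is parsed as a partition value.
> part = ds.HivePartitioning.discover(segment_encoding="uri")
> dataset = ds.dataset("/tip", format="parquet", partitioning=part)
> {code}
>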
> Some more information from the writing side.
> The writing is initiated via FileFormatWriter.write, which eventually uses a DynamicPartitionDataWriter (passing the partitionColumns through the job description).
> Here the actual "value" is rendered and concatenated:
> {code:scala}
>   /** Expression that given partition columns builds a path string like: col1=val/col2=val/... */
>   private lazy val partitionPathExpression: Expression = Concat(
>     description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
>       val partitionName = ScalaUDF(
>         ExternalCatalogUtils.getPartitionPathString _,
>         StringType,
>         Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))))
>       if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
>     })
> {code}
> The encoding itself is done in:
> [https://github.com/apache/spark/blob/v3.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala#L66]
> If I understand correctly, Arrow should provide the equivalent of unescapePathName for fields used as partition columns.
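> For reference, a simplified Python sketch of what such an unescapePathName equivalent does (Spark's actual implementation is more tolerant of malformed escapes):
> {code:python}
> import re
>
> def unescape_path_name(path: str) -> str:
>     # Replace each %XX escape with the character it encodes,
>     # reversing the escaping applied at write time.
>     return re.sub(r"%([0-9A-Fa-f]{2})",
>                   lambda m: chr(int(m.group(1), 16)), path)
>
> assert unescape_path_name("2021-05-04 00%3A00%3A00") == "2021-05-04 00:00:00"
> {code}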
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)