Posted to jira@arrow.apache.org by "David Li (Jira)" <ji...@apache.org> on 2021/05/04 13:15:00 UTC

[jira] [Commented] (ARROW-12644) [C++] Can't read from parquet partitioned by date/time (Spark)

    [ https://issues.apache.org/jira/browse/ARROW-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17338980#comment-17338980 ] 

David Li commented on ARROW-12644:
----------------------------------

Yes, it looks like we may need (or at least want the option) to URL-decode paths at some point, though we need to figure out how to handle that. (For instance: presumably it usually doesn't make sense to URL-decode local file system paths, but it looks like Spark URL-encodes paths, so we need some way to enable decoding optionally.)
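As a rough sketch, decoding could be applied per path segment when enabled. (The helper name and the idea of an opt-in option are hypothetical; Arrow has no such API today. This only illustrates where a decode step could plug in, using Python's standard urllib.)

```python
from urllib.parse import unquote

def decode_partition_segment(segment: str) -> str:
    """Percent-decode the value part of one 'key=value' path segment.

    Hypothetical helper, not existing Arrow API: it shows what an
    opt-in URL-decode step would do to a Spark-written segment.
    """
    key, sep, value = segment.partition("=")
    # Decode only the value; a bare segment is decoded as a whole.
    return key + sep + unquote(value) if sep else unquote(segment)

# The Spark-written segment from the report round-trips back to a
# string that the timestamp parser can handle:
print(decode_partition_segment("Date=2021-05-04 00%3A00%3A00"))
# -> Date=2021-05-04 00:00:00
```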

This also looks somewhat related to ARROW-11378, in that partitioning by a timestamp is rather unusual (the cardinality would normally be too high to make for a worthwhile partition, no?).

> [C++] Can't read from parquet partitioned by date/time (Spark)
> --------------------------------------------------------------
>
>                 Key: ARROW-12644
>                 URL: https://issues.apache.org/jira/browse/ARROW-12644
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: C++
>    Affects Versions: 3.0.0
>            Reporter: Paul Bormans
>            Priority: Major
>              Labels: dataset, datasets, delta, parquet, spark
>
> I'm using Spark (3.1.1) to write a dataframe to a partitioned parquet dataset (using delta.io) which is partitioned by a timestamp field.
> The relevant Spark code:
> {code:python}
> (
>     df.withColumn(
>         "Date",
>         sf.date_trunc("DAY", sf.from_unixtime(sf.col("MyEpochField"))),
>     )
>     .write.format("delta")
>     .mode("append")
>     .partitionBy("Date")
>     .save("...")
> )
> {code}
> This gives a structure like following:
> {code}
> /tip
> /tip/Date=2021-05-04 00%3A00%3A00
> /tip/Date=2021-05-04 00%3A00%3A00/Time=2021-05-04 07%3A27%3A00
> /tip/Date=2021-05-04 00%3A00%3A00/Time=2021-05-04 07%3A27%3A00/part-00000-8846eb80-a369-43f6-a715-fec9cf1adf95.c000.snappy.parquet
> {code}
> Notice that the : character is URL-encoded (as %3A), because : is not allowed in paths on some file systems.
> When I try to open this dataset using delta-rs ([https://github.com/delta-io/delta-rs]), which uses Arrow under the hood, an error is raised while parsing the Date (folder) value.
> {code}
> pyarrow.lib.ArrowInvalid: error parsing '2021-05-03 00%3A00%3A00' as scalar of type timestamp[ns]
> {code}
> It seems this error is raised in ScalarParseImpl => ParseValue => StringConverter<TimestampType>::Convert => ParseTimestampISO8601.
> That parse method supports the following formats:
> {code:cpp}
> static inline bool ParseTimestampISO8601(const char* s, size_t length,
>                                          TimeUnit::type unit,
>                                          TimestampType::c_type* out) {
>   using seconds_type = std::chrono::duration<TimestampType::c_type>;
>   // We allow the following formats for all units:
>   // - "YYYY-MM-DD"
>   // - "YYYY-MM-DD[ T]hhZ?"
>   // - "YYYY-MM-DD[ T]hh:mmZ?"
>   // - "YYYY-MM-DD[ T]hh:mm:ssZ?"
> <...>
> {code}
> But it does not URL-decode the value beforehand.
> Questions we have:
>  * Should Arrow support timestamp fields when used as partitioned field?
>  * Where to decode?
>  
> Some more information from the writing side.
> The write is initiated via FileFormatWriter.write, which eventually uses a DynamicPartitionDataWriter (passing the partitionColumns through the job description).
> Here the actual "value" is rendered and concatenated:
> {code:scala}
>   /** Expression that given partition columns builds a path string like: col1=val/col2=val/... */
>   private lazy val partitionPathExpression: Expression = Concat(
>     description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
>       val partitionName = ScalaUDF(
>         ExternalCatalogUtils.getPartitionPathString _,
>         StringType,
>         Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))))
>       if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
>     })
> {code}
> Where the encoding is done in:
> [https://github.com/apache/spark/blob/v3.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala#L66]
> If I understand correctly, Arrow should provide the equivalent of unescapePathName for fields used as partition columns.
>  
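The escape/unescape round trip the report describes can be illustrated in plain Python. Note that urllib's percent-encoding only approximates Spark's ExternalCatalogUtils.escapePathName (the exact character sets differ, e.g. for the space character, kept literal below to match the paths in the report):

```python
from urllib.parse import quote, unquote

timestamp = "2021-05-04 00:00:00"

# Spark percent-encodes characters that are unsafe in paths, so ':'
# becomes '%3A'. quote() is used here only as an approximation of
# escapePathName; the space is left literal to match the report.
encoded = quote(timestamp, safe=" ")
print(encoded)           # -> 2021-05-04 00%3A00%3A00

# The equivalent of unescapePathName on the reading side restores a
# string that ParseTimestampISO8601 could then parse:
print(unquote(encoded))  # -> 2021-05-04 00:00:00
```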



--
This message was sent by Atlassian Jira
(v8.3.4#803005)