You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "Andy Teucher (Jira)" <ji...@apache.org> on 2021/12/14 20:00:00 UTC

[jira] [Comment Edited] (ARROW-15069) [R] open_dataset very slow on heavily partitioned parquet dataset

    [ https://issues.apache.org/jira/browse/ARROW-15069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17459442#comment-17459442 ] 

Andy Teucher edited comment on ARROW-15069 at 12/14/21, 7:59 PM:
-----------------------------------------------------------------

Thanks so much! Only ~300 rows per file does seem excessively partitioned. I had partitioned on the fields I would query most on, but it makes sense that the directory listing is the bottleneck with so many nested directories. Interesting about the OS differences; it's pretty drastic (You are on Linux I'm guessing). [~stephhazlitt] also reproduced this on her new M1 mac and opening the heavily partitioned dataset was much faster than mine (~11s vs ~45s) but still slower than yours (2.6s). A colleague on Windows had ~60s to open the dataset, and similar performance patterns overal..

Partitioning only on year gave me similar timings to you, so that's great. Definitely a reasonable tradeoff.


was (Author: JIRAUSER279940):
Thanks so much! Only ~300 rows per file does seem excessively partitioned. I had partitioned on the fields I would query most on, but it makes sense that the directory listing is the bottleneck with so many nested directories. Interesting about the OS differences; it's pretty drastic (You are on Linux I'm guessing). [~stephhazlitt] also reproduced this on her new M1 mac and opening the heavily partitioned dataset was much faster than mine (~11s vs ~45s) but still slower than yours (2.6s). A colleague on Windows had ~60s to open the dataset.

Partitioning only on year gave me similar timings to you, so that's great. Definitely a reasonable tradeoff.

> [R] open_dataset very slow on heavily partitioned parquet dataset
> -----------------------------------------------------------------
>
>                 Key: ARROW-15069
>                 URL: https://issues.apache.org/jira/browse/ARROW-15069
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: R
>    Affects Versions: 6.0.1
>         Environment: macOS Mojave, R 4.1.1 
>            Reporter: Andy Teucher
>            Assignee: Weston Pace
>            Priority: Minor
>
> Opening a (particular) partitioned hive-style parquet dataset is very slow (45s to 1 minute).  I have a reproducible example below that takes 780 csv files and writes them to parquet using the {{open_dataset("csv files") |> group_by(vars) |> write_dataset("parquet")}} suggested [here|https://arrow.apache.org/docs/r/articles/dataset.html#writing-datasets]. Opening and querying the subsequent parquet dataset is much slower than doing it on the original csv files, which is not what I expected.
> {code:java}
> library(arrow)
> library(dplyr)
> library(tictoc)
> zipfile <- "ahccd.zip"
> csv_dir <- "data/csv"
> parquet_dir <- "data/parquet"
> dir.create(csv_dir, recursive = TRUE)
> dir.create(parquet_dir, recursive = TRUE)
> # A zip of 780 csvs of daily temperature data at Canadian climate stations (one file per station)
> download.file("https://www.dropbox.com/s/f0a18jp0lvbp1hp/ahccd.zip?dl=1", destfile = zipfile)
> unzip(zipfile, exdir = csv_dir)
> csv_schema <- schema(
>   field("stn_id", string()),
>   field("stn_name", string()),
>   field("measure", string()),
>   field("date", date32()),
>   field("year", int64()),
>   field("month", int64()),
>   field("temp", double()),
>   field("province", string()),
>   field("stn_joined", string()),
>   field("element", string()),
>   field("unit", string()),
>   field("stn_last_updated", string()),
>   field("flag", string())
> )
> csv_format <- FileFormat$create(format = "csv", quoting = FALSE)
> # Write to parquet, partitioning on stn_id, year, measure
> tic("write csv to parquet")
> arrow::open_dataset("data/csv", schema = csv_schema,
>                     format = csv_format) |>
>   group_by(stn_id, year, measure) |>
>   write_dataset(parquet_dir, format = "parquet")
> toc()
> #> write csv to parquet: 2067.093 sec elapsed
> stations <- c("1100031", "1100120", "1100119", "1036B06")
> ## Query directory of original csv files
> tic("query csv")
> foo <- arrow::open_dataset(csv_dir, schema = csv_schema,
>                            format = csv_format) |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query csv: 12.571 sec elapsed
> ## Query the hive-style parquet directory
> tic("query parquet")
> bar <- arrow::open_dataset("data/parquet") |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query parquet: 41.79 sec elapsed
> ## It turns out that it is just the opening of the dataset 
> ## that takes so long
> tic("open parquet dataset")
> ds <- arrow::open_dataset("~/Desktop/arrow-report/data/parquet")
> toc()
> #> open parquet dataset: 45.581 sec elapsed
> ds
> #> FileSystemDataset with 191171 Parquet files
> #> stn_name: string
> #> date: date32[day]
> #> month: int64
> #> temp: double
> #> province: string
> #> stn_joined: string
> #> element: string
> #> unit: string
> #> stn_last_updated: string
> #> flag: string
> #> stn_id: string
> #> year: int32
> #> measure: string
> tic("query already openend dataset")
> ds |> 
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> #> # A tibble: 44,469 × 13
> #>    stn_name date       month  temp province stn_joined     element        unit  
> #>    <chr>    <date>     <int> <dbl> <chr>    <chr>          <chr>          <chr> 
> #>  1 ALBERNI  1992-01-01     1   6.5 BC       station joined Homogenized d… Deg C…
> #>  2 ALBERNI  1992-01-02     1   5.5 BC       station joined Homogenized d… Deg C…
> #>  3 ALBERNI  1992-01-03     1   3.5 BC       station joined Homogenized d… Deg C…
> #>  4 ALBERNI  1992-01-04     1   6   BC       station joined Homogenized d… Deg C…
> #>  5 ALBERNI  1992-01-05     1   0.5 BC       station joined Homogenized d… Deg C…
> #>  6 ALBERNI  1992-01-06     1   0   BC       station joined Homogenized d… Deg C…
> #>  7 ALBERNI  1992-01-07     1   0   BC       station joined Homogenized d… Deg C…
> #>  8 ALBERNI  1992-01-08     1   1.5 BC       station joined Homogenized d… Deg C…
> #>  9 ALBERNI  1992-01-09     1   4   BC       station joined Homogenized d… Deg C…
> #> 10 ALBERNI  1992-01-10     1   5.5 BC       station joined Homogenized d… Deg C…
> #> # … with 44,459 more rows, and 5 more variables: stn_last_updated <chr>,
> #> #   flag <chr>, stn_id <chr>, year <int>, measure <chr>
> toc()
> #> query already openend dataset: 0.356 sec elapsed
> {code}
> The above reprex is self-contained, but will take a while to run, specifically the writing of the parquet dataset can take up to 30 min. It also downloads a 380MB zip file of csvs from my Dropbox.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)