Posted to jira@arrow.apache.org by "Todd Farmer (Jira)" <ji...@apache.org> on 2022/07/12 14:05:02 UTC

[jira] [Assigned] (ARROW-15069) [R] open_dataset very slow on heavily partitioned parquet dataset

     [ https://issues.apache.org/jira/browse/ARROW-15069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Todd Farmer reassigned ARROW-15069:
-----------------------------------

    Assignee:     (was: Weston Pace)

This issue was last updated over 90 days ago, which may indicate that it is no longer being actively worked on. To better reflect its current state, the issue is being unassigned. Please feel free to re-take assignment if you are actively working on it or plan to start that work soon.

> [R] open_dataset very slow on heavily partitioned parquet dataset
> -----------------------------------------------------------------
>
>                 Key: ARROW-15069
>                 URL: https://issues.apache.org/jira/browse/ARROW-15069
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: R
>    Affects Versions: 6.0.1
>         Environment: macOS Mojave, R 4.1.1 
>            Reporter: Andy Teucher
>            Priority: Minor
>
> Opening a (particular) partitioned Hive-style Parquet dataset is very slow (45 seconds to 1 minute). I have a reproducible example below that takes 780 CSV files and writes them to Parquet using the {{open_dataset("csv files") |> group_by(vars) |> write_dataset("parquet")}} pattern suggested [here|https://arrow.apache.org/docs/r/articles/dataset.html#writing-datasets]. Opening and querying the resulting Parquet dataset is much slower than querying the original CSV files, which is not what I expected.
> {code}
> library(arrow)
> library(dplyr)
> library(tictoc)
> zipfile <- "ahccd.zip"
> csv_dir <- "data/csv"
> parquet_dir <- "data/parquet"
> dir.create(csv_dir, recursive = TRUE)
> dir.create(parquet_dir, recursive = TRUE)
> # A zip of 780 csvs of daily temperature data at Canadian climate stations (one file per station)
> download.file("https://www.dropbox.com/s/f0a18jp0lvbp1hp/ahccd.zip?dl=1", destfile = zipfile)
> unzip(zipfile, exdir = csv_dir)
> csv_schema <- schema(
>   field("stn_id", string()),
>   field("stn_name", string()),
>   field("measure", string()),
>   field("date", date32()),
>   field("year", int64()),
>   field("month", int64()),
>   field("temp", double()),
>   field("province", string()),
>   field("stn_joined", string()),
>   field("element", string()),
>   field("unit", string()),
>   field("stn_last_updated", string()),
>   field("flag", string())
> )
> csv_format <- FileFormat$create(format = "csv", quoting = FALSE)
> # Write to parquet, partitioning on stn_id, year, measure
> tic("write csv to parquet")
> arrow::open_dataset(csv_dir, schema = csv_schema,
>                     format = csv_format) |>
>   group_by(stn_id, year, measure) |>
>   write_dataset(parquet_dir, format = "parquet")
> toc()
> #> write csv to parquet: 2067.093 sec elapsed
> stations <- c("1100031", "1100120", "1100119", "1036B06")
> ## Query directory of original csv files
> tic("query csv")
> foo <- arrow::open_dataset(csv_dir, schema = csv_schema,
>                            format = csv_format) |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query csv: 12.571 sec elapsed
> ## Query the hive-style parquet directory
> tic("query parquet")
> bar <- arrow::open_dataset(parquet_dir) |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query parquet: 41.79 sec elapsed
> ## It turns out that it is just the opening of the dataset 
> ## that takes so long
> tic("open parquet dataset")
> ds <- arrow::open_dataset(parquet_dir)
> toc()
> #> open parquet dataset: 45.581 sec elapsed
> ds
> #> FileSystemDataset with 191171 Parquet files
> #> stn_name: string
> #> date: date32[day]
> #> month: int64
> #> temp: double
> #> province: string
> #> stn_joined: string
> #> element: string
> #> unit: string
> #> stn_last_updated: string
> #> flag: string
> #> stn_id: string
> #> year: int32
> #> measure: string
> tic("query already openend dataset")
> ds |> 
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> #> # A tibble: 44,469 × 13
> #>    stn_name date       month  temp province stn_joined     element        unit  
> #>    <chr>    <date>     <int> <dbl> <chr>    <chr>          <chr>          <chr> 
> #>  1 ALBERNI  1992-01-01     1   6.5 BC       station joined Homogenized d… Deg C…
> #>  2 ALBERNI  1992-01-02     1   5.5 BC       station joined Homogenized d… Deg C…
> #>  3 ALBERNI  1992-01-03     1   3.5 BC       station joined Homogenized d… Deg C…
> #>  4 ALBERNI  1992-01-04     1   6   BC       station joined Homogenized d… Deg C…
> #>  5 ALBERNI  1992-01-05     1   0.5 BC       station joined Homogenized d… Deg C…
> #>  6 ALBERNI  1992-01-06     1   0   BC       station joined Homogenized d… Deg C…
> #>  7 ALBERNI  1992-01-07     1   0   BC       station joined Homogenized d… Deg C…
> #>  8 ALBERNI  1992-01-08     1   1.5 BC       station joined Homogenized d… Deg C…
> #>  9 ALBERNI  1992-01-09     1   4   BC       station joined Homogenized d… Deg C…
> #> 10 ALBERNI  1992-01-10     1   5.5 BC       station joined Homogenized d… Deg C…
> #> # … with 44,459 more rows, and 5 more variables: stn_last_updated <chr>,
> #> #   flag <chr>, stn_id <chr>, year <int>, measure <chr>
> toc()
> #> query already opened dataset: 0.356 sec elapsed
> {code}
> The above reprex is self-contained, but it takes a while to run; in particular, writing the Parquet dataset can take up to 30 minutes. It also downloads a 380 MB zip file of CSVs from my Dropbox.
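> For reference, a possible workaround sketch (untested on my part, and the coarse output path below is purely illustrative): the cost of opening seems to scale with the number of files (191,171 here), so re-partitioning on fewer columns to get fewer, larger files may help. Continuing from the reprex above:
> {code}
> library(arrow)
> library(dplyr)
>
> # Hypothetical output path; not part of the original reprex
> parquet_coarse_dir <- "data/parquet_coarse"
>
> # Re-partition on measure and year only (instead of stn_id/year/measure),
> # producing far fewer directories and files; stn_id remains an ordinary
> # column inside each file and can still be filtered on.
> open_dataset(parquet_dir) |>
>   write_dataset(parquet_coarse_dir,
>                 format = "parquet",
>                 partitioning = c("measure", "year"))
>
> # Opening the coarser dataset should be faster simply because there are
> # far fewer files to discover
> ds_coarse <- open_dataset(parquet_coarse_dir)
> {code}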



--
This message was sent by Atlassian Jira
(v8.20.10#820010)