Posted to jira@arrow.apache.org by "Weston Pace (Jira)" <ji...@apache.org> on 2022/07/01 00:19:00 UTC

[jira] [Commented] (ARROW-16700) [C++] [R] [Datasets] aggregates on partitioning columns

    [ https://issues.apache.org/jira/browse/ARROW-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561218#comment-17561218 ] 

Weston Pace commented on ARROW-16700:
-------------------------------------

So the root cause here is how partition columns are handed to the exec plan.  In the datasets API a partitioning doesn't necessarily yield a single value for a batch, but rather an expression.

For example, the folder `/foo=7/chunk-0.parquet` will yield the expression `foo == 7` instead of the value `foo:7`.  This allows for rather novel partitioning schemes like `/temp=med/chunk-0.parquet` which could attach the expression `25 < temp < 75`.
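
To make the path-to-expression idea concrete, here is a minimal Python sketch.  It is a toy model, not the Arrow API: the `Equals` class and `guarantee_from_path` function are hypothetical names, and it assumes hive-style `key=value` segments with integer values only (so it would not handle something like `temp=med`).

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Equals:
    """Hypothetical stand-in for the expression `field == value`."""
    field: str
    value: int


def guarantee_from_path(path: str) -> List[Equals]:
    """Collect one equality expression per `key=value` directory segment."""
    guarantees = []
    for segment in path.strip("/").split("/"):
        if "=" in segment:
            key, _, raw = segment.partition("=")
            # Assumption: partition values are integers.
            guarantees.append(Equals(key, int(raw)))
    return guarantees


print(guarantee_from_path("/foo=7/chunk-0.parquet"))
# [Equals(field='foo', value=7)]
```

The point is only that what comes out of the path is an expression about the column, not a column of values.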

Unfortunately, this cleverness cannot really be utilized by the execution engine.  For example, how does one resolve the query `SELECT * FROM measurements WHERE temp == 50` when all they know for a batch is `25 < temp < 75`?

At the moment, these expressions are attached to the incoming batch as a "guarantee".  In addition, we also add a new column to the batch (e.g. `foo` or `temp`).  However, no value is ever set on this column and so it ends up getting set implicitly to `NULL`.  This guarantee is then used by project and filter nodes to simplify an expression.

This is why the query `SELECT foo FROM dataset WHERE foo == 7` works.  The expression `foo == 7` gets simplified by the guarantee (`foo == 7`) to `true` and so the row is included (even though the value of the row is null).  The projection `field_ref('foo')` gets simplified to `scalar(7)` and so the underlying array (which is full of nulls) is not looked at.

However, aggregate nodes do not simplify with a guarantee.  So, instead, they see the raw underlying value (null) and it doesn't get processed correctly by the aggregate node.
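
The mismatch can be sketched with a toy model in Python.  None of this is Arrow code: the `batch` dict layout and the `project_field` and `naive_min` helpers are made up for illustration, and the sketch does not try to reproduce the exact wrong values from the report.  It only shows that a node which simplifies against the guarantee sees 7, while a node which reads the raw column sees nothing but nulls.

```python
# Toy model: a batch carries raw columns plus a guarantee {field: value}.
# The partition column "foo" exists in the batch but was never populated.
batch = {
    "columns": {"foo": [None, None, None], "x": [1, 2, 3]},
    "guarantee": {"foo": 7},
}


def project_field(batch, name):
    """Projection that simplifies against the guarantee before reading data."""
    if name in batch["guarantee"]:
        n_rows = len(batch["columns"][name])
        return [batch["guarantee"][name]] * n_rows
    return batch["columns"][name]


def naive_min(batch, name):
    """Aggregate that ignores the guarantee and reads the raw column."""
    values = [v for v in batch["columns"][name] if v is not None]
    return min(values) if values else None


print(project_field(batch, "foo"))  # [7, 7, 7]
print(naive_min(batch, "foo"))      # None
```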

One quick fix would be to get rid of the guarantee concept entirely.  When we have an incoming partition expression we should:

 * If the expression is an equality we replace it with a scalar.
 * If the expression is not an equality we raise an error.
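
As a rough sketch of those two rules (again a toy model in Python, with hypothetical `Equals`, `Between` and `partition_value` names rather than anything from the Arrow codebase):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Equals:
    """Hypothetical expression `field == value`."""
    field: str
    value: int


@dataclass(frozen=True)
class Between:
    """Hypothetical expression `low < field < high`."""
    field: str
    low: int
    high: int


def partition_value(expr):
    """Turn a partition expression into a (field, scalar) pair, or fail."""
    if isinstance(expr, Equals):
        # Equality: the column can be materialized as this scalar.
        return expr.field, expr.value
    # Anything else cannot be represented as a single value per batch.
    raise ValueError(f"unsupported partition expression: {expr!r}")


print(partition_value(Equals("foo", 7)))  # ('foo', 7)
```

With this approach a range such as `Between("temp", 25, 75)` is rejected up front instead of silently producing a column of nulls.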

> [C++] [R] [Datasets] aggregates on partitioning columns
> -------------------------------------------------------
>
>                 Key: ARROW-16700
>                 URL: https://issues.apache.org/jira/browse/ARROW-16700
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: C++, R
>            Reporter: Jonathan Keane
>            Priority: Blocker
>             Fix For: 9.0.0, 8.0.1
>
>
> When summarizing a whole dataset (without group_by) with an aggregate, and summarizing a partitioned column, arrow returns wrong data:
> {code:r}
> library(arrow, warn.conflicts = FALSE)
> library(dplyr, warn.conflicts = FALSE)
> df <- expand.grid(
>   some_nulls = c(0L, 1L, 2L),
>   year = 2010:2023,
>   month = 1:12,
>   day = 1:30
> )
> path <- tempfile()
> dir.create(path)
> write_dataset(df, path, partitioning = c("year", "month"))
> ds <- open_dataset(path)
> # with arrow the mins/maxes are off for partitioning columns
> ds %>%
>   summarise(n = n(), min_year = min(year), min_month = min(month), min_day = min(day), max_year = max(year), max_month = max(month), max_day = max(day)) %>% 
>   collect()
> #> # A tibble: 1 × 7
> #>       n min_year min_month min_day max_year max_month max_day
> #>   <int>    <int>     <int>   <int>    <int>     <int>   <int>
> #> 1 15120     2023         1       1     2023        12      30
> # compared to what we get with dplyr
> df %>%
>   summarise(n = n(), min_year = min(year), min_month = min(month), min_day = min(day), max_year = max(year), max_month = max(month), max_day = max(day)) %>% 
>   collect()
> #>       n min_year min_month min_day max_year max_month max_day
> #> 1 15120     2010         1       1     2023        12      30
> # even min alone is off:
> ds %>%
>   summarise(min_year = min(year)) %>% 
>   collect()
> #> # A tibble: 1 × 1
> #>   min_year
> #>      <int>
> #> 1     2016
>   
> # but non-partitioning columns are fine:
> ds %>%
>   summarise(min_day = min(day)) %>% 
>   collect()
> #> # A tibble: 1 × 1
> #>   min_day
> #>     <int>
> #> 1       1
>   
>   
> # But with a group_by, this seems ok
> ds %>%
>   group_by(some_nulls) %>%
>   summarise(min_year = min(year)) %>% 
>   collect()
> #> # A tibble: 3 × 2
> #>   some_nulls min_year
> #>        <int>    <int>
> #> 1          0     2010
> #> 2          1     2010
> #> 3          2     2010
> {code}


