Posted to github@arrow.apache.org by "nealrichardson (via GitHub)" <gi...@apache.org> on 2023/02/01 16:43:26 UTC

[GitHub] [arrow] nealrichardson commented on a diff in pull request #33917: GH-33892: [R] Map dplyr::n() to count_all kernel

nealrichardson commented on code in PR #33917:
URL: https://github.com/apache/arrow/pull/33917#discussion_r1093417298


##########
r/R/dplyr-summarize.R:
##########
@@ -18,7 +18,7 @@
 # Aggregation functions
 # These all return a list of:
 # @param fun string function name
-# @param data Expression (these are all currently a single field)
+# @param data Expression (all currently a single field, except for dplyr::n)

Review Comment:
   ```suggestion
   # @param data list of 0 or more Expressions
   ```



##########
r/R/dplyr-summarize.R:
##########
@@ -34,67 +34,46 @@ ensure_one_arg <- function(args, fun) {
   args[[1]]
 }
 
-agg_fun_output_type <- function(fun, input_type, hash) {
-  # These are quick and dirty heuristics.
-  if (fun %in% c("any", "all")) {
-    bool()
-  } else if (fun %in% "sum") {
-    # It may upcast to a bigger type but this is close enough
-    input_type
-  } else if (fun %in% c("mean", "stddev", "variance", "approximate_median")) {
-    float64()
-  } else if (fun %in% "tdigest") {
-    if (hash) {
-      fixed_size_list_of(float64(), 1L)
-    } else {
-      float64()
-    }
-  } else {
-    # Just so things don't error, assume the resulting type is the same
-    input_type
-  }
-}
-
 register_bindings_aggregate <- function() {
   register_binding_agg("base::sum", function(..., na.rm = FALSE) {
     list(
       fun = "sum",
-      data = ensure_one_arg(list2(...), "sum"),
+      data = list(ensure_one_arg(list2(...), "sum")),

Review Comment:
   If you change `ensure_one_arg()` to return `args` instead of `args[[1]]`, then you don't need to wrap all of these in `list()`.
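
   A minimal sketch of that change, runnable in plain R. The argument-count check and its error message are assumptions made for illustration (only the last line of the real function is visible in this diff), and `stop()` stands in for whatever error helper the package actually uses.

   ```r
   # Illustrative stand-in for ensure_one_arg(); the check and the message are
   # assumptions, only the return value reflects the suggested change.
   ensure_one_arg <- function(args, fun) {
     if (length(args) != 1) {
       stop(fun, "() requires exactly one argument", call. = FALSE)
     }
     # Return the whole one-element list instead of args[[1]]
     args
   }

   # Callers can then assign the result directly, with no extra list() wrapper
   data <- ensure_one_arg(list("x"), "sum")
   ```

   With this shape, each binding can write `data = ensure_one_arg(list2(...), "sum")` and still end up with a list of Expressions.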



##########
r/R/dplyr-summarize.R:
##########
@@ -322,15 +301,76 @@ arrow_eval_or_stop <- function(expr, mask) {
   out
 }
 
+# This function returns a list of expressions that can be used to project the
+# data before an aggregation to only the fields required for the aggregation,
+# including the fields used in the aggregations (the "targets") and the group
+# fields. The names of the returned list are used to ensure that the projection
+# node is wired up correctly to the aggregation node.
 summarize_projection <- function(.data) {
   c(
-    map(.data$aggregations, ~ .$data),
+    unlist(unname(imap(
+      .data$aggregations,
+      ~set_names(
+        .x$data,
+        aggregate_target_names(.x$data, .y)
+      )
+    ))),
+    .data$selected_columns[.data$group_by_vars]
+  )
+}
+
+# This function determines what names to give to the fields used in aggregations
+# (the "targets"). When an aggregate function takes 2 or more fields as targets,
+# this function gives the fields unique names by appending `..1`, `..2`, etc.
+# When an aggregate function is nullary, this function returns a zero-length
+# character vector.
+aggregate_target_names <- function(data, name) {
+  if (length(data) > 1) {
+    paste(name, seq_along(data), sep = "..")
+  } else if (length(data) > 0) {
+    name
+  } else {
+    character(0)
+  }
+}
+
+# This function returns a list of expressions representing the aggregated fields
+# that will be returned by an aggregation
+aggregated_fields <- function(aggs) {
+  map(
+    aggs,
+    ~Expression$create(.$fun, args = .$data, options = .$options)
+  )
+}
+
+# Unlike with other pairs of non-hash/hash aggregate kernels in the Arrow C++
+# library, the `tdigest` and `hash_tdigest` kernels have different output types.

Review Comment:
   Perhaps a safer way to do the type determination is to build expressions with the hash versions. The prose documentation says the hash kernels are unary, but according to the error messages you get, as well as the inline function docs in hash_aggregate.cc, they're binary. After some trial and error, I got this to work:
   
   ```
   > x <- Expression$create("tdigest", Expression$field_ref("a"), options=list(q=c(.1, .2)))
   > x$type(schema(a = int32()))
   Float64
   double
   
   > x <- Expression$create("hash_tdigest", Expression$field_ref("a"), Expression$scalar(1L)$cast(uint32()), options=list(q=c(.1, .2)))
   > x$type(schema(a = int32()))
   FixedSizeListType
   fixed_size_list<item: double>[2]
   ```
   
   So the second argument is the group hash, and that is `uint32()`. 



##########
r/R/dplyr-summarize.R:
##########
@@ -322,15 +301,76 @@ arrow_eval_or_stop <- function(expr, mask) {
   out
 }
 
+# This function returns a list of expressions that can be used to project the
+# data before an aggregation to only the fields required for the aggregation,
+# including the fields used in the aggregations (the "targets") and the group
+# fields. The names of the returned list are used to ensure that the projection
+# node is wired up correctly to the aggregation node.
 summarize_projection <- function(.data) {
   c(
-    map(.data$aggregations, ~ .$data),
+    unlist(unname(imap(
+      .data$aggregations,
+      ~set_names(
+        .x$data,
+        aggregate_target_names(.x$data, .y)
+      )
+    ))),
+    .data$selected_columns[.data$group_by_vars]
+  )
+}
+
+# This function determines what names to give to the fields used in aggregations
+# (the "targets"). When an aggregate function takes 2 or more fields as targets,
+# this function gives the fields unique names by appending `..1`, `..2`, etc.
+# When an aggregate function is nullary, this function returns a zero-length
+# character vector.
+aggregate_target_names <- function(data, name) {
+  if (length(data) > 1) {
+    paste(name, seq_along(data), sep = "..")
+  } else if (length(data) > 0) {
+    name
+  } else {
+    character(0)
+  }
+}
+
+# This function returns a list of expressions representing the aggregated fields
+# that will be returned by an aggregation
+aggregated_fields <- function(aggs) {

Review Comment:
   Technically these aren't "Fields". Perhaps `aggregation_exprs` instead?



##########
r/R/query-engine.R:
##########
@@ -101,7 +101,11 @@ ExecPlan <- R6Class("ExecPlan",
         # plus group_by_vars (last)
         # TODO: validate that none of names(aggregations) are the same as names(group_by_vars)
         # dplyr does not error on this but the result it gives isn't great
-        node <- node$Project(summarize_projection(.data))
+        projection <- summarize_projection(.data)
+        # skip projection if no grouping and all aggregate functions are nullary

Review Comment:
   Why?



##########
r/R/dplyr-summarize.R:
##########
@@ -322,15 +301,76 @@ arrow_eval_or_stop <- function(expr, mask) {
   out
 }
 
+# This function returns a list of expressions that can be used to project the

Review Comment:
   ```suggestion
   # This function returns a list of expressions that is used to project the
   ```



##########
r/R/dplyr-summarize.R:
##########
@@ -322,15 +301,76 @@ arrow_eval_or_stop <- function(expr, mask) {
   out
 }
 
+# This function returns a list of expressions that can be used to project the
+# data before an aggregation to only the fields required for the aggregation,
+# including the fields used in the aggregations (the "targets") and the group
+# fields. The names of the returned list are used to ensure that the projection
+# node is wired up correctly to the aggregation node.
 summarize_projection <- function(.data) {
   c(
-    map(.data$aggregations, ~ .$data),
+    unlist(unname(imap(
+      .data$aggregations,
+      ~set_names(
+        .x$data,
+        aggregate_target_names(.x$data, .y)
+      )
+    ))),
+    .data$selected_columns[.data$group_by_vars]
+  )
+}
+
+# This function determines what names to give to the fields used in aggregations
+# (the "targets"). When an aggregate function takes 2 or more fields as targets,
+# this function gives the fields unique names by appending `..1`, `..2`, etc.
+# When an aggregate function is nullary, this function returns a zero-length
+# character vector.
+aggregate_target_names <- function(data, name) {
+  if (length(data) > 1) {
+    paste(name, seq_along(data), sep = "..")
+  } else if (length(data) > 0) {
+    name
+  } else {
+    character(0)
+  }
+}
+
+# This function returns a list of expressions representing the aggregated fields
+# that will be returned by an aggregation
+aggregated_fields <- function(aggs) {
+  map(
+    aggs,
+    ~Expression$create(.$fun, args = .$data, options = .$options)
+  )
+}
+
+# Unlike with other pairs of non-hash/hash aggregate kernels in the Arrow C++
+# library, the `tdigest` and `hash_tdigest` kernels have different output types.
+# The `tdigest` kernel returns `Float64`, but the `hash_tdigest` kernel returns
+# `FixedSizeList[Float64]`. The system that the R bindings use to infer the
+# output types of expressions does not account for this. It infers the output
+# type of `hash_tdigest` as `Float64`. This function is used to correct this.
+fix_aggregated_types <- function(fields, aggs, hash) {
+  imap(
+    fields,
+    ~if (hash && aggs[[.y]]$fun == "tdigest") {
+        fixed_size_list_of(float64(), 1L)

Review Comment:
   Never mind, see above: I think we can construct `hash_` Expressions and get the actual output type without any munging.



##########
r/R/dplyr-summarize.R:
##########
@@ -322,15 +301,76 @@ arrow_eval_or_stop <- function(expr, mask) {
   out
 }
 
+# This function returns a list of expressions that can be used to project the
+# data before an aggregation to only the fields required for the aggregation,
+# including the fields used in the aggregations (the "targets") and the group
+# fields. The names of the returned list are used to ensure that the projection
+# node is wired up correctly to the aggregation node.
 summarize_projection <- function(.data) {
   c(
-    map(.data$aggregations, ~ .$data),
+    unlist(unname(imap(
+      .data$aggregations,
+      ~set_names(
+        .x$data,
+        aggregate_target_names(.x$data, .y)
+      )
+    ))),
+    .data$selected_columns[.data$group_by_vars]
+  )
+}
+
+# This function determines what names to give to the fields used in aggregations
+# (the "targets"). When an aggregate function takes 2 or more fields as targets,
+# this function gives the fields unique names by appending `..1`, `..2`, etc.
+# When an aggregate function is nullary, this function returns a zero-length
+# character vector.
+aggregate_target_names <- function(data, name) {
+  if (length(data) > 1) {
+    paste(name, seq_along(data), sep = "..")
+  } else if (length(data) > 0) {
+    name
+  } else {
+    character(0)
+  }
+}
+
+# This function returns a list of expressions representing the aggregated fields
+# that will be returned by an aggregation
+aggregated_fields <- function(aggs) {
+  map(
+    aggs,
+    ~Expression$create(.$fun, args = .$data, options = .$options)
+  )
+}
+
+# Unlike with other pairs of non-hash/hash aggregate kernels in the Arrow C++
+# library, the `tdigest` and `hash_tdigest` kernels have different output types.
+# The `tdigest` kernel returns `Float64`, but the `hash_tdigest` kernel returns
+# `FixedSizeList[Float64]`. The system that the R bindings use to infer the
+# output types of expressions does not account for this. It infers the output
+# type of `hash_tdigest` as `Float64`. This function is used to correct this.
+fix_aggregated_types <- function(fields, aggs, hash) {
+  imap(
+    fields,
+    ~if (hash && aggs[[.y]]$fun == "tdigest") {
+        fixed_size_list_of(float64(), 1L)

Review Comment:
   Is it always float64()? (looks like it is today, but probably not safe to assume that things like timestamp wouldn't return timestamp)
   
   Is it always length 1? (looks like not, the length depends on the number of quantiles you give it in the options)
   
   I think this would generalize better:
   
   ```suggestion
           fixed_size_list_of(.x, length(aggs[[.y]][["options"]][["q"]]))
   ```



##########
r/R/dplyr-summarize.R:
##########
@@ -322,15 +301,76 @@ arrow_eval_or_stop <- function(expr, mask) {
   out
 }
 
+# This function returns a list of expressions that can be used to project the
+# data before an aggregation to only the fields required for the aggregation,
+# including the fields used in the aggregations (the "targets") and the group
+# fields. The names of the returned list are used to ensure that the projection
+# node is wired up correctly to the aggregation node.
 summarize_projection <- function(.data) {
   c(
-    map(.data$aggregations, ~ .$data),
+    unlist(unname(imap(
+      .data$aggregations,
+      ~set_names(
+        .x$data,
+        aggregate_target_names(.x$data, .y)
+      )
+    ))),
+    .data$selected_columns[.data$group_by_vars]
+  )
+}
+
+# This function determines what names to give to the fields used in aggregations
+# (the "targets"). When an aggregate function takes 2 or more fields as targets,
+# this function gives the fields unique names by appending `..1`, `..2`, etc.
+# When an aggregate function is nullary, this function returns a zero-length
+# character vector.
+aggregate_target_names <- function(data, name) {
+  if (length(data) > 1) {
+    paste(name, seq_along(data), sep = "..")
+  } else if (length(data) > 0) {
+    name
+  } else {
+    character(0)
+  }
+}
+
+# This function returns a list of expressions representing the aggregated fields
+# that will be returned by an aggregation
+aggregated_fields <- function(aggs) {
+  map(
+    aggs,
+    ~Expression$create(.$fun, args = .$data, options = .$options)

Review Comment:
   See my comment on the `tdigest`/`hash_tdigest` output types. Basically, `if (hash)`, then the function name is `paste0("hash_", .$fun)` and the arguments are `c(.$data, Expression$scalar(Scalar$create(1L, type = uint32())))`.
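
   Roughly, the switch could look like the sketch below. `agg_call_parts()` is a hypothetical helper, not something in this PR, and plain strings stand in for Expression objects so the snippet runs without arrow; in the real code the result would be passed on to `Expression$create()`.

   ```r
   # Hypothetical helper: returns the kernel name and the argument list that
   # would be handed to Expression$create().
   agg_call_parts <- function(agg, hash, group_id) {
     if (hash) {
       list(
         fun = paste0("hash_", agg$fun),
         # list(group_id) keeps the group id as a single trailing argument
         args = c(agg$data, list(group_id))
       )
     } else {
       list(fun = agg$fun, args = agg$data)
     }
   }

   # Strings are placeholders for a field reference and the uint32 group id scalar
   agg <- list(fun = "tdigest", data = list("field_a"), options = list(q = c(0.1, 0.2)))
   hashed <- agg_call_parts(agg, hash = TRUE, group_id = "group_id_scalar")
   plain <- agg_call_parts(agg, hash = FALSE, group_id = "group_id_scalar")
   ```

   Here `hashed$fun` is `"hash_tdigest"` with two arguments, while `plain` keeps the original name and the single argument.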



##########
r/R/dplyr-summarize.R:
##########
@@ -322,15 +301,76 @@ arrow_eval_or_stop <- function(expr, mask) {
   out
 }
 
+# This function returns a list of expressions that can be used to project the
+# data before an aggregation to only the fields required for the aggregation,
+# including the fields used in the aggregations (the "targets") and the group
+# fields. The names of the returned list are used to ensure that the projection
+# node is wired up correctly to the aggregation node.
 summarize_projection <- function(.data) {
   c(
-    map(.data$aggregations, ~ .$data),
+    unlist(unname(imap(
+      .data$aggregations,
+      ~set_names(
+        .x$data,
+        aggregate_target_names(.x$data, .y)
+      )
+    ))),
+    .data$selected_columns[.data$group_by_vars]
+  )
+}
+
+# This function determines what names to give to the fields used in aggregations
+# (the "targets"). When an aggregate function takes 2 or more fields as targets,
+# this function gives the fields unique names by appending `..1`, `..2`, etc.

Review Comment:
   Why? I'm not sure about this. Since there's no code that actually does this in this PR, maybe save it for #33959, where you can prove out what works/is necessary?
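
   For reference, the helper as written in the diff can be exercised on its own in plain R. The function body is copied from the hunk above; the inputs are made-up placeholders (strings instead of Expressions) so that it runs without arrow.

   ```r
   # Copied from the diff: names the projected target fields of one aggregation
   aggregate_target_names <- function(data, name) {
     if (length(data) > 1) {
       paste(name, seq_along(data), sep = "..")
     } else if (length(data) > 0) {
       name
     } else {
       character(0)
     }
   }

   # Placeholder inputs covering the three branches
   two_targets <- aggregate_target_names(list("a", "b"), "x")  # "x..1" "x..2"
   one_target <- aggregate_target_names(list("a"), "x")        # "x"
   no_targets <- aggregate_target_names(list(), "x")           # character(0)
   ```

   Only the one-target and zero-target branches are reachable with the bindings in this PR, which is why the multi-target naming seems like it could wait.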


