You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/18 21:43:05 UTC

[GitHub] [arrow] wjones127 commented on a diff in pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

wjones127 commented on code in PR #13170:
URL: https://github.com/apache/arrow/pull/13170#discussion_r876399314


##########
r/R/dataset-scan.R:
##########
@@ -169,43 +169,57 @@ tail_from_batches <- function(batches, n) {
     if (n <= 0) break
   }
   # rev() the result to put the batches back in the right order
-  Table$create(!!!rev(result))
+  RecordBatchReader$create(batches = rev(result))
 }
 
 #' Apply a function to a stream of RecordBatches
 #'
 #' As an alternative to calling `collect()` on a `Dataset` query, you can
 #' use this function to access the stream of `RecordBatch`es in the `Dataset`.
-#' This lets you aggregate on each chunk and pull the intermediate results into
-#' a `data.frame` for further aggregation, even if you couldn't fit the whole
-#' `Dataset` result in memory.
+#' This lets you do more complex operations in R that operate on chunks of data
+#' without having to hold the entire Dataset in memory at once. You can include
+#' `map_batches()` in a dplyr pipeline and do additional dplyr methods on the
+#' stream of data in Arrow after it.
 #'
-#' This is experimental and not recommended for production use.
+#' Note that, unlike the core dplyr methods that are implemented in the Arrow
+#' query engine, `map_batches()` is not lazy: it starts evaluating on the data
+#' when you call it, even if you send its result to another pipeline function.
+#'
+#' This is experimental and not recommended for production use. It is also
+#' single-threaded and runs in R not C++, so it won't be as fast as core
+#' Arrow methods.
 #'
 #' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
 #' `dplyr` methods on `Dataset`.
 #' @param FUN A function or `purrr`-style lambda expression to apply to each
-#' batch
+#' batch. It must return a RecordBatch or something coercible to one via
+#' `as_record_batch()'.
 #' @param ... Additional arguments passed to `FUN`
-#' @param .data.frame logical: collect the resulting chunks into a single
-#' `data.frame`? Default `TRUE`
+#' @param .data.frame Deprecated argument, ignored
+#' @return An `arrow_dplyr_query`.
 #' @export
-map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
-  # TODO: ARROW-15271 possibly refactor do_exec_plan to return a RecordBatchReader
+map_batches <- function(X, FUN, ..., .data.frame = NULL) {
+  if (!is.null(.data.frame)) {
+    warning(
+      "The .data.frame argument is deprecated. ",
+      "Call collect() on the result to get a data.frame.",
+      call. = FALSE
+    )
+  }
   plan <- ExecPlan$create()
   final_node <- plan$Build(as_adq(X))
   reader <- plan$Run(final_node)
   FUN <- as_mapper(FUN)
 
-  # TODO: wrap batch in arrow_dplyr_query with X$selected_columns,
-  # X$temp_columns, and X$group_by_vars
-  # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
+  # TODO: for future consideration
+  # * Move eval to C++ and make it a generator so it can stream, not block

Review Comment:
   +1 that would be nice.



##########
r/R/dataset-scan.R:
##########
@@ -214,13 +228,7 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
     res <- res[seq_len(i)]
   }
 
-  if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) {
-    res <- dplyr::bind_rows(map(res, dplyr::collect))
-  } else if (.data.frame) {
-    res <- dplyr::bind_rows(map(res, as.data.frame))
-  }
-
-  res
+  as_adq(RecordBatchReader$create(batches = res))

Review Comment:
   Question: Why return an ADQ instead of a straight RBR? RBR is a documented type, while ADQ is undocumented, and I think you can still use a RBR in a dplyr pipeline, right?



##########
r/R/record-batch-reader.R:
##########
@@ -104,6 +105,16 @@ RecordBatchReader <- R6Class("RecordBatchReader",
     schema = function() RecordBatchReader__schema(self)
   )
 )
+RecordBatchReader$create <- function(..., batches = list(...), schema = NULL) {

Review Comment:
   I think I just learned something new about R. `batches` here is the equivalent of Python's `*args`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org