Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/07/19 16:54:29 UTC

[GitHub] [arrow] wjones127 commented on a diff in pull request #13650: ARROW-16703: [R] Refactor map_batches() so it can stream results

wjones127 commented on code in PR #13650:
URL: https://github.com/apache/arrow/pull/13650#discussion_r924724405


##########
r/tests/testthat/test-dataset-write.R:
##########
@@ -703,6 +703,7 @@ test_that("Dataset min_rows_per_group", {
 
   row_group_sizes <- ds %>%
     map_batches(~ record_batch(nrows = .$num_rows)) %>%
+    (function(x) x$read_table()) %>%

Review Comment:
   Would `collect()` not work here?



##########
r/R/dataset-scan.R:
##########
@@ -197,25 +199,50 @@ map_batches <- function(X, FUN, ..., .data.frame = NULL) {
   }
   FUN <- as_mapper(FUN)
   reader <- as_record_batch_reader(X)
+  dots <- rlang::list2(...)
 
-  # TODO: for future consideration
-  # * Move eval to C++ and make it a generator so it can stream, not block
-  # * Accept an output schema argument: with that, we could make this lazy (via collapse)
-  batch <- reader$read_next_batch()
-  res <- vector("list", 1024)
-  i <- 0L
-  while (!is.null(batch)) {
-    i <- i + 1L
-    res[[i]] <- as_record_batch(FUN(batch, ...))
+  # If no schema is supplied, we have to evaluate the first batch here
+  if (is.null(.schema)) {
     batch <- reader$read_next_batch()
-  }
+    if (is.null(batch)) {
+      abort("Can't infer schema from a RecordBatchReader with zero batches")
+    }
+
+    first_result <- as_record_batch(do.call(FUN, c(list(batch, dots))))
+    .schema <- first_result$schema
+    fun <- function() {
+      if (!is.null(first_result)) {
+        result <- first_result
+        first_result <<- NULL
+        result
+      } else {
+        batch <- reader$read_next_batch()
+        if (is.null(batch)) {
+          NULL
+        } else {
+          as_record_batch(
+            do.call(FUN, c(list(batch, dots))),

Review Comment:
   We should probably add a test to make sure the `dots` are being passed through.
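   A minimal, self-contained sketch of such a check, in base R only. It does not use arrow. `make_mock_reader()` and `make_batch_generator()` are hypothetical helpers that stand in for a `RecordBatchReader` and for the generator built in the diff, and plain data frames stand in for record batches.

   One thing a test like this would catch is how `dots` gets spliced. `c(list(batch, dots))` evaluates to `list(batch, dots)`, so `dots` reaches `FUN` as a single list argument. `c(list(batch), dots)` instead passes each element of `dots` as its own argument.

```r
# Hypothetical stand-in for a RecordBatchReader: read_next_batch() returns
# the next element of `batches`, or NULL once they are exhausted.
make_mock_reader <- function(batches) {
  i <- 0L
  list(read_next_batch = function() {
    i <<- i + 1L
    if (i > length(batches)) NULL else batches[[i]]
  })
}

# Hypothetical helper mirroring the pattern in the diff: apply FUN to the
# first batch eagerly (so its result could be inspected, e.g. for a schema),
# then return a generator that yields that cached result before reading on.
make_batch_generator <- function(reader, FUN, ...) {
  dots <- list(...)
  batch <- reader$read_next_batch()
  if (is.null(batch)) {
    stop("Can't infer schema from a reader with zero batches")
  }
  # c(list(batch), dots) splices each element of dots as a separate argument
  first_result <- do.call(FUN, c(list(batch), dots))
  function() {
    if (!is.null(first_result)) {
      result <- first_result
      first_result <<- NULL  # NULL marks the cached first result as consumed
      result
    } else {
      batch <- reader$read_next_batch()
      if (is.null(batch)) NULL else do.call(FUN, c(list(batch), dots))
    }
  }
}

# Check that the extra argument `scale` reaches FUN for every batch
reader <- make_mock_reader(list(data.frame(x = 1:2), data.frame(x = 3:5)))
gen <- make_batch_generator(reader, function(batch, scale) batch$x * scale, scale = 10)
stopifnot(identical(gen(), c(10, 20)))
stopifnot(identical(gen(), c(30, 40, 50)))
stopifnot(is.null(gen()))
```

   The `NULL` sentinel mirrors the diff. It assumes `FUN`'s result is never `NULL`, which should hold once that result is wrapped in `as_record_batch()`. A separate logical flag would avoid relying on that assumption.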



-- 
This is an automated message from the Apache Git Service.