Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/16 23:02:53 UTC

[GitHub] [arrow] nealrichardson opened a new pull request, #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

nealrichardson opened a new pull request, #13170:
URL: https://github.com/apache/arrow/pull/13170

   The ticket title is misleading: this PR actually removes do_exec_plan(). plan$Run() now always returns a RecordBatchReader (RBR); the two cases where Tables are used to post-process ExecPlan results are now encapsulated in Run().
   
   One catch still needs addressing, and I'll open a separate Jira for it: you can provide schema metadata to the WriteNode but not to the other SinkNodes, so anything that preserves R metadata has to handle it separately, because Run() will drop it. This seems to be a limitation of the C++ library.
   
   One other change here: map_batches() now returns an RBR and requires that the function it maps return something coercible to a RecordBatch.
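
As an illustration of the map_batches() change described above, here is a minimal sketch, assuming a build of the arrow R package that includes this PR. The `InMemoryDataset` over `mtcars`, the `cyl > 4` filter, and the derived `kpl` column are illustrative choices and do not come from the PR itself.

```r
library(arrow)
library(dplyr)

# Small in-memory dataset standing in for a larger on-disk Dataset
# (illustrative assumption, not from the PR)
ds <- InMemoryDataset$create(mtcars)

reader <- ds %>%
  filter(cyl > 4) %>%
  map_batches(function(batch) {
    # `batch` is a RecordBatch. The function returns a data.frame, which is
    # assumed to be coercible to a RecordBatch via as_record_batch().
    df <- as.data.frame(batch)
    df$kpl <- df$mpg * 0.425
    df
  })

# The result is a stream of batches, not a data.frame;
# materialize it explicitly when a data.frame is needed.
result <- as.data.frame(reader$read_table())
```

Because the return value is no longer a data.frame, code that relied on the old `.data.frame = TRUE` default would need an explicit materialization step like the last line.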


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13170:
URL: https://github.com/apache/arrow/pull/13170#issuecomment-1128219541

   https://issues.apache.org/jira/browse/ARROW-15271




[GitHub] [arrow] wjones127 commented on a diff in pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
wjones127 commented on code in PR #13170:
URL: https://github.com/apache/arrow/pull/13170#discussion_r876399314


##########
r/R/dataset-scan.R:
##########
@@ -169,43 +169,57 @@ tail_from_batches <- function(batches, n) {
     if (n <= 0) break
   }
   # rev() the result to put the batches back in the right order
-  Table$create(!!!rev(result))
+  RecordBatchReader$create(batches = rev(result))
 }
 
 #' Apply a function to a stream of RecordBatches
 #'
 #' As an alternative to calling `collect()` on a `Dataset` query, you can
 #' use this function to access the stream of `RecordBatch`es in the `Dataset`.
-#' This lets you aggregate on each chunk and pull the intermediate results into
-#' a `data.frame` for further aggregation, even if you couldn't fit the whole
-#' `Dataset` result in memory.
+#' This lets you do more complex operations in R that operate on chunks of data
+#' without having to hold the entire Dataset in memory at once. You can include
+#' `map_batches()` in a dplyr pipeline and do additional dplyr methods on the
+#' stream of data in Arrow after it.
 #'
-#' This is experimental and not recommended for production use.
+#' Note that, unlike the core dplyr methods that are implemented in the Arrow
+#' query engine, `map_batches()` is not lazy: it starts evaluating on the data
+#' when you call it, even if you send its result to another pipeline function.
+#'
+#' This is experimental and not recommended for production use. It is also
+#' single-threaded and runs in R not C++, so it won't be as fast as core
+#' Arrow methods.
 #'
 #' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
 #' `dplyr` methods on `Dataset`.
 #' @param FUN A function or `purrr`-style lambda expression to apply to each
-#' batch
+#' batch. It must return a RecordBatch or something coercible to one via
+#' `as_record_batch()`.
 #' @param ... Additional arguments passed to `FUN`
-#' @param .data.frame logical: collect the resulting chunks into a single
-#' `data.frame`? Default `TRUE`
+#' @param .data.frame Deprecated argument, ignored
+#' @return An `arrow_dplyr_query`.
 #' @export
-map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
-  # TODO: ARROW-15271 possibly refactor do_exec_plan to return a RecordBatchReader
+map_batches <- function(X, FUN, ..., .data.frame = NULL) {
+  if (!is.null(.data.frame)) {
+    warning(
+      "The .data.frame argument is deprecated. ",
+      "Call collect() on the result to get a data.frame.",
+      call. = FALSE
+    )
+  }
   plan <- ExecPlan$create()
   final_node <- plan$Build(as_adq(X))
   reader <- plan$Run(final_node)
   FUN <- as_mapper(FUN)
 
-  # TODO: wrap batch in arrow_dplyr_query with X$selected_columns,
-  # X$temp_columns, and X$group_by_vars
-  # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
+  # TODO: for future consideration
+  # * Move eval to C++ and make it a generator so it can stream, not block

Review Comment:
   +1 that would be nice.



##########
r/R/dataset-scan.R:
##########
@@ -214,13 +228,7 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
     res <- res[seq_len(i)]
   }
 
-  if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) {
-    res <- dplyr::bind_rows(map(res, dplyr::collect))
-  } else if (.data.frame) {
-    res <- dplyr::bind_rows(map(res, as.data.frame))
-  }
-
-  res
+  as_adq(RecordBatchReader$create(batches = res))

Review Comment:
   Question: Why return an ADQ instead of a straight RBR? RBR is a documented type, while ADQ is undocumented, and I think you can still use an RBR in a dplyr pipeline, right?



##########
r/R/record-batch-reader.R:
##########
@@ -104,6 +105,16 @@ RecordBatchReader <- R6Class("RecordBatchReader",
     schema = function() RecordBatchReader__schema(self)
   )
 )
+RecordBatchReader$create <- function(..., batches = list(...), schema = NULL) {

Review Comment:
   I think I just learned something new about R. `batches` here is the equivalent of Python's `*args`, right?





[GitHub] [arrow] ursabot commented on pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #13170:
URL: https://github.com/apache/arrow/pull/13170#issuecomment-1131530737

   Benchmark runs are scheduled for baseline = 663dc325de1176a5caf32809942acae98abf7a8b and contender = dc39f83e2f9d05b0fcea1ad8ed4ed9e07da7bef0. dc39f83e2f9d05b0fcea1ad8ed4ed9e07da7bef0 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/679e4e4402d8439d9ff43edd41eb97de...9acf4b5cd6fe4f8d8539b78ac55a0ea3/)
   [Failed :arrow_down:11.53% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/4883c8a43a764d42aa646ddbb28a3f46...55c6f23a3dcb4c20a616a108c47ad345/)
   [Failed :arrow_down:0.37% :arrow_up:0.37%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/2880b9bcf61e46bc8de245f742a50900...69f4a27b959147c489dc11a6d5cdddfd/)
   [Finished :arrow_down:0.32% :arrow_up:0.0%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/c14e1c957fce4c2da381ef9e6d42cc6f...085c7b1c3f034465990f2b74b5e337b1/)
   Buildkite builds:
   [Finished] [`dc39f83e` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/791)
   [Failed] [`dc39f83e` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/788)
   [Failed] [`dc39f83e` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/778)
   [Finished] [`dc39f83e` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/794)
   [Finished] [`663dc325` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/790)
   [Failed] [`663dc325` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/787)
   [Failed] [`663dc325` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/777)
   [Finished] [`663dc325` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/793)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow] nealrichardson commented on a diff in pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on code in PR #13170:
URL: https://github.com/apache/arrow/pull/13170#discussion_r876431555


##########
r/R/dataset-scan.R:
##########
@@ -214,13 +228,7 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
     res <- res[seq_len(i)]
   }
 
-  if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) {
-    res <- dplyr::bind_rows(map(res, dplyr::collect))
-  } else if (.data.frame) {
-    res <- dplyr::bind_rows(map(res, as.data.frame))
-  }
-
-  res
+  as_adq(RecordBatchReader$create(batches = res))

Review Comment:
   Done in https://github.com/apache/arrow/pull/13170/commits/e12c633327439fef6857ea8c6b08a18af2301c96





[GitHub] [arrow] ursabot commented on pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #13170:
URL: https://github.com/apache/arrow/pull/13170#issuecomment-1131530856

   ['Python', 'R'] benchmarks have high level of regressions.
   [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/2880b9bcf61e46bc8de245f742a50900...69f4a27b959147c489dc11a6d5cdddfd/)
   




[GitHub] [arrow] nealrichardson commented on a diff in pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on code in PR #13170:
URL: https://github.com/apache/arrow/pull/13170#discussion_r875981109


##########
r/R/dplyr-collect.R:
##########
@@ -27,15 +27,31 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) {
   }
 
   # See query-engine.R for ExecPlan/Nodes
+  plan <- ExecPlan$create()
+  final_node <- plan$Build(x)
   tryCatch(
-    tab <- do_exec_plan(x),
+    tab <- plan$Run(final_node)$read_table(),
     # n = 4 because we want the error to show up as being from collect()
     # and not handle_csv_read_error()
     error = function(e, call = caller_env(n = 4)) {
       handle_csv_read_error(e, x$.data$schema, call)
     }
   )
 
+  # TODO: SinkNodeOptions need to take KVM like WriteNodeOptions
+  # or need to be able to set metadata on RBR

Review Comment:
   ```suggestion
     # TODO(ARROW-16607): move KVM handling into ExecPlan
   ```





[GitHub] [arrow] nealrichardson commented on a diff in pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on code in PR #13170:
URL: https://github.com/apache/arrow/pull/13170#discussion_r875977868


##########
r/R/record-batch-reader.R:
##########
@@ -226,8 +237,8 @@ as_record_batch_reader.Dataset <- function(x, ...) {
 #' @rdname as_record_batch_reader
 #' @export
 as_record_batch_reader.arrow_dplyr_query <- function(x, ...) {
-  # TODO(ARROW-15271): make ExecPlan return RBR
-  as_record_batch_reader(collect.arrow_dplyr_query(x, as_data_frame = FALSE))
+  # TODO(ARROW-15271): use ExecPlan directly when it handles metadata

Review Comment:
   ```suggestion
     # TODO(ARROW-16607): use ExecPlan directly when it handles metadata
   ```





[GitHub] [arrow] nealrichardson commented on a diff in pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on code in PR #13170:
URL: https://github.com/apache/arrow/pull/13170#discussion_r876420634


##########
r/R/record-batch-reader.R:
##########
@@ -104,6 +105,16 @@ RecordBatchReader <- R6Class("RecordBatchReader",
     schema = function() RecordBatchReader__schema(self)
   )
 )
+RecordBatchReader$create <- function(..., batches = list(...), schema = NULL) {

Review Comment:
   In effect, yes, or `**kwargs`. This is a convenient pattern when you want to collect `...` into a list but the caller may already have assembled that list.
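
The pattern can be sketched in base R, independent of arrow. `collect_items()` is a hypothetical helper made up for illustration; it only mirrors the shape of the `RecordBatchReader$create()` signature shown in the diff.

```r
# Hypothetical helper (not part of arrow): the default value of `items`
# refers to `...`, so callers can pass elements one by one or hand over
# a list they have already built.
collect_items <- function(..., items = list(...)) {
  items
}

# Elements passed individually are gathered from `...`
from_dots <- collect_items(1, 2, 3)

# A pre-assembled list is passed by name and used as-is
from_list <- collect_items(items = list(1, 2, 3))

identical(from_dots, from_list)
```

Because `items` comes after `...` in the signature, it can only be matched by name, so positional arguments never end up in it by accident.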





[GitHub] [arrow] jonkeane commented on a diff in pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
jonkeane commented on code in PR #13170:
URL: https://github.com/apache/arrow/pull/13170#discussion_r876456619


##########
r/R/duckdb.R:
##########
@@ -161,9 +157,5 @@ to_arrow <- function(.data, as_arrow_query = TRUE) {
   # Run the query
   res <- DBI::dbSendQuery(dbplyr::remote_con(.data), dbplyr::remote_query(.data), arrow = TRUE)
 
-  if (as_arrow_query) {
-    arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))
-  } else {
-    duckdb::duckdb_fetch_record_batch(res)
-  }
+  duckdb::duckdb_fetch_record_batch(res)

Review Comment:
   I'm glad we can get this cleaned up 🎉 





[GitHub] [arrow] nealrichardson commented on a diff in pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on code in PR #13170:
URL: https://github.com/apache/arrow/pull/13170#discussion_r876431942


##########
r/R/duckdb.R:
##########
@@ -161,9 +157,5 @@ to_arrow <- function(.data, as_arrow_query = TRUE) {
   # Run the query
   res <- DBI::dbSendQuery(dbplyr::remote_con(.data), dbplyr::remote_query(.data), arrow = TRUE)
 
-  if (as_arrow_query) {
-    arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))
-  } else {
-    duckdb::duckdb_fetch_record_batch(res)
-  }
+  duckdb::duckdb_fetch_record_batch(res)

Review Comment:
   @jonkeane FYI. See also https://github.com/apache/arrow/pull/11730#discussion_r794661445 for historical context





[GitHub] [arrow] nealrichardson closed pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
nealrichardson closed pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader
URL: https://github.com/apache/arrow/pull/13170




[GitHub] [arrow] github-actions[bot] commented on pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13170:
URL: https://github.com/apache/arrow/pull/13170#issuecomment-1128219563

   :warning: Ticket **has not been started in JIRA**, please click 'Start Progress'.




[GitHub] [arrow] nealrichardson commented on a diff in pull request #13170: ARROW-15271: [R] Refactor do_exec_plan to return a RecordBatchReader

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on code in PR #13170:
URL: https://github.com/apache/arrow/pull/13170#discussion_r876424751


##########
r/R/dataset-scan.R:
##########
@@ -214,13 +228,7 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
     res <- res[seq_len(i)]
   }
 
-  if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) {
-    res <- dplyr::bind_rows(map(res, dplyr::collect))
-  } else if (.data.frame) {
-    res <- dplyr::bind_rows(map(res, as.data.frame))
-  }
-
-  res
+  as_adq(RecordBatchReader$create(batches = res))

Review Comment:
   I was thinking back to the discussion that ARROW-15489 refers to, where an RBR didn't print usefully, so it would have been weird to get one back. But now that that issue has been resolved, you're right, RBR makes sense (it prints about the same as ADQ now). I can switch this one and the duckdb to_arrow() one as well.


