Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/19 08:11:16 UTC

[GitHub] [arrow] KnutJaegersberg opened a new issue, #12919: [R Package] map_batches in parallel

KnutJaegersberg opened a new issue, #12919:
URL: https://github.com/apache/arrow/issues/12919

   Regular user of disk.frame here. I love that package's map function, which lets you apply custom R functions to a larger-than-RAM dataset in parallel over the batches by using the future package. Am I right map_batches right now supports single core processing as it is implemented as a while loop over the batches? Do you plan to bring this very handy capability to R arrow, too? Sure, it must be slower than arrow's C++-based functionality, but it is still super useful.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] paleolimbot commented on issue #12919: [R Package] map_batches in parallel

Posted by GitBox <gi...@apache.org>.
paleolimbot commented on issue #12919:
URL: https://github.com/apache/arrow/issues/12919#issuecomment-1133364938

   There's a good discussion on the mailing list about this in the context of Python user-defined-functions, which were just added ( https://lists.apache.org/thread/hwcrnf77j8p4dvyzoc3v5cwgws83nvqp ). In the next few months we'll have R user-defined functions too, which will serve a related purpose (do things with the Arrow compute engine that aren't implemented in C++). The gist of the mailing list discussion is that a lot of the parallelism is already handled by Arrow and when R/Python code dispatches *more* workers it might not result in the performance gain that one might expect.
   
   For `map_batches()` it's a little easier to think of a use-case where dispatching incoming RecordBatches to workers running on other cores would help (my mind jumps to performing a spatial join, which won't be in the query engine anytime soon).
   
   I think we have almost all the infrastructure we need to do this...we can read/write from R connections (like `socketConnection()`), which gives us some options for interprocess communication if they're not already present/too slow in the future package abstraction.
   
   I had some fun trying to wire all of that up and ran into some problems, which expose some of the limits of our current infrastructure. Perhaps a good scope for what we should support in Arrow is the tools to make this work (while the actual implementation could/should live in an extension package?)
   
   <details>
   
   ``` r
   make_arrow_worker <- function(schema, .f, ..., port = 1234) {
     fun <- function(..., port) {
       dots <- list(...)
       input <- arrow:::MakeRConnectionInputStream(socketConnection(port = port, server = TRUE, blocking = TRUE))
       output_con <- pipe("cat", "wb")
       output <- arrow:::MakeRConnectionOutputStream(output_con)
       
       reader <- arrow::RecordBatchStreamReader$create(input)
       writer <- arrow::RecordBatchStreamWriter$create(output, schema)
       while (!is.null(batch <- reader$read_next_batch())) {
         args <- c(list(batch), dots)
         result <- do.call(.f, args)
         writer$write(arrow::as_record_batch(result))
         flush(output_con)
       }
     }
     
     callr::r_bg(fun, list(..., port = port))
   }
   
   worker <- make_arrow_worker(
     arrow::schema(x = arrow::int32()),
     function(batch, offset = 0) {
       arrow::record_batch(x = arrow::int32())
       batch$x <- batch$x + offset
       batch
     },
     offset = 123,
     port = 4837
   )
   
   
   input_con <- socketConnection(Sys.info()["nodename"], port = 4837)
   #> Warning in socketConnection(Sys.info()["nodename"], port = 4837): Deweys-
   #> MacBook-Air-2.local:4837 cannot be opened
   #> Error in socketConnection(Sys.info()["nodename"], port = 4837): cannot open the connection
   input_stream <- arrow:::MakeRConnectionOutputStream(input_con)
   #> Error in arrow:::MakeRConnectionOutputStream(input_con): object 'input_con' not found
   writer <- arrow::RecordBatchStreamWriter$create(input_stream, arrow::schema(x = arrow::int32()))
   #> Error in is.string(sink): object 'input_stream' not found
   writer$write_batch(arrow::record_batch(x = 2L))
   #> Error in eval(expr, envir, enclos): object 'writer' not found
   worker$read_output()
   #> [1] ""
   worker$read_error()
   #> [1] "Error in socketConnection(port = port, server = TRUE, blocking = TRUE) : \n  cannot open the connection\nIn addition: Warning message:\nIn socketConnection(port = port, server = TRUE, blocking = TRUE) :\n  port 4837 cannot be opened\n"
   ```
   
   <sup>Created on 2022-05-20 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>
   
   </details>
   
   



[GitHub] [arrow] wjones127 commented on issue #12919: [R Package] map_batches in parallel

Posted by GitBox <gi...@apache.org>.
wjones127 commented on issue #12919:
URL: https://github.com/apache/arrow/issues/12919#issuecomment-1133047634

   > Am I right map_batches right now supports single core processing as it is implemented as a while loop over the batches?
   
   Yes, right now it only uses a single core. Other operations implemented in C++ (like the dplyr ones) are multi-threaded, but I don't think we have a way to run arbitrary R code in multiple threads.
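   
   To make the single-core point concrete, here is a rough sketch of what a sequential loop over batches looks like. It is not the package's actual `map_batches()` implementation; the toy data, the doubling step, and the `results`/`combined` names are assumptions made purely for illustration. Each batch is handled one after another in the same R session, so only one core is doing R work at any time.
   
   ``` r
   library(arrow)
   
   # Toy input (an assumption): encode a small data frame as Arrow IPC stream bytes.
   bytes <- arrow::write_to_raw(data.frame(x = 1:6), format = "stream")
   reader <- arrow::RecordBatchStreamReader$create(arrow::BufferReader$create(bytes))
   
   results <- list()
   # read_next_batch() returns NULL once the stream is exhausted.
   while (!is.null(batch <- reader$read_next_batch())) {
     df <- as.data.frame(batch)
     df$x <- df$x * 2L  # stand-in for an arbitrary user-supplied R function
     results[[length(results) + 1L]] <- df
   }
   combined <- do.call(rbind, results)
   ```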
   
   > Do you plan to bring this very handy capability to R arrow, too?
   
   Maybe. From the [future](https://cran.r-project.org/web/packages/future/index.html) package description:
   
    > This package implements sequential, multicore, multisession, and cluster futures. With these, R expressions can be evaluated on the local machine, in parallel a set of local machines, or distributed on a mix of local and remote machines.
   
   I'm not sure if we would target non-local execution, but we might target multisession if we had a good way of sharing the Arrow RecordBatches with subprocesses easily.
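   
   One possible way to share batches with subprocesses, sketched under some assumptions: serialize each batch to Arrow IPC stream bytes, send the raw vector to a multisession worker, and decode it there. `map_batches_multisession()` is a hypothetical helper (not part of arrow), and it assumes the future.apply package is installed. Whether the serialization overhead pays off depends on how expensive the per-batch R function is.
   
   ``` r
   library(arrow)
   library(future.apply)
   
   future::plan(future::multisession, workers = 2)
   
   # Hypothetical helper: apply `f` to each batch in a separate R session.
   map_batches_multisession <- function(batches, f, ...) {
     # Arrow objects wrap external pointers that do not survive the trip to
     # another process, so ship plain raw vectors instead.
     payloads <- lapply(batches, arrow::write_to_raw, format = "stream")
     results <- future.apply::future_lapply(payloads, function(payload, ...) {
       df <- arrow::read_ipc_stream(payload)  # decoded to a data frame in the worker
       arrow::write_to_raw(f(df, ...), format = "stream")
     }, ...)
     # Decode the workers' results back into Arrow Tables in the main session.
     lapply(results, arrow::read_ipc_stream, as_data_frame = FALSE)
   }
   
   batches <- list(
     arrow::record_batch(x = 1:3),
     arrow::record_batch(x = 4:6)
   )
   out <- map_batches_multisession(batches, function(df, offset = 0L) {
     df$x <- df$x + offset
     df
   }, offset = 123L)
   
   future::plan(future::sequential)  # shut the workers down again
   ```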
   
   I might research this a bit. @paleolimbot you have any thoughts on this?

