Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/20 20:54:44 UTC

[GitHub] [arrow] paleolimbot commented on issue #12919: [R Package] map_batches in parallel

paleolimbot commented on issue #12919:
URL: https://github.com/apache/arrow/issues/12919#issuecomment-1133364938

   There's a good discussion on the mailing list about this in the context of Python user-defined functions, which were just added ( https://lists.apache.org/thread/hwcrnf77j8p4dvyzoc3v5cwgws83nvqp ). In the next few months we'll have R user-defined functions too, which will serve a related purpose (doing things with the Arrow compute engine that aren't implemented in C++). The gist of the mailing list discussion is that Arrow already handles a lot of the parallelism, so when R/Python code dispatches *more* workers, it might not result in the performance gain one might expect.
   
   For `map_batches()` it's a little easier to think of a use case where dispatching incoming RecordBatches to workers running on other cores would help (my mind jumps to performing a spatial join, which won't be in the query engine anytime soon).
   
   I think we have almost all the infrastructure we need to do this: we can read from and write to R connections (like `socketConnection()`), which gives us some options for interprocess communication if those options are not already present, or are too slow, in the `future` package abstraction.
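   
   As a rough sketch of the serialization half of that idea (not the worker setup itself), a RecordBatch can be round-tripped through raw bytes using exported functions only. This assumes `write_to_raw()` and `read_ipc_stream()` behave as documented in the arrow R package; the `rawConnection()` here is just a stand-in for a socket, and the names `payload` and `bytes` are made up for illustration.
   
   ```r
   library(arrow)
   
   # A small batch standing in for one that map_batches() would hand to a worker
   batch <- record_batch(x = c(1L, 2L, 3L))
   
   # Serialize the batch to the Arrow IPC stream format as a raw vector
   payload <- write_to_raw(batch, format = "stream")
   
   # Push the bytes through an R connection (a rawConnection here as a
   # stand-in; a socketConnection() would take the same writeBin() call)
   con <- rawConnection(raw(0), "wb")
   writeBin(payload, con)
   bytes <- rawConnectionValue(con)
   close(con)
   
   # On the receiving side, read the bytes back into an Arrow Table
   result <- read_ipc_stream(bytes, as_data_frame = FALSE)
   ```
   
   This copies the whole batch into R memory on both sides, so it sidesteps the streaming reader/writer that a real worker would probably want.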
   
   I had some fun trying to wire all of that up and ran into some problems, which expose some of the limits of our current infrastructure. Perhaps a good scope for what we should support in Arrow is the tooling to make this work (while the actual implementation could/should live in an extension package)?
   
   <details>
   
   ``` r
   make_arrow_worker <- function(schema, .f, ..., port = 1234) {
     fun <- function(..., port) {
       dots <- list(...)
       input <- arrow:::MakeRConnectionInputStream(socketConnection(port = port, server = TRUE, blocking = TRUE))
       output_con <- pipe("cat", "wb")
       output <- arrow:::MakeRConnectionOutputStream(output_con)
       
       reader <- arrow::RecordBatchStreamReader$create(input)
       writer <- arrow::RecordBatchStreamWriter$create(output, schema)
       while (!is.null(batch <- reader$read_next_batch())) {
         args <- c(list(batch), dots)
         result <- do.call(.f, args)
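         # arrow is not attached in the background process, so namespace the call
         writer$write(arrow::as_record_batch(result))
         # flush the pipe connection opened above (`con` was never defined)
         flush(output_con)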
       }
     }
     
     callr::r_bg(fun, list(..., port = port))
   }
   
   worker <- make_arrow_worker(
     arrow::schema(x = arrow::int32()),
     function(batch, offset = 0) {
       arrow::record_batch(x = arrow::int32())
       batch$x <- batch$x + offset
       batch
     },
     offset = 123,
     port = 4837
   )
   
   
   input_con <- socketConnection(Sys.info()["nodename"], port = 4837)
   #> Warning in socketConnection(Sys.info()["nodename"], port = 4837): Deweys-
   #> MacBook-Air-2.local:4837 cannot be opened
   #> Error in socketConnection(Sys.info()["nodename"], port = 4837): cannot open the connection
   input_stream <- arrow:::MakeRConnectionOutputStream(input_con)
   #> Error in arrow:::MakeRConnectionOutputStream(input_con): object 'input_con' not found
   writer <- arrow::RecordBatchStreamWriter$create(input_stream, arrow::schema(x = arrow::int32()))
   #> Error in is.string(sink): object 'input_stream' not found
   writer$write_batch(arrow::record_batch(x = 2L))
   #> Error in eval(expr, envir, enclos): object 'writer' not found
   worker$read_output()
   #> [1] ""
   worker$read_error()
   #> [1] "Error in socketConnection(port = port, server = TRUE, blocking = TRUE) : \n  cannot open the connection\nIn addition: Warning message:\nIn socketConnection(port = port, server = TRUE, blocking = TRUE) :\n  port 4837 cannot be opened\n"
   ```
   
   <sup>Created on 2022-05-20 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>
   
   </details>
   
   

