Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/07/06 15:06:29 UTC

[GitHub] [arrow] paleolimbot commented on a diff in pull request #13397: ARROW-16444: [R] Implement user-defined scalar functions in R bindings

paleolimbot commented on code in PR #13397:
URL: https://github.com/apache/arrow/pull/13397#discussion_r914955325


##########
r/R/query-engine.R:
##########
@@ -190,7 +190,7 @@ ExecPlan <- R6Class("ExecPlan",
       }
       node
     },
-    Run = function(node) {
+    Run = function(node, as_table = FALSE) {

Review Comment:
   It's definitely a problem. What I have here is the easy way; the hard way would be to make sure that an event loop is running at all times, just in case a background thread needs to execute R code.
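
   To make the "hard way" concrete, here is a minimal C++ sketch of a main-thread task loop. It assumes a simple mutex-protected queue. Worker threads submit work and block on a future, and that work only runs if the main (R) thread keeps servicing the queue. `MainThreadLoop`, `Submit`, `RunUntilFinished`, and `RunDemo` are hypothetical names. This illustrates the general pattern only; it is not Arrow's actual `RunWithCapturedR` code.

   ```cpp
   #include <cassert>
   #include <condition_variable>
   #include <deque>
   #include <functional>
   #include <future>
   #include <memory>
   #include <mutex>
   #include <thread>
   #include <vector>

   // Hypothetical sketch: lets worker threads hand work to the main ("R") thread,
   // which must keep running this loop for that work to ever execute.
   class MainThreadLoop {
    public:
     MainThreadLoop() : main_id_(std::this_thread::get_id()) {}

     // Called from any worker thread: enqueue a task and return a future for its result.
     std::future<int> Submit(std::function<int()> fn) {
       auto task = std::make_shared<std::packaged_task<int()>>(std::move(fn));
       std::future<int> result = task->get_future();
       {
         std::lock_guard<std::mutex> lock(mu_);
         tasks_.push_back([task] { (*task)(); });
       }
       cv_.notify_one();
       return result;
     }

     // Called on the main thread: execute queued tasks until `expected_tasks` have run.
     void RunUntilFinished(int expected_tasks) {
       for (int done = 0; done < expected_tasks; ++done) {
         std::function<void()> task;
         {
           std::unique_lock<std::mutex> lock(mu_);
           cv_.wait(lock, [this] { return !tasks_.empty(); });
           task = std::move(tasks_.front());
           tasks_.pop_front();
         }
         // Every task runs on the thread that constructed the loop (the "R thread").
         assert(std::this_thread::get_id() == main_id_);
         task();
       }
     }

    private:
     std::thread::id main_id_;
     std::mutex mu_;
     std::condition_variable cv_;
     std::deque<std::function<void()>> tasks_;
   };

   // Each worker asks the main thread to compute i + 1 (mirroring the `args[[1]] + 1L`
   // UDF below) and blocks until it gets the answer. Returns the sum of all results.
   int RunDemo(int n_workers) {
     MainThreadLoop loop;
     std::vector<int> results(n_workers, 0);
     std::vector<std::thread> workers;
     for (int i = 0; i < n_workers; ++i) {
       workers.emplace_back([&loop, &results, i] {
         results[i] = loop.Submit([i] { return i + 1; }).get();
       });
     }
     loop.RunUntilFinished(n_workers);  // the main thread must service the queue
     for (auto& w : workers) w.join();
     int sum = 0;
     for (int r : results) sum += r;
     return sum;
   }
   ```

   If the main thread skipped `RunUntilFinished` and went straight to `join()`, every worker would block forever in `get()`. That is the cost of the hard way: something on the R side has to keep this loop alive whenever a background thread might need R.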
   
   As this is coded now, you can still stream via a `RecordBatchReader` (at the R level); you just can't do that *and* use a UDF. For example:
   
   ``` r
   library(arrow, warn.conflicts = FALSE)
   #> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
   library(dplyr, warn.conflicts = FALSE)
   
   fun <- arrow_base_scalar_function(
     int32(), int64(),
     function(context, args) {
       args[[1]] + 1L
     }
   )
   
   register_scalar_function("my_test_scalar_function", fun)
   
   record_batch(a = 2L) |> 
     mutate(b = my_test_scalar_function(a)) |> 
     collect()
   #> # A tibble: 1 × 2
   #>       a     b
   #>   <int> <int>
   #> 1     2     3
   
   record_batch(a = 2L) |> 
     mutate(b = my_test_scalar_function(a)) |> 
     to_duckdb() |> 
     collect()
   #> Error in duckdb_execute(res): duckdb_execute_R: Failed to run query
   #> Error: Invalid Input Error: arrow_scan: get_next failed(): NotImplemented: Call to R from a non-R thread without calling RunWithCapturedR
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/kernel.cc:391  resolver_(ctx, args)
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec.cc:696  kernel_->signature->out_type().Resolve(kernel_ctx_, args.inputs)
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/expression.cc:602  executor->Init(&kernel_context, {kernel, descrs, options})
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/compute/exec/project_node.cc:92  ExecuteScalarExpression(simplified_expr, target, plan()->exec_context())
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/dataset/scanner.cc:79  delegate_.Next()
   #> /Users/deweydunnington/Desktop/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1651  reader()->ReadNext(&batch)
   ```
   
   <sup>Created on 2022-07-06 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org