You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by jo...@apache.org on 2022/04/07 16:50:36 UTC
[arrow] branch master updated: ARROW-15841: [R] Implement SafeCallIntoR to safely call the R API from another thread
This is an automated email from the ASF dual-hosted git repository.
jonkeane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e110eac71a ARROW-15841: [R] Implement SafeCallIntoR to safely call the R API from another thread
e110eac71a is described below
commit e110eac71aae63041a595fc1c8cc51960ba97f06
Author: Dewey Dunnington <de...@fishandwhistle.net>
AuthorDate: Thu Apr 7 11:50:28 2022 -0500
ARROW-15841: [R] Implement SafeCallIntoR to safely call the R API from another thread
This is an early work-in-progress draft that sketches a few utilities for calling into R from other threads. Some code to get started:
``` r
# TestSafeCallIntoR() takes a single R function plus an `opt` scenario name
arrow:::TestSafeCallIntoR(function() "string one", opt = "on_main_thread")
#> [1] "string one"
arrow:::TestSafeCallIntoR(
  function() "string two",
  opt = "async_with_executor"
)
#> [1] "string two"
arrow:::TestSafeCallIntoR(
  function() stop("This is an error!"),
  opt = "on_main_thread"
)
#> Error in (function () : This is an error!
```
Closes #12558 from paleolimbot/r-safe-call-into
Authored-by: Dewey Dunnington <de...@fishandwhistle.net>
Signed-off-by: Jonathan Keane <jk...@gmail.com>
---
r/R/arrow-package.R | 5 ++
r/R/arrowExports.R | 8 ++
r/src/arrowExports.cpp | 33 +++++++
r/src/safe-call-into-r-impl.cpp | 89 +++++++++++++++++++
r/src/safe-call-into-r.h | 145 +++++++++++++++++++++++++++++++
r/tests/testthat/test-safe-call-into-r.R | 60 +++++++++++++
6 files changed, 340 insertions(+)
diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index 509382e5da..2fab03d08c 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -31,6 +31,11 @@
#' @importFrom vctrs s3_register vec_size vec_cast vec_unique
.onLoad <- function(...) {
+ if (arrow_available()) {
+ # Make sure C++ knows on which thread it is safe to call the R API
+ InitializeMainRThread()
+ }
+
dplyr_methods <- paste0(
"dplyr::",
c(
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index f43ef730ca..5ef6312196 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1732,6 +1732,14 @@ ipc___RecordBatchStreamWriter__Open <- function(stream, schema, use_legacy_forma
.Call(`_arrow_ipc___RecordBatchStreamWriter__Open`, stream, schema, use_legacy_format, metadata_version)
}
+InitializeMainRThread <- function() { # record the calling thread as the main R thread
+ invisible(.Call(`_arrow_InitializeMainRThread`))
+}
+
+TestSafeCallIntoR <- function(r_fun_that_returns_a_string, opt) { # see test-safe-call-into-r.R
+ .Call(`_arrow_TestSafeCallIntoR`, r_fun_that_returns_a_string, opt)
+}
+
Array__GetScalar <- function(x, i) {
.Call(`_arrow_Array__GetScalar`, x, i)
}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 45a883321d..0a29ed0872 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -6822,6 +6822,37 @@ extern "C" SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEX
}
#endif
+// safe-call-into-r-impl.cpp
+#if defined(ARROW_R_WITH_ARROW)
+void InitializeMainRThread();
+extern "C" SEXP _arrow_InitializeMainRThread(){  // .Call() entry point; returns NULL
+BEGIN_CPP11
+ InitializeMainRThread();
+ return R_NilValue;
+END_CPP11
+}
+#else  // ARROW_R_WITH_ARROW undefined: this stub only raises an R error
+extern "C" SEXP _arrow_InitializeMainRThread(){
+ Rf_error("Cannot call InitializeMainRThread(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
+}
+#endif
+
+// safe-call-into-r-impl.cpp: converts the SEXP arguments, returns the string via as_sexp()
+#if defined(ARROW_R_WITH_ARROW)
+std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std::string opt);
+extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP r_fun_that_returns_a_string_sexp, SEXP opt_sexp){
+BEGIN_CPP11
+ arrow::r::Input<cpp11::function>::type r_fun_that_returns_a_string(r_fun_that_returns_a_string_sexp);
+ arrow::r::Input<std::string>::type opt(opt_sexp);
+ return cpp11::as_sexp(TestSafeCallIntoR(r_fun_that_returns_a_string, opt));
+END_CPP11
+}
+#else  // ARROW_R_WITH_ARROW undefined: this stub only raises an R error
+extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP r_fun_that_returns_a_string_sexp, SEXP opt_sexp){
+ Rf_error("Cannot call TestSafeCallIntoR(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
+}
+#endif
+
// scalar.cpp
#if defined(ARROW_R_WITH_ARROW)
std::shared_ptr<arrow::Scalar> Array__GetScalar(const std::shared_ptr<arrow::Array>& x, int64_t i);
@@ -8146,6 +8177,8 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1},
{ "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 4},
{ "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 4},
+ { "_arrow_InitializeMainRThread", (DL_FUNC) &_arrow_InitializeMainRThread, 0},
+ { "_arrow_TestSafeCallIntoR", (DL_FUNC) &_arrow_TestSafeCallIntoR, 2},
{ "_arrow_Array__GetScalar", (DL_FUNC) &_arrow_Array__GetScalar, 2},
{ "_arrow_Scalar__ToString", (DL_FUNC) &_arrow_Scalar__ToString, 1},
{ "_arrow_StructScalar__field", (DL_FUNC) &_arrow_StructScalar__field, 2},
diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp
new file mode 100644
index 0000000000..aa0645aa7b
--- /dev/null
+++ b/r/src/safe-call-into-r-impl.cpp
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+MainRThread& GetMainRThread() {
+  static MainRThread singleton;  // constructed on the first call, then reused
+  return singleton;
+}
+// Records the calling thread as the main R thread; R calls this from .onLoad().
+// [[arrow::export]]
+void InitializeMainRThread() { GetMainRThread().Initialize(); }
+
+// Test helper for SafeCallIntoR(), exercised by test-safe-call-into-r.R.
+// `opt` selects the scenario:
+//  - "on_main_thread": call the R function directly from the R thread;
+//  - "async_with_executor": call it from a worker thread inside RunWithCapturedR();
+//  - "async_without_executor": call it from a worker thread with no Executor
+//    registered, which is expected to fail with a NotImplemented status.
+// Returns the string produced by the R function; failures are raised as R errors.
+// [[arrow::export]]
+std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
+                              std::string opt) {
+  // Capture the R function by reference only: copying (and later destroying) a
+  // cpp11::function on a worker thread would call the R API off the R thread.
+  auto call_r_fun = [&r_fun_that_returns_a_string]() {
+    return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string());
+  };
+
+  if (opt == "async_with_executor") {
+    // Join the worker on every exit path, including the cpp11::unwind_exception
+    // that RunWithCapturedR() rethrows when the R function errors. If an Executor
+    // is already registered, RunWithCapturedR() returns early and no thread starts.
+    std::thread worker;
+    struct JoinOnExit {
+      std::thread& worker_thread;
+      ~JoinOnExit() {
+        if (worker_thread.joinable()) worker_thread.join();
+      }
+    } join_on_exit{worker};
+
+    auto result = RunWithCapturedR<std::string>([&worker, &call_r_fun]() {
+      auto fut = arrow::Future<std::string>::Make();
+      worker = std::thread([fut, &call_r_fun]() mutable {
+        fut.MarkFinished(SafeCallIntoR<std::string>(call_r_fun));
+      });
+      return fut;
+    });
+
+    return arrow::ValueOrStop(result);
+  } else if (opt == "async_without_executor") {
+    auto fut = arrow::Future<std::string>::Make();
+    std::thread worker([fut, &call_r_fun]() mutable {
+      fut.MarkFinished(SafeCallIntoR<std::string>(call_r_fun));
+    });
+    worker.join();
+
+    // We should be able to get this far, but fut will contain an error
+    // because it tried to evaluate R code from another thread
+    return arrow::ValueOrStop(fut.result());
+  } else if (opt == "on_main_thread") {
+    auto result = SafeCallIntoR<std::string>(call_r_fun);
+    arrow::StopIfNotOk(result.status());
+    return result.ValueUnsafe();
+  } else {
+    cpp11::stop("Unknown `opt`");
+  }
+}
+
+#endif
diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h
new file mode 100644
index 0000000000..1a27507b78
--- /dev/null
+++ b/r/src/safe-call-into-r.h
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// MainRThread records which thread may safely call the R API so that code
+// running on other threads can detect this and either hand its R calls to
+// that thread or error cleanly. Access the singleton with GetMainRThread();
+// where calling the R API directly may not be safe, prefer
+// SafeCallIntoR<cpp_type>([&]() { ... }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call from the R thread (e.g., on package load) to record its thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() const { return initialized_; }
+
+  // Check if the current thread is the main R thread (always false before Initialize())
+  bool IsMainThread() const {
+    return initialized_ && std::this_thread::get_id() == thread_id_;
+  }
+
+  // Executor running on the main R thread, or nullptr; assignable via the reference
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token taken from a cpp11::unwind_exception so that the error
+  // can be re-raised by ClearError() after some cleanup code has run (e.g.,
+  // cancelling some futures or waiting for them to finish). Passing R_NilValue
+  // is equivalent to ResetError().
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  void ResetError() { error_token_ = R_NilValue; }  // discard any saved error
+
+  // Check if there is a saved error
+  bool HasError() const { return error_token_ != R_NilValue; }
+
+  // Re-raise the saved error (if any) as a cpp11::unwind_exception, resetting it first
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      ResetError();
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton (defined in safe-call-into-r-impl.cpp)
+MainRThread& GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return a SEXP (use
+// cpp11::as_cpp<T> to convert it to a C++ type inside `fun`).
+template <typename T>
+arrow::Future<T> SafeCallIntoRAsync(std::function<T(void)> fun) {
+  MainRThread& main_r_thread = GetMainRThread();
+  if (main_r_thread.IsMainThread()) {
+    // If we're on the main thread, run the task immediately and let
+    // the cpp11::unwind_exception be thrown since it will be caught
+    // at the top level.
+    return fun();
+  } else if (main_r_thread.Executor() != nullptr) {
+    // Otherwise run the task on the main R thread via its Executor. No exception
+    // may escape the task: an R error's unwind token is saved on the MainRThread
+    // singleton for RunWithCapturedR() to rethrow; other C++ errors become a Status.
+    return DeferNotOk(main_r_thread.Executor()->Submit([fun]() {
+      if (GetMainRThread().HasError()) {
+        return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
+      }
+
+      try {
+        return arrow::Result<T>(fun());
+      } catch (cpp11::unwind_exception& e) {
+        GetMainRThread().SetError(e.token);
+        return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
+      } catch (std::exception& e) {
+        return arrow::Result<T>(arrow::Status::UnknownError(e.what()));
+      }
+    }));
+  } else {
+    return arrow::Status::NotImplemented(
+        "Call to R from a non-R thread without calling RunWithCapturedR");
+  }
+}
+
+template <typename T>
+arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun) {
+  const arrow::Future<T> pending = SafeCallIntoRAsync<T>(std::move(fun));
+  return pending.result();  // blocks until the (possibly deferred) call completes
+}
+
+template <typename T>
+arrow::Result<T> RunWithCapturedR(std::function<arrow::Future<T>()> make_arrow_call) {
+  if (GetMainRThread().Executor() != nullptr) {
+    return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()");
+  }
+
+  GetMainRThread().ResetError();
+  // Unregister the Executor on every exit path, even if an exception escapes.
+  struct Unset { ~Unset() { GetMainRThread().Executor() = nullptr; } } unset_executor;
+
+  arrow::Result<T> result = arrow::internal::SerialExecutor::RunInSerialExecutor<T>(
+      [make_arrow_call](arrow::internal::Executor* executor) {
+        GetMainRThread().Executor() = executor;
+        return make_arrow_call();
+      });
+
+  GetMainRThread().ClearError();  // rethrows a saved R error, if any
+  return result;
+}
+
+#endif
diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R
new file mode 100644
index 0000000000..e9438de58b
--- /dev/null
+++ b/r/tests/testthat/test-safe-call-into-r.R
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Note that TestSafeCallIntoR is defined in safe-call-into-r-impl.cpp
+
+test_that("SafeCallIntoR works from the main R thread", {
+ skip_on_cran()
+ # On the R thread, SafeCallIntoR() calls the R function directly.
+ expect_identical(
+ TestSafeCallIntoR(function() "string one!", opt = "on_main_thread"),
+ "string one!"
+ )
+
+ expect_error(
+ TestSafeCallIntoR(function() stop("an error!"), opt = "on_main_thread"),
+ "an error!"
+ )
+})
+
+test_that("SafeCallIntoR works within RunWithCapturedR", {
+ skip_on_cran()
+ # A worker thread hands the R call to the R thread via RunWithCapturedR()'s Executor.
+ expect_identical(
+ TestSafeCallIntoR(function() "string one!", opt = "async_with_executor"),
+ "string one!"
+ )
+
+ expect_error(
+ TestSafeCallIntoR(function() stop("an error!"), opt = "async_with_executor"),
+ "an error!"
+ )
+})
+
+test_that("SafeCallIntoR errors from the non-R thread", {
+ skip_on_cran()
+ # Without RunWithCapturedR() the R function is never run, so both calls fail alike.
+ expect_error(
+ TestSafeCallIntoR(function() "string one!", opt = "async_without_executor"),
+ "Call to R from a non-R thread"
+ )
+
+ expect_error(
+ TestSafeCallIntoR(function() stop("an error!"), opt = "async_without_executor"),
+ "Call to R from a non-R thread"
+ )
+})