You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by jo...@apache.org on 2022/04/07 16:50:36 UTC

[arrow] branch master updated: ARROW-15841: [R] Implement SafeCallIntoR to safely call the R API from another thread

This is an automated email from the ASF dual-hosted git repository.

jonkeane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e110eac71a ARROW-15841: [R] Implement SafeCallIntoR to safely call the R API from another thread
e110eac71a is described below

commit e110eac71aae63041a595fc1c8cc51960ba97f06
Author: Dewey Dunnington <de...@fishandwhistle.net>
AuthorDate: Thu Apr 7 11:50:28 2022 -0500

    ARROW-15841: [R] Implement SafeCallIntoR to safely call the R API from another thread
    
    This adds an initial implementation of the infrastructure needed to call the R API safely from threads other than the main R thread. Some code to get started:
    
    ``` r
    arrow:::TestSafeCallIntoR(
      function() "string one",
      opt = "on_main_thread"
    )
    #> [1] "string one"
    
    arrow:::TestSafeCallIntoR(
      function() stop("This is an error!"),
      opt = "on_main_thread"
    )
    #> Error in (function () : This is an error!
    ```
    
    Closes #12558 from paleolimbot/r-safe-call-into
    
    Authored-by: Dewey Dunnington <de...@fishandwhistle.net>
    Signed-off-by: Jonathan Keane <jk...@gmail.com>
---
 r/R/arrow-package.R                      |   5 ++
 r/R/arrowExports.R                       |   8 ++
 r/src/arrowExports.cpp                   |  33 +++++++
 r/src/safe-call-into-r-impl.cpp          |  89 +++++++++++++++++++
 r/src/safe-call-into-r.h                 | 145 +++++++++++++++++++++++++++++++
 r/tests/testthat/test-safe-call-into-r.R |  60 +++++++++++++
 6 files changed, 340 insertions(+)

diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index 509382e5da..2fab03d08c 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -31,6 +31,11 @@
 
 #' @importFrom vctrs s3_register vec_size vec_cast vec_unique
 .onLoad <- function(...) {
+  if (arrow_available()) {
+    # Make sure C++ knows on which thread it is safe to call the R API
+    InitializeMainRThread()
+  }
+
   dplyr_methods <- paste0(
     "dplyr::",
     c(
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index f43ef730ca..5ef6312196 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1732,6 +1732,14 @@ ipc___RecordBatchStreamWriter__Open <- function(stream, schema, use_legacy_forma
   .Call(`_arrow_ipc___RecordBatchStreamWriter__Open`, stream, schema, use_legacy_format, metadata_version)
 }
 
+InitializeMainRThread <- function() {  # called from .onLoad()
+  invisible(.Call(`_arrow_InitializeMainRThread`))
+}
+
+TestSafeCallIntoR <- function(r_fun_that_returns_a_string, opt) {  # for tests
+  .Call(`_arrow_TestSafeCallIntoR`, r_fun_that_returns_a_string, opt)
+}
+
 Array__GetScalar <- function(x, i) {
   .Call(`_arrow_Array__GetScalar`, x, i)
 }
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 45a883321d..0a29ed0872 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -6822,6 +6822,37 @@ extern "C" SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEX
 }
 #endif
 
+// safe-call-into-r-impl.cpp: .Call() entry point for InitializeMainRThread()
+#if defined(ARROW_R_WITH_ARROW)
+void InitializeMainRThread();
+extern "C" SEXP _arrow_InitializeMainRThread(){
+BEGIN_CPP11
+	InitializeMainRThread();
+	return R_NilValue;
+END_CPP11
+}
+#else  // ARROW_R_WITH_ARROW not defined: stub that raises an R error
+extern "C" SEXP _arrow_InitializeMainRThread(){
+	Rf_error("Cannot call InitializeMainRThread(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
+}
+#endif
+
+// safe-call-into-r-impl.cpp: .Call() entry point for TestSafeCallIntoR()
+#if defined(ARROW_R_WITH_ARROW)
+std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std::string opt);
+extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP r_fun_that_returns_a_string_sexp, SEXP opt_sexp){
+BEGIN_CPP11
+	arrow::r::Input<cpp11::function>::type r_fun_that_returns_a_string(r_fun_that_returns_a_string_sexp);
+	arrow::r::Input<std::string>::type opt(opt_sexp);
+	return cpp11::as_sexp(TestSafeCallIntoR(r_fun_that_returns_a_string, opt));
+END_CPP11
+}
+#else  // ARROW_R_WITH_ARROW not defined: stub that raises an R error
+extern "C" SEXP _arrow_TestSafeCallIntoR(SEXP r_fun_that_returns_a_string_sexp, SEXP opt_sexp){
+	Rf_error("Cannot call TestSafeCallIntoR(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
+}
+#endif
+
 // scalar.cpp
 #if defined(ARROW_R_WITH_ARROW)
 std::shared_ptr<arrow::Scalar> Array__GetScalar(const std::shared_ptr<arrow::Array>& x, int64_t i);
@@ -8146,6 +8177,8 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1}, 
 		{ "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 4}, 
 		{ "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 4}, 
+		{ "_arrow_InitializeMainRThread", (DL_FUNC) &_arrow_InitializeMainRThread, 0}, 
+		{ "_arrow_TestSafeCallIntoR", (DL_FUNC) &_arrow_TestSafeCallIntoR, 2}, 
 		{ "_arrow_Array__GetScalar", (DL_FUNC) &_arrow_Array__GetScalar, 2}, 
 		{ "_arrow_Scalar__ToString", (DL_FUNC) &_arrow_Scalar__ToString, 1}, 
 		{ "_arrow_StructScalar__field", (DL_FUNC) &_arrow_StructScalar__field, 2}, 
diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp
new file mode 100644
index 0000000000..aa0645aa7b
--- /dev/null
+++ b/r/src/safe-call-into-r-impl.cpp
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+MainRThread& GetMainRThread() {
+  static MainRThread instance;  // constructed on first call, then reused
+  return instance;
+}
+// Record the calling thread as the main R thread (called from .onLoad()).
+// [[arrow::export]]
+void InitializeMainRThread() { GetMainRThread().Initialize(); }
+
+// Test helper for SafeCallIntoR(): calls `r_fun_that_returns_a_string` (an R
+// function that must return a string) and returns its value. `opt` selects how:
+//  - "on_main_thread": called directly from this (main R) thread;
+//  - "async_with_executor": called from a new thread inside RunWithCapturedR(),
+//    which routes the call back to this thread's executor;
+//  - "async_without_executor": called from a new thread with no executor, which
+//    is expected to fail with "Call to R from a non-R thread ...".
+// R errors raised by the function are re-raised in R; any other `opt` errors.
+// [[arrow::export]]
+std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
+                              std::string opt) {
+  // Worker threads reach the R function through this by-reference lambda: a
+  // cpp11::function copied into a std::thread would be destroyed on that thread,
+  // and destroying it releases its R protection through the R API. Every worker
+  // is joined before this function returns, so the reference cannot dangle.
+  auto call_r_fun = [&r_fun_that_returns_a_string]() {
+    return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string());
+  };
+
+  if (opt == "async_with_executor") {
+    std::thread worker;
+    // Join from a destructor so the worker is joined (not leaked) even when
+    // RunWithCapturedR() rethrows an R error; skip it if it never started.
+    struct JoinOnExit {
+      std::thread& t;
+      ~JoinOnExit() {
+        if (t.joinable()) t.join();
+      }
+    } join_on_exit{worker};
+
+    auto result = RunWithCapturedR<std::string>([&worker, &call_r_fun]() {
+      auto fut = arrow::Future<std::string>::Make();
+      worker = std::thread([fut, &call_r_fun]() mutable {
+        fut.MarkFinished(SafeCallIntoR<std::string>(call_r_fun));
+      });
+      return fut;
+    });
+
+    return arrow::ValueOrStop(result);
+  } else if (opt == "async_without_executor") {
+    auto fut = arrow::Future<std::string>::Make();
+    std::thread worker([fut, &call_r_fun]() mutable {
+      fut.MarkFinished(SafeCallIntoR<std::string>(call_r_fun));
+    });
+    worker.join();
+
+    // We should be able to get this far, but fut will contain an error
+    // because it tried to evaluate R code from another thread
+    return arrow::ValueOrStop(fut.result());
+  } else if (opt == "on_main_thread") {
+    return arrow::ValueOrStop(SafeCallIntoR<std::string>(call_r_fun));
+  } else {
+    cpp11::stop("Unknown `opt`");
+  }
+}
+
+#endif
diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h
new file mode 100644
index 0000000000..1a27507b78
--- /dev/null
+++ b/r/src/safe-call-into-r.h
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// MainRThread tracks the thread on which it is safe to call the R API, so
+// that R can be called safely (or an error raised when it is not safe). Get the
+// singleton from any thread with GetMainRThread(); to call the R API where it
+// may not be safe, prefer SafeCallIntoR<cpp_type>([&]() { ... }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load) to record the
+  // current thread id as the main R thread and discard any saved error.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    ResetError();
+  }
+
+  bool IsInitialized() const { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() const {
+    return initialized_ && std::this_thread::get_id() == thread_id_;
+  }
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception so that it
+  // can be properly handled after some cleanup code has run (e.g., cancelling
+  // some futures or waiting for them to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  void ResetError() { error_token_ = R_NilValue; }
+
+  // Check if there is a saved error
+  bool HasError() const { return error_token_ != R_NilValue; }
+
+  // If an error token was saved, forget it and throw it as a
+  // cpp11::unwind_exception; otherwise do nothing.
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      ResetError();
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton (defined in safe-call-into-r-impl.cpp)
+MainRThread& GetMainRThread();
+
+// Run `fun` (which may call the R API) on the main R thread and return a
+// Future of its result. `fun` can't return a SEXP: convert inside it with
+// cpp11::as_cpp<T>. What happens depends on the calling thread:
+//  - main R thread: `fun` runs now; a cpp11::unwind_exception propagates.
+//  - other thread with an Executor set (see RunWithCapturedR): `fun` is
+//    submitted to it; any R error is saved and reported as UnknownError.
+//  - otherwise: NotImplemented, without calling `fun`.
+template <typename T>
+arrow::Future<T> SafeCallIntoRAsync(std::function<T(void)> fun) {
+  MainRThread& r_thread = GetMainRThread();
+
+  // On the R thread an unwind exception is caught at the top level.
+  if (r_thread.IsMainThread()) return fun();
+  if (r_thread.Executor() == nullptr) {
+    return arrow::Status::NotImplemented(
+        "Call to R from a non-R thread without calling RunWithCapturedR");
+  }
+
+  // A cpp11::unwind_exception cannot be thrown from the executor, so the token
+  // is stashed in the MainRThread singleton for RunWithCapturedR to rethrow;
+  // once an error is stashed, later tasks are skipped.
+  auto task = [fun]() -> arrow::Result<T> {
+    if (!GetMainRThread().HasError()) {
+      try {
+        return fun();
+      } catch (cpp11::unwind_exception& e) {
+        GetMainRThread().SetError(e.token);
+      }
+    }
+    return arrow::Status::UnknownError("R code execution error");
+  };
+  return arrow::DeferNotOk(r_thread.Executor()->Submit(std::move(task)));
+}
+
+// Synchronous form of SafeCallIntoRAsync(): returns the Future's result.
+template <typename T>
+arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun) {
+  return SafeCallIntoRAsync<T>(std::move(fun)).result();
+}
+
+// Run make_arrow_call() with a SerialExecutor driven by this (main R) thread so
+// SafeCallIntoR() from other threads runs here; rethrow any R error they saved.
+template <typename T>
+arrow::Result<T> RunWithCapturedR(std::function<arrow::Future<T>()> make_arrow_call) {
+  if (GetMainRThread().Executor() != nullptr) {
+    return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()");
+  }
+  GetMainRThread().ResetError();
+  // Unregister the executor on every exit, including when make_arrow_call() throws.
+  struct Unregister { ~Unregister() { GetMainRThread().Executor() = nullptr; } } guard;
+  arrow::Result<T> result = arrow::internal::SerialExecutor::RunInSerialExecutor<T>(
+      [make_arrow_call](arrow::internal::Executor* executor) {
+        GetMainRThread().Executor() = executor;
+        return make_arrow_call();
+      });
+  GetMainRThread().Executor() = nullptr;
+  GetMainRThread().ClearError();
+  return result;
+}
+
+#endif
diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R
new file mode 100644
index 0000000000..e9438de58b
--- /dev/null
+++ b/r/tests/testthat/test-safe-call-into-r.R
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Note that TestSafeCallIntoR is defined in safe-call-into-r-impl.cpp
+
+test_that("SafeCallIntoR works from the main R thread", {
+  skip_on_cran()
+  opt <- "on_main_thread"
+  # Called on the main R thread, the function runs directly; errors propagate.
+  expect_identical(
+    TestSafeCallIntoR(function() "string one!", opt = opt),
+    "string one!"
+  )
+  expect_error(
+    TestSafeCallIntoR(function() stop("an error!"), opt = opt),
+    "an error!"
+  )
+})
+
+test_that("SafeCallIntoR works within RunWithCapturedR", {
+  skip_on_cran()
+  opt <- "async_with_executor"
+  # A worker thread's call is routed back to the main R thread.
+  expect_identical(
+    TestSafeCallIntoR(function() "string one!", opt = opt),
+    "string one!"
+  )
+  expect_error(
+    TestSafeCallIntoR(function() stop("an error!"), opt = opt),
+    "an error!"
+  )
+})
+
+test_that("SafeCallIntoR errors from the non-R thread", {
+  skip_on_cran()
+  opt <- "async_without_executor"
+  # Without RunWithCapturedR(), the R function is never called.
+  expect_error(
+    TestSafeCallIntoR(function() "string one!", opt = opt),
+    "Call to R from a non-R thread"
+  )
+  expect_error(
+    TestSafeCallIntoR(function() stop("an error!"), opt = opt),
+    "Call to R from a non-R thread"
+  )
+})