You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/25 21:17:03 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #12558: ARROW-15841: [R] Implement SafeCallIntoR to safely call the R API from another thread

westonpace commented on a change in pull request #12558:
URL: https://github.com/apache/arrow/pull/12558#discussion_r835591009



##########
File path: r/src/safe-call-into-r-impl.cpp
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+static MainRThread main_r_thread;
+
+MainRThread* GetMainRThread() { return &main_r_thread; }

Review comment:
       Nit: stylistically, for read-only singletons, you can do:
   
   ```
   MainRThread* GetMainRThread() {
     static MainRThread main_r_thread;
     return &main_r_thread;
   }
   ```
   
   ...or even...
   
   ```
   const MainRThread& GetMainRThread() {
     static MainRThread main_r_thread;
     return main_r_thread;
   }
   ```
   
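   This moves initialization from program load time to the first time `GetMainRThread` is called, which, admittedly, doesn't matter 80% of the time :) It does matter when another static object's initializer calls `GetMainRThread()`, because the order of static initialization across translation units is unspecified.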

##########
File path: r/src/safe-call-into-r.h
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { some_expression_returning_sexp; }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load)
+  // to save an internal copy of the thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }
+
+  // Class whose run() method will be called from the main R thread
+  // but whose results may be accessed (as class fields) from
+  // potentially another thread.
+  class Task {
+   public:
+    virtual ~Task() {}
+    virtual arrow::Status run() = 0;
+  };
+
+  // Run `task` if it is safe to do so or return an error otherwise.
+  arrow::Status RunTask(Task* task);
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception
+  // so that it can be properly handled after some cleanup code
+  // has run (e.g., cancelling some futures or waiting for them
+  // to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  // Check if there is a saved error
+  bool HasError() { return error_token_ != R_NilValue; }
+
+  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      SetError(R_NilValue);
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton
+MainRThread* GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return
+// a SEXP (use cpp11::as_sexp<T> to convert it to a C++ type inside
+// `fun`).
+template <typename T>
+arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun) {
+  class TypedTask : public MainRThread::Task {
+   public:
+    explicit TypedTask(std::function<T(void)> fun) : fun_(fun) {}
+
+    arrow::Status run() {
+      result = fun_();
+      return arrow::Status::OK();
+    }
+
+    T result;
+
+   private:
+    std::function<T(void)> fun_;
+  };
+
+  TypedTask task(fun);
+  ARROW_RETURN_NOT_OK(GetMainRThread()->RunTask(&task));
+  return task.result;
+}
+
+template <typename T>
+arrow::Result<T> RunWithCapturedR(std::function<arrow::Future<T>()> task) {

Review comment:
       It is slightly confusing that this parameter is named `task` even though it isn't of type `Task`. Maybe call it `arrow_task` or `make_arrow_call`.

##########
File path: r/src/safe-call-into-r.h
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { some_expression_returning_sexp; }).

Review comment:
       Hmm, the comment on `SafeCallIntoR` below says you can't return a SEXP, yet this example is named `some_expression_returning_sexp`?

##########
File path: r/src/safe-call-into-r-impl.cpp
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+static MainRThread main_r_thread;
+
+MainRThread* GetMainRThread() { return &main_r_thread; }
+
+arrow::Status MainRThread::RunTask(Task* task) {
+  if (IsMainThread()) {
+    // If we're on the main thread, run the task immediately
+    try {
+      ARROW_RETURN_NOT_OK(task->run());
+      return arrow::Status::OK();
+    } catch (cpp11::unwind_exception& e) {
+      SetError(e.token);
+      return arrow::Status::UnknownError("R code execution error");
+    }
+  } else if (executor_ != nullptr) {
+    // If we are not on the main thread and have an Executor
+    // use it to run the task on the main R thread.
+    auto fut = executor_->Submit([task]() { return task->run(); });
+    ARROW_RETURN_NOT_OK(fut);
+    ARROW_RETURN_NOT_OK(fut.ValueUnsafe().result());
+    return arrow::Status::OK();
+  } else {
+    return arrow::Status::NotImplemented(
+        "Call to R from a non-R thread without an event loop");

Review comment:
       Nit: Maybe be more explicit and say `without calling RunWithCapturedR`. You and I know what "event loop" means, but future developers might not.

##########
File path: r/src/safe-call-into-r.h
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { some_expression_returning_sexp; }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load)
+  // to save an internal copy of the thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }
+
+  // Class whose run() method will be called from the main R thread
+  // but whose results may be accessed (as class fields) from
+  // potentially another thread.
+  class Task {
+   public:
+    virtual ~Task() {}
+    virtual arrow::Status run() = 0;
+  };
+
+  // Run `task` if it is safe to do so or return an error otherwise.
+  arrow::Status RunTask(Task* task);
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception
+  // so that it can be properly handled after some cleanup code
+  // has run (e.g., cancelling some futures or waiting for them
+  // to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  // Check if there is a saved error
+  bool HasError() { return error_token_ != R_NilValue; }
+
+  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      SetError(R_NilValue);
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton
+MainRThread* GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return
+// a SEXP (use cpp11::as_sexp<T> to convert it to a C++ type inside
+// `fun`).
+template <typename T>
+arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun) {
+  class TypedTask : public MainRThread::Task {
+   public:
+    explicit TypedTask(std::function<T(void)> fun) : fun_(fun) {}
+
+    arrow::Status run() {
+      result = fun_();
+      return arrow::Status::OK();
+    }
+
+    T result;
+
+   private:
+    std::function<T(void)> fun_;
+  };
+
+  TypedTask task(fun);

Review comment:
       ```suggestion
     TypedTask task(std::move(fun));
   ```
   Just a nit, but some `std::function` objects can carry quite a bit of captured state, so moving avoids an unnecessary copy of it.

##########
File path: r/src/safe-call-into-r.h
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { some_expression_returning_sexp; }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load)
+  // to save an internal copy of the thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }
+
+  // Class whose run() method will be called from the main R thread
+  // but whose results may be accessed (as class fields) from
+  // potentially another thread.
+  class Task {
+   public:
+    virtual ~Task() {}
+    virtual arrow::Status run() = 0;
+  };
+
+  // Run `task` if it is safe to do so or return an error otherwise.
+  arrow::Status RunTask(Task* task);
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception
+  // so that it can be properly handled after some cleanup code
+  // has run (e.g., cancelling some futures or waiting for them
+  // to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  // Check if there is a saved error
+  bool HasError() { return error_token_ != R_NilValue; }
+
+  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      SetError(R_NilValue);
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton
+MainRThread* GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return
+// a SEXP (use cpp11::as_sexp<T> to convert it to a C++ type inside
+// `fun`).
+template <typename T>
+arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun) {

Review comment:
       I'm a little torn on whether this (and correspondingly `RunTask`) should return `arrow::Result<T>` or `arrow::Future<T>`.
   
   On the one hand, as it is, we have better parity with Python. However, in Python, when we call "safe call into Python" we know that the work is going to be done by the calling thread. There is no need or ability to return a future.
   
   With R, on the other hand, we know the work is, in many cases, not going to happen on the calling thread. So we end up blocking the calling thread while we wait for the R work to complete. Generally, for the "let's use the connections library for I/O" domain, this isn't a problem, because the calling thread, if not already the main R thread, will be an I/O thread and we can safely let it block.
   
   That being said, I think it is slightly more flexible if this returns `Future<T>` and helps future-proof us as we add more callbacks like this.

##########
File path: r/src/safe-call-into-r-impl.cpp
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+static MainRThread main_r_thread;
+
+MainRThread* GetMainRThread() { return &main_r_thread; }
+
+arrow::Status MainRThread::RunTask(Task* task) {
+  if (IsMainThread()) {
+    // If we're on the main thread, run the task immediately
+    try {
+      ARROW_RETURN_NOT_OK(task->run());
+      return arrow::Status::OK();
+    } catch (cpp11::unwind_exception& e) {
+      SetError(e.token);
+      return arrow::Status::UnknownError("R code execution error");
+    }
+  } else if (executor_ != nullptr) {
+    // If we are not on the main thread and have an Executor
+    // use it to run the task on the main R thread.
+    auto fut = executor_->Submit([task]() { return task->run(); });
+    ARROW_RETURN_NOT_OK(fut);
+    ARROW_RETURN_NOT_OK(fut.ValueUnsafe().result());
+    return arrow::Status::OK();
+  } else {
+    return arrow::Status::NotImplemented(
+        "Call to R from a non-R thread without an event loop");
+  }
+}
+
+// [[arrow::export]]
+void InitializeMainRThread() { main_r_thread.Initialize(); }
+
+// [[arrow::export]]
+std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
+                              std::string opt) {
+  if (opt == "async_with_executor") {
+    std::thread* thread_ptr;
+
+    auto result =
+        RunWithCapturedR<std::string>([&thread_ptr, r_fun_that_returns_a_string]() {
+          auto fut = arrow::Future<std::string>::Make();
+          thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable {
+            auto result = SafeCallIntoR<std::string>([&] {
+              return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string());
+            });
+
+            if (result.ok()) {
+              fut.MarkFinished(result.ValueUnsafe());
+            } else {
+              fut.MarkFinished(result.status());
+            }
+          });
+
+          return fut;
+        });
+
+    thread_ptr->join();
+    delete thread_ptr;
+
+    // Stop for any R execution errors that may have occurred
+    GetMainRThread()->ClearError();

Review comment:
       It seems the burden of calling `ClearError` shouldn't be on the user here.  Can we move this call into `RunTask`?

##########
File path: r/src/safe-call-into-r-impl.cpp
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+static MainRThread main_r_thread;
+
+MainRThread* GetMainRThread() { return &main_r_thread; }
+
+arrow::Status MainRThread::RunTask(Task* task) {
+  if (IsMainThread()) {
+    // If we're on the main thread, run the task immediately
+    try {
+      ARROW_RETURN_NOT_OK(task->run());
+      return arrow::Status::OK();

Review comment:
       ```suggestion
         return task->run();
   ```

##########
File path: r/tests/testthat/test-safe-call-into-r.R
##########
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+test_that("SafeCallIntoR works from the main R thread", {
+  expect_identical(
+    TestSafeCallIntoR(function() "string one!", opt = "on_main_thread"),
+    "string one!"
+  )
+
+  expect_error(
+    TestSafeCallIntoR(function() stop("an error!"), opt = "on_main_thread"),
+    "an error!"
+  )
+})
+
+test_that("SafeCallIntoR works within RunWithCapturedR", {
+  expect_identical(
+    TestSafeCallIntoR(function() "string one!", opt = "async_with_executor"),
+    "string one!"
+  )
+
+  # This runs with the expected error, but causes subsequent segfaults, probably related
+  # to the error_token_ (maybe having to do with the copy-constructor?)
+  # expect_error(
+  #   TestSafeCallIntoR(function() stop("an error!"), opt = "async_with_executor"),
+  #   "an error!"
+  # )

Review comment:
       Does this still need investigation?

##########
File path: r/src/safe-call-into-r-impl.cpp
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+static MainRThread main_r_thread;
+
+MainRThread* GetMainRThread() { return &main_r_thread; }
+
+arrow::Status MainRThread::RunTask(Task* task) {
+  if (IsMainThread()) {
+    // If we're on the main thread, run the task immediately
+    try {
+      ARROW_RETURN_NOT_OK(task->run());
+      return arrow::Status::OK();
+    } catch (cpp11::unwind_exception& e) {
+      SetError(e.token);
+      return arrow::Status::UnknownError("R code execution error");
+    }
+  } else if (executor_ != nullptr) {
+    // If we are not on the main thread and have an Executor
+    // use it to run the task on the main R thread.
+    auto fut = executor_->Submit([task]() { return task->run(); });
+    ARROW_RETURN_NOT_OK(fut);
+    ARROW_RETURN_NOT_OK(fut.ValueUnsafe().result());
+    return arrow::Status::OK();

Review comment:
       ```suggestion
       auto fut = DeferNotOk(executor_->Submit([task]() { return task->run(); }));
       return fut.status();
   ```

##########
File path: r/src/safe-call-into-r-impl.cpp
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+static MainRThread main_r_thread;
+
+MainRThread* GetMainRThread() { return &main_r_thread; }
+
+arrow::Status MainRThread::RunTask(Task* task) {
+  if (IsMainThread()) {
+    // If we're on the main thread, run the task immediately
+    try {
+      ARROW_RETURN_NOT_OK(task->run());
+      return arrow::Status::OK();
+    } catch (cpp11::unwind_exception& e) {
+      SetError(e.token);
+      return arrow::Status::UnknownError("R code execution error");
+    }
+  } else if (executor_ != nullptr) {
+    // If we are not on the main thread and have an Executor
+    // use it to run the task on the main R thread.
+    auto fut = executor_->Submit([task]() { return task->run(); });
+    ARROW_RETURN_NOT_OK(fut);
+    ARROW_RETURN_NOT_OK(fut.ValueUnsafe().result());
+    return arrow::Status::OK();
+  } else {
+    return arrow::Status::NotImplemented(
+        "Call to R from a non-R thread without an event loop");
+  }
+}
+
+// [[arrow::export]]
+void InitializeMainRThread() { main_r_thread.Initialize(); }
+
+// [[arrow::export]]
+std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
+                              std::string opt) {
+  if (opt == "async_with_executor") {
+    std::thread* thread_ptr;
+
+    auto result =
+        RunWithCapturedR<std::string>([&thread_ptr, r_fun_that_returns_a_string]() {
+          auto fut = arrow::Future<std::string>::Make();
+          thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable {
+            auto result = SafeCallIntoR<std::string>([&] {
+              return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string());
+            });
+
+            if (result.ok()) {
+              fut.MarkFinished(result.ValueUnsafe());
+            } else {
+              fut.MarkFinished(result.status());
+            }

Review comment:
       ```suggestion
   fut.MarkFinished(result);
   ```

##########
File path: r/src/safe-call-into-r.h
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { some_expression_returning_sexp; }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load)
+  // to save an internal copy of the thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }
+
+  // Class whose run() method will be called from the main R thread
+  // but whose results may be accessed (as class fields) from
+  // potentially another thread.
+  class Task {
+   public:
+    virtual ~Task() {}
+    virtual arrow::Status run() = 0;
+  };
+
+  // Run `task` if it is safe to do so or return an error otherwise.
+  arrow::Status RunTask(Task* task);
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception
+  // so that it can be properly handled after some cleanup code
+  // has run (e.g., cancelling some futures or waiting for them
+  // to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  // Check if there is a saved error
+  bool HasError() { return error_token_ != R_NilValue; }
+
+  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      SetError(R_NilValue);
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton
+MainRThread* GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return
+// a SEXP (use cpp11::as_sexp<T> to convert it to a C++ type inside

Review comment:
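       I don't really know how C++ <-> SEXP conversion works, but `as_sexp` intuitively sounds like it converts from C++ to SEXP, whereas the comment says it is used to convert from SEXP to C++. The test in `safe-call-into-r-impl.cpp` uses `cpp11::as_cpp<std::string>` for that direction.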




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org