Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/04 23:55:11 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #12558: ARROW-15841: [R] Implement SafeCallIntoR to safely call the R API from another thread

westonpace commented on code in PR #12558:
URL: https://github.com/apache/arrow/pull/12558#discussion_r842218873


##########
r/R/arrow-package.R:
##########
@@ -31,6 +31,11 @@
 
 #' @importFrom vctrs s3_register vec_size vec_cast vec_unique
 .onLoad <- function(...) {
+  if (arrow_available()) {
+    # Make sure C++ knows on which thread it is safe to call the R API
+    InitializeMainRThread()

Review Comment:
   Do we know for a fact that the R thread never changes? In JS, for example, there is always "one thread", but the actual thread id can change from one iteration of the event loop to the next.
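
To make the question concrete, here is a minimal standalone sketch (not code from the PR) of the id-recording pattern that `InitializeMainRThread()` relies on. `ThreadGuard` and `CheckFromOtherThread` are hypothetical names used only for illustration. The check is only meaningful if every later call from R arrives on the same OS thread that called `Initialize()`, which is exactly the assumption being questioned.

```cpp
#include <thread>

// Hypothetical stand-in for the id bookkeeping in the PR's MainRThread.
class ThreadGuard {
 public:
  // Record the id of the calling thread (e.g., at package load time).
  void Initialize() {
    thread_id_ = std::this_thread::get_id();
    initialized_ = true;
  }

  // True only when called from the thread that ran Initialize().
  bool IsMainThread() const {
    return initialized_ && std::this_thread::get_id() == thread_id_;
  }

 private:
  bool initialized_ = false;
  std::thread::id thread_id_;
};

// Reports what IsMainThread() returns when asked from a freshly spawned thread.
bool CheckFromOtherThread(const ThreadGuard& guard) {
  bool seen = true;
  std::thread t([&] { seen = guard.IsMainThread(); });
  t.join();
  return seen;
}
```

If the host ever moved the interpreter to a different OS thread, `IsMainThread()` would start returning false on the interpreter's own thread, so the recorded id would have to be refreshed.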



##########
r/src/safe-call-into-r-impl.cpp:
##########
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+MainRThread& GetMainRThread() {
+  static MainRThread main_r_thread;
+  return main_r_thread;
+}
+
+// [[arrow::export]]
+void InitializeMainRThread() { GetMainRThread().Initialize(); }
+
+// [[arrow::export]]
+std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
+                              std::string opt) {
+  if (opt == "async_with_executor") {
+    std::thread* thread_ptr;
+
+    auto result =
+        RunWithCapturedR<std::string>([&thread_ptr, r_fun_that_returns_a_string]() {
+          auto fut = arrow::Future<std::string>::Make();
+          thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable {
+            auto result = SafeCallIntoR<std::string>([&] {
+              return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string());
+            });
+
+            fut.MarkFinished(result);
+          });
+
+          return fut;
+        });
+
+    thread_ptr->join();
+    delete thread_ptr;

Review Comment:
   This is probably fine, but you could wrap `thread_ptr` in a `unique_ptr`. For example:
   
   `thread_ptr = std::unique_ptr<std::thread>(new std::thread(...));`
   
   It gets rid of the `delete` call and guards against very unlikely events, such as `->join()` throwing an exception and the memory never being cleaned up (not that this would really matter in test code).
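
A minimal standalone sketch of that suggestion, assuming the same shape as the test code (the pointer is declared first and assigned later inside a callback). `RunDeferred` and the `value` variable are hypothetical and stand in for the R call and its result.

```cpp
#include <memory>
#include <thread>

// Starts a worker from inside a callback, joins it, and returns its result.
int RunDeferred() {
  std::unique_ptr<std::thread> thread_ptr;  // empty until the callback runs
  int value = 0;

  auto start = [&thread_ptr, &value]() {
    thread_ptr =
        std::unique_ptr<std::thread>(new std::thread([&value] { value = 42; }));
  };
  start();

  thread_ptr->join();
  // No delete needed: the std::thread object is freed when thread_ptr
  // goes out of scope.
  return value;
}
```

One caveat: the `unique_ptr` only addresses the leak. Destroying a `std::thread` that is still joinable calls `std::terminate`, so the join itself still has to succeed.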



##########
r/src/safe-call-into-r.h:
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { ... }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load)
+  // to save an internal copy of the thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception
+  // so that it can be properly handled after some cleanup code
+  // has run (e.g., cancelling some futures or waiting for them
+  // to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  void ResetError() { error_token_ = R_NilValue; }
+
+  // Check if there is a saved error
+  bool HasError() { return error_token_ != R_NilValue; }
+
+  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      ResetError();
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton
+MainRThread& GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return
+// a SEXP (use cpp11::as_cpp<T> to convert it to a C++ type inside
+// `fun`).
+template <typename T>
+arrow::Future<T> SafeCallIntoRAsync(std::function<T(void)> fun) {
+  MainRThread& main_r_thread = GetMainRThread();
+  if (main_r_thread.IsMainThread()) {
+    // If we're on the main thread, run the task immediately and let
+    // the cpp11::unwind_exception be thrown since it will be caught
+    // at the top level.
+    return arrow::Future<T>::MakeFinished(fun());
+    // try {
+
+    // } catch (cpp11::unwind_exception& e) {
+    //   auto error_status = arrow::Status::ExecutionError("R code execution error");
+    //   auto error_detail = std::make_shared<MainRThread::UnwindStatusDetail>(e.token);
+    //   return arrow::Future<T>::MakeFinished(error_status.WithDetail(error_detail));
+    // }

Review Comment:
   ```suggestion
   ```



##########
r/src/safe-call-into-r.h:
##########
@@ -0,0 +1,153 @@
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { ... }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load)
+  // to save an internal copy of the thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception
+  // so that it can be properly handled after some cleanup code
+  // has run (e.g., cancelling some futures or waiting for them
+  // to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  void ResetError() { error_token_ = R_NilValue; }
+
+  // Check if there is a saved error
+  bool HasError() { return error_token_ != R_NilValue; }
+
+  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      ResetError();
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton
+MainRThread& GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return
+// a SEXP (use cpp11::as_cpp<T> to convert it to a C++ type inside
+// `fun`).
+template <typename T>
+arrow::Future<T> SafeCallIntoRAsync(std::function<T(void)> fun) {
+  MainRThread& main_r_thread = GetMainRThread();
+  if (main_r_thread.IsMainThread()) {
+    // If we're on the main thread, run the task immediately and let
+    // the cpp11::unwind_exception be thrown since it will be caught
+    // at the top level.
+    return arrow::Future<T>::MakeFinished(fun());
+    // try {
+
+    // } catch (cpp11::unwind_exception& e) {
+    //   auto error_status = arrow::Status::ExecutionError("R code execution error");
+    //   auto error_detail = std::make_shared<MainRThread::UnwindStatusDetail>(e.token);
+    //   return arrow::Future<T>::MakeFinished(error_status.WithDetail(error_detail));
+    // }
+  } else if (main_r_thread.Executor() != nullptr) {
+    // If we are not on the main thread and have an Executor,
+    // use it to run the task on the main R thread. We can't throw
+    // a cpp11::unwind_exception here, so we need to propagate it back
+    // to RunWithCapturedR through the MainRThread singleton.
+    return DeferNotOk(main_r_thread.Executor()->Submit([fun]() {
+      if (GetMainRThread().HasError()) {
+        return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
+      }
+
+      try {
+        return arrow::Result<T>(fun());
+      } catch (cpp11::unwind_exception& e) {
+        GetMainRThread().SetError(e.token);
+        return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
+      }
+    }));
+  } else {
+    return arrow::Future<T>::MakeFinished(arrow::Status::NotImplemented(
+        "Call to R from a non-R thread without calling RunWithCapturedR"));
+  }
+}
+
+template <typename T>
+arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun) {
+  arrow::Future<T> result = SafeCallIntoRAsync<T>(std::move(fun));
+  return result.result();
+}
+
+template <typename T>
+arrow::Result<T> RunWithCapturedR(std::function<arrow::Future<T>()> make_arrow_call) {
+  if (GetMainRThread().Executor() != nullptr) {
+    return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()");
+  }
+
+  GetMainRThread().ResetError();
+
+  arrow::Result<T> result = arrow::internal::SerialExecutor::RunInSerialExecutor<T>(
+      [make_arrow_call](arrow::internal::Executor* executor) {
+        GetMainRThread().Executor() = executor;
+        arrow::Future<T> result = make_arrow_call();
+        return result;

Review Comment:
   ```suggestion
           return make_arrow_call();
   ```



##########
r/src/safe-call-into-r.h:
##########
@@ -0,0 +1,153 @@
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { ... }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load)
+  // to save an internal copy of the thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception
+  // so that it can be properly handled after some cleanup code
+  // has run (e.g., cancelling some futures or waiting for them
+  // to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  void ResetError() { error_token_ = R_NilValue; }
+
+  // Check if there is a saved error
+  bool HasError() { return error_token_ != R_NilValue; }
+
+  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      ResetError();
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton
+MainRThread& GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return
+// a SEXP (use cpp11::as_cpp<T> to convert it to a C++ type inside
+// `fun`).
+template <typename T>
+arrow::Future<T> SafeCallIntoRAsync(std::function<T(void)> fun) {
+  MainRThread& main_r_thread = GetMainRThread();
+  if (main_r_thread.IsMainThread()) {
+    // If we're on the main thread, run the task immediately and let
+    // the cpp11::unwind_exception be thrown since it will be caught
+    // at the top level.
+    return arrow::Future<T>::MakeFinished(fun());
+    // try {
+
+    // } catch (cpp11::unwind_exception& e) {
+    //   auto error_status = arrow::Status::ExecutionError("R code execution error");
+    //   auto error_detail = std::make_shared<MainRThread::UnwindStatusDetail>(e.token);
+    //   return arrow::Future<T>::MakeFinished(error_status.WithDetail(error_detail));
+    // }
+  } else if (main_r_thread.Executor() != nullptr) {
+    // If we are not on the main thread and have an Executor,
+    // use it to run the task on the main R thread. We can't throw
+    // a cpp11::unwind_exception here, so we need to propagate it back
+    // to RunWithCapturedR through the MainRThread singleton.
+    return DeferNotOk(main_r_thread.Executor()->Submit([fun]() {
+      if (GetMainRThread().HasError()) {
+        return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
+      }
+
+      try {
+        return arrow::Result<T>(fun());
+      } catch (cpp11::unwind_exception& e) {
+        GetMainRThread().SetError(e.token);
+        return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
+      }
+    }));
+  } else {
+    return arrow::Future<T>::MakeFinished(arrow::Status::NotImplemented(
+        "Call to R from a non-R thread without calling RunWithCapturedR"));
+  }
+}
+
+template <typename T>
+arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun) {
+  arrow::Future<T> result = SafeCallIntoRAsync<T>(std::move(fun));
+  return result.result();
+}
+
+template <typename T>
+arrow::Result<T> RunWithCapturedR(std::function<arrow::Future<T>()> make_arrow_call) {
+  if (GetMainRThread().Executor() != nullptr) {
+    return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()");
+  }
+
+  GetMainRThread().ResetError();
+
+  arrow::Result<T> result = arrow::internal::SerialExecutor::RunInSerialExecutor<T>(
+      [make_arrow_call](arrow::internal::Executor* executor) {
+        GetMainRThread().Executor() = executor;
+        arrow::Future<T> result = make_arrow_call();
+        return result;
+      });
+
+  GetMainRThread().Executor() = nullptr;

Review Comment:
   Your reading is correct. By the time `RunInSerialExecutor` returns, that pointer is no longer valid.
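
Because the captured pointer dangles once the executor is gone, it has to be cleared on every exit path. One way to guarantee that is a small scope guard. The sketch below is an illustration only, not what the PR does: `Executor`, `CapturedExecutor`, `ExecutorScope`, and `RunWithLocalExecutor` are all hypothetical stand-ins.

```cpp
#include <cassert>

struct Executor {};  // hypothetical stand-in for arrow::internal::Executor

// Hypothetical stand-in for the pointer stored in the MainRThread singleton.
Executor*& CapturedExecutor() {
  static Executor* executor = nullptr;
  return executor;
}

// Publishes the pointer for the lifetime of the guard and clears it
// afterwards, even if the enclosed code returns early or throws.
class ExecutorScope {
 public:
  explicit ExecutorScope(Executor* executor) {
    assert(CapturedExecutor() == nullptr);  // only one executor at a time
    CapturedExecutor() = executor;
  }
  ~ExecutorScope() { CapturedExecutor() = nullptr; }

  ExecutorScope(const ExecutorScope&) = delete;
  ExecutorScope& operator=(const ExecutorScope&) = delete;
};

// Returns true if the pointer was visible inside the scope and cleared after it.
bool RunWithLocalExecutor() {
  Executor local;
  {
    ExecutorScope scope(&local);
    if (CapturedExecutor() != &local) return false;
  }
  return CapturedExecutor() == nullptr;
}
```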



##########
r/src/safe-call-into-r.h:
##########
@@ -0,0 +1,153 @@
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { ... }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load)
+  // to save an internal copy of the thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception
+  // so that it can be properly handled after some cleanup code
+  // has run (e.g., cancelling some futures or waiting for them
+  // to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  void ResetError() { error_token_ = R_NilValue; }
+
+  // Check if there is a saved error
+  bool HasError() { return error_token_ != R_NilValue; }
+
+  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      ResetError();
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton
+MainRThread& GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return
+// a SEXP (use cpp11::as_cpp<T> to convert it to a C++ type inside
+// `fun`).
+template <typename T>
+arrow::Future<T> SafeCallIntoRAsync(std::function<T(void)> fun) {
+  MainRThread& main_r_thread = GetMainRThread();
+  if (main_r_thread.IsMainThread()) {
+    // If we're on the main thread, run the task immediately and let
+    // the cpp11::unwind_exception be thrown since it will be caught
+    // at the top level.
+    return arrow::Future<T>::MakeFinished(fun());

Review Comment:
   This is purely pedantic, but I'm pretty sure you can just do...
   
   ```
   return fun();
   return arrow::Status::NotImplemented(...);
   ```
   
   ...and it will automatically promote to a finished `arrow::Future` using implicit conversion.  However, I'm totally fine with this more explicit style as well.  It helps maintain a sense of equality between the three cases.
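
The mechanism behind that shorthand is ordinary C++ implicit conversion through non-`explicit` constructors. The toy types below (`MiniStatus`, `MiniFuture`, `Compute`) are hypothetical and are not Arrow's classes. They only show how a `return` of a plain value or a status can become an already-finished result object.

```cpp
#include <string>
#include <utility>

struct MiniStatus {
  std::string message;
};

// Toy "finished future": holds either a value or an error message.
template <typename T>
class MiniFuture {
 public:
  // Intentionally not explicit, so `return value;` converts implicitly.
  MiniFuture(T value) : ok_(true), value_(std::move(value)) {}
  // Intentionally not explicit, so `return status;` converts implicitly.
  MiniFuture(MiniStatus status) : ok_(false), error_(std::move(status.message)) {}

  bool ok() const { return ok_; }
  const T& value() const { return value_; }
  const std::string& error() const { return error_; }

 private:
  bool ok_;
  T value_{};
  std::string error_;
};

// Both return statements rely on the implicit constructors above.
MiniFuture<int> Compute(bool supported) {
  if (supported) return 42;
  return MiniStatus{"not implemented"};
}
```

The explicit `MakeFinished(...)` spelling does the same job while making the conversion visible at the call site.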



##########
r/src/safe-call-into-r.h:
##########
@@ -0,0 +1,153 @@
+
+#ifndef SAFE_CALL_INTO_R_INCLUDED
+#define SAFE_CALL_INTO_R_INCLUDED
+
+#include "./arrow_types.h"
+
+#include <arrow/util/future.h>
+#include <arrow/util/thread_pool.h>
+
+#include <functional>
+#include <thread>
+
+// The MainRThread class keeps track of the thread on which it is safe
+// to call the R API to facilitate its safe use (or erroring
+// if it is not safe). The MainRThread singleton can be accessed from
+// any thread using GetMainRThread(); the preferred way to call
+// the R API where it may not be safe to do so is to use
+// SafeCallIntoR<cpp_type>([&]() { ... }).
+class MainRThread {
+ public:
+  MainRThread() : initialized_(false), executor_(nullptr) {}
+
+  // Call this method from the R thread (e.g., on package load)
+  // to save an internal copy of the thread id.
+  void Initialize() {
+    thread_id_ = std::this_thread::get_id();
+    initialized_ = true;
+    SetError(R_NilValue);
+  }
+
+  bool IsInitialized() { return initialized_; }
+
+  // Check if the current thread is the main R thread
+  bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; }
+
+  // The Executor that is running on the main R thread, if it exists
+  arrow::internal::Executor*& Executor() { return executor_; }
+
+  // Save an error token generated from a cpp11::unwind_exception
+  // so that it can be properly handled after some cleanup code
+  // has run (e.g., cancelling some futures or waiting for them
+  // to finish).
+  void SetError(cpp11::sexp token) { error_token_ = token; }
+
+  void ResetError() { error_token_ = R_NilValue; }
+
+  // Check if there is a saved error
+  bool HasError() { return error_token_ != R_NilValue; }
+
+  // Throw a cpp11::unwind_exception() with the saved token if it exists
+  void ClearError() {
+    if (HasError()) {
+      cpp11::unwind_exception e(error_token_);
+      ResetError();
+      throw e;
+    }
+  }
+
+ private:
+  bool initialized_;
+  std::thread::id thread_id_;
+  cpp11::sexp error_token_;
+  arrow::internal::Executor* executor_;
+};
+
+// Retrieve the MainRThread singleton
+MainRThread& GetMainRThread();
+
+// Call into R and return a C++ object. Note that you can't return
+// a SEXP (use cpp11::as_cpp<T> to convert it to a C++ type inside
+// `fun`).
+template <typename T>
+arrow::Future<T> SafeCallIntoRAsync(std::function<T(void)> fun) {
+  MainRThread& main_r_thread = GetMainRThread();
+  if (main_r_thread.IsMainThread()) {
+    // If we're on the main thread, run the task immediately and let
+    // the cpp11::unwind_exception be thrown since it will be caught
+    // at the top level.
+    return arrow::Future<T>::MakeFinished(fun());
+    // try {
+
+    // } catch (cpp11::unwind_exception& e) {
+    //   auto error_status = arrow::Status::ExecutionError("R code execution error");
+    //   auto error_detail = std::make_shared<MainRThread::UnwindStatusDetail>(e.token);
+    //   return arrow::Future<T>::MakeFinished(error_status.WithDetail(error_detail));
+    // }
+  } else if (main_r_thread.Executor() != nullptr) {
+    // If we are not on the main thread and have an Executor,
+    // use it to run the task on the main R thread. We can't throw
+    // a cpp11::unwind_exception here, so we need to propagate it back
+    // to RunWithCapturedR through the MainRThread singleton.
+    return DeferNotOk(main_r_thread.Executor()->Submit([fun]() {
+      if (GetMainRThread().HasError()) {
+        return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
+      }
+
+      try {
+        return arrow::Result<T>(fun());
+      } catch (cpp11::unwind_exception& e) {
+        GetMainRThread().SetError(e.token);
+        return arrow::Result<T>(arrow::Status::UnknownError("R code execution error"));
+      }
+    }));
+  } else {
+    return arrow::Future<T>::MakeFinished(arrow::Status::NotImplemented(
+        "Call to R from a non-R thread without calling RunWithCapturedR"));
+  }
+}
+
+template <typename T>
+arrow::Result<T> SafeCallIntoR(std::function<T(void)> fun) {
+  arrow::Future<T> result = SafeCallIntoRAsync<T>(std::move(fun));
+  return result.result();

Review Comment:
   ```suggestion
     arrow::Future<T> future = SafeCallIntoRAsync<T>(std::move(fun));
     return future.result();
   ```



##########
r/src/safe-call-into-r-impl.cpp:
##########
@@ -0,0 +1,89 @@
+
+#include "./arrow_types.h"
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <functional>
+#include <thread>
+#include "./safe-call-into-r.h"
+
+MainRThread& GetMainRThread() {
+  static MainRThread main_r_thread;
+  return main_r_thread;
+}
+
+// [[arrow::export]]
+void InitializeMainRThread() { GetMainRThread().Initialize(); }
+
+// [[arrow::export]]
+std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
+                              std::string opt) {
+  if (opt == "async_with_executor") {
+    std::thread* thread_ptr;
+
+    auto result =
+        RunWithCapturedR<std::string>([&thread_ptr, r_fun_that_returns_a_string]() {
+          auto fut = arrow::Future<std::string>::Make();
+          thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable {
+            auto result = SafeCallIntoR<std::string>([&] {
+              return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string());
+            });
+
+            fut.MarkFinished(result);
+          });
+
+          return fut;
+        });
+
+    thread_ptr->join();
+    delete thread_ptr;
+
+    return arrow::ValueOrStop(result);
+  } else if (opt == "async_without_executor") {
+    std::thread* thread_ptr;
+
+    auto fut = arrow::Future<std::string>::Make();
+    thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable {
+      auto result = SafeCallIntoR<std::string>(
+          [&] { return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string()); });
+
+      if (result.ok()) {
+        fut.MarkFinished(result.ValueUnsafe());
+      } else {
+        fut.MarkFinished(result.status());
+      }
+    });
+
+    thread_ptr->join();
+    delete thread_ptr;
+
+    // We should be able to get this far, but fut will contain an error
+    // because it tried to evaluate R code from another thread
+    return arrow::ValueOrStop(fut.result());
+
+  } else if (opt == "on_main_thread") {
+    auto result = SafeCallIntoR<std::string>(
+        [&]() { return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string()); });
+    arrow::StopIfNotOk(result.status());
+    return result.ValueUnsafe();
+  } else {
+    return "";

Review Comment:
   Don't you want to assert or stop or something if you get into this unreachable spot?
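
A minimal standalone sketch of failing loudly on an unrecognized option instead of returning an empty string. `Dispatch` and its return values are hypothetical. In cpp11-based package code, `cpp11::stop()` would presumably take the place of the C++ exception used here.

```cpp
#include <stdexcept>
#include <string>

// Maps a known option to a label and rejects anything else.
std::string Dispatch(const std::string& opt) {
  if (opt == "on_main_thread") return "main";
  if (opt == "async_with_executor") return "executor";
  if (opt == "async_without_executor") return "no executor";

  // A typo in a test now surfaces as an error rather than passing
  // silently with an empty result.
  throw std::invalid_argument("Unknown opt: '" + opt + "'");
}
```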


