Posted to commits@arrow.apache.org by th...@apache.org on 2023/06/29 09:54:09 UTC

[arrow] branch main updated: GH-35649: [R] Always call `RecordBatchReader::ReadNext()` from DuckDB from the main R thread (#36307)

This is an automated email from the ASF dual-hosted git repository.

thisisnic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new a75299d8cb GH-35649: [R] Always call `RecordBatchReader::ReadNext()` from DuckDB from the main R thread (#36307)
a75299d8cb is described below

commit a75299d8cb221ccd9ef954c2cd44cd5b19b0779b
Author: Dewey Dunnington <de...@voltrondata.com>
AuthorDate: Thu Jun 29 06:54:00 2023 -0300

    GH-35649: [R] Always call `RecordBatchReader::ReadNext()` from DuckDB from the main R thread (#36307)
    
    ### Rationale for this change
    
    When a DuckDB result whose input was an Arrow dataset is passed back to Arrow via `to_arrow()`, some DuckDB operations can call into R code from threads other than the main R thread. On Linux this caused a crash or hang when combining `pivot_longer()` and `write_dataset()`.
    
    ### What changes are included in this PR?
    
    - Added a wrapper class around the `RecordBatchReader` that routes calls to `ReadNext()` through `SafeCallIntoR()` (the main-thread dispatch pattern this relies on is sketched below; the wrapper itself is in the `r/src/recordbatchreader.cpp` diff).
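    
    For readers unfamiliar with `SafeCallIntoR()`, here is a minimal, self-contained model of the underlying pattern, under the assumption that one thread owns the R runtime: a worker thread packages its work, hands it to the R-owning thread, and blocks until that thread has run it. This is a conceptual sketch only; the names (`MainThreadExecutor`, `RunOnMain`, `PumpOne`) are hypothetical, and the real helper lives in `r/src/safe-call-into-r.h`.
    
    ```cpp
    #include <condition_variable>
    #include <functional>
    #include <future>
    #include <mutex>
    #include <queue>
    
    // Hypothetical model of main-thread dispatch; not the actual arrow helper.
    class MainThreadExecutor {
     public:
      // Called from any thread: enqueue fn and block until the main thread runs it.
      void RunOnMain(std::function<void()> fn) {
        std::packaged_task<void()> task(std::move(fn));
        std::future<void> done = task.get_future();
        {
          std::lock_guard<std::mutex> lock(mutex_);
          tasks_.push(std::move(task));
        }
        cv_.notify_one();
        done.wait();  // the worker thread waits here, much like the wrapped ReadNext()
      }
    
      // Called only from the main thread: wait for one task and execute it.
      void PumpOne() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !tasks_.empty(); });
        std::packaged_task<void()> task = std::move(tasks_.front());
        tasks_.pop();
        lock.unlock();
        task();  // runs on the main thread, so it may safely touch R
      }
    
     private:
      std::mutex mutex_;
      std::condition_variable cv_;
      std::queue<std::packaged_task<void()>> tasks_;
    };
    ```
    
    In this PR the roles are played (roughly) by the ExecPlan's scanner thread, which calls the wrapped `ReadNext()`, and the main R thread, which drains the queue, so DuckDB only ever evaluates R code on the thread that owns the R runtime.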
    
    ### Are these changes tested?
    
    I can't find a new case that isn't covered by our existing tests, although I did remove a skip that was working around a similar problem at one point (#33033). Because it's difficult to predict or test where duckdb evaluates R code, it's hard to know exactly what to test here (I would have expected R code to be evaluated, and a crash to occur, in many of our existing tests, but even the `pivot_longer()` example does not crash on macOS or Windows 🤷).
    
    I did verify on Ubuntu 22.04 that the reprex kindly provided by @PMassicotte errors before this PR and does not error after it:
    
    ```r
    library(tidyverse)
    library(arrow)
    
    one_level_tree <- tempfile()
    
    mtcars |>
      to_duckdb() |>
      pivot_longer(everything()) |>
      to_arrow() |>
      # collect() |> # collecting makes it work; otherwise, it hangs on write_dataset()
      write_dataset(one_level_tree, partitioning = "name")
    
    list.files(one_level_tree, recursive = TRUE)
    ```
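    
    If the fix works, the final `list.files()` call should list the partitioned output (by default one Hive-style `name=<value>/` directory per distinct value of `name`) instead of the pipeline hanging in `write_dataset()`.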
    
    ### Are there any user-facing changes?
    
    There are no user-facing changes.
    * Closes: #35649
    
    Authored-by: Dewey Dunnington <de...@voltrondata.com>
    Signed-off-by: Nic Crane <th...@gmail.com>
---
 r/R/arrowExports.R             |  4 ++++
 r/R/duckdb.R                   |  3 ++-
 r/src/arrowExports.cpp         |  9 +++++++++
 r/src/recordbatchreader.cpp    | 28 ++++++++++++++++++++++++++++
 r/tests/testthat/test-duckdb.R | 12 +-----------
 5 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 2d935ff871..10732100cd 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1852,6 +1852,10 @@ RecordBatchReader__Head <- function(reader, num_rows) {
   .Call(`_arrow_RecordBatchReader__Head`, reader, num_rows)
 }
 
+MakeSafeRecordBatchReader <- function(reader) {
+  .Call(`_arrow_MakeSafeRecordBatchReader`, reader)
+}
+
 ipc___RecordBatchStreamReader__Open <- function(stream) {
   .Call(`_arrow_ipc___RecordBatchStreamReader__Open`, stream)
 }
diff --git a/r/R/duckdb.R b/r/R/duckdb.R
index 84c738359d..eee777ba83 100644
--- a/r/R/duckdb.R
+++ b/r/R/duckdb.R
@@ -155,5 +155,6 @@ to_arrow <- function(.data) {
   # Run the query
   res <- DBI::dbSendQuery(dbplyr::remote_con(.data), dbplyr::remote_query(.data), arrow = TRUE)
 
-  duckdb::duckdb_fetch_record_batch(res)
+  reader <- duckdb::duckdb_fetch_record_batch(res)
+  MakeSafeRecordBatchReader(reader)
 }
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index e5fcf217d9..1d617b252e 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -4876,6 +4876,14 @@ BEGIN_CPP11
 END_CPP11
 }
 // recordbatchreader.cpp
+std::shared_ptr<arrow::RecordBatchReader> MakeSafeRecordBatchReader(const std::shared_ptr<arrow::RecordBatchReader>& reader);
+extern "C" SEXP _arrow_MakeSafeRecordBatchReader(SEXP reader_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::shared_ptr<arrow::RecordBatchReader>&>::type reader(reader_sexp);
+	return cpp11::as_sexp(MakeSafeRecordBatchReader(reader));
+END_CPP11
+}
+// recordbatchreader.cpp
 std::shared_ptr<arrow::ipc::RecordBatchStreamReader> ipc___RecordBatchStreamReader__Open(const std::shared_ptr<arrow::io::InputStream>& stream);
 extern "C" SEXP _arrow_ipc___RecordBatchStreamReader__Open(SEXP stream_sexp){
 BEGIN_CPP11
@@ -6042,6 +6050,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_RecordBatchReader__from_Table", (DL_FUNC) &_arrow_RecordBatchReader__from_Table, 1}, 
 		{ "_arrow_Table__from_RecordBatchReader", (DL_FUNC) &_arrow_Table__from_RecordBatchReader, 1}, 
 		{ "_arrow_RecordBatchReader__Head", (DL_FUNC) &_arrow_RecordBatchReader__Head, 2}, 
+		{ "_arrow_MakeSafeRecordBatchReader", (DL_FUNC) &_arrow_MakeSafeRecordBatchReader, 1}, 
 		{ "_arrow_ipc___RecordBatchStreamReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamReader__Open, 1}, 
 		{ "_arrow_ipc___RecordBatchFileReader__schema", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__schema, 1}, 
 		{ "_arrow_ipc___RecordBatchFileReader__num_record_batches", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__num_record_batches, 1}, 
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index 6294205131..235fd84818 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -197,6 +197,34 @@ std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__Head(
   }
 }
 
+// Some types of RecordBatchReader input (e.g., DuckDB output) will crash or hang
+// if scanned from another thread (e.g., by an ExecPlan). This RecordBatchReader
+// wrapper ensures that ReadNext() is always called from the R thread.
+class SafeRecordBatchReader : public arrow::RecordBatchReader {
+ public:
+  explicit SafeRecordBatchReader(std::shared_ptr<arrow::RecordBatchReader> parent)
+      : parent_(parent) {}
+
+  std::shared_ptr<arrow::Schema> schema() const override { return parent_->schema(); }
+
+  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) override {
+    return SafeCallIntoRVoid(
+        [batch_out, this] { return this->parent_->ReadNext(batch_out); },
+        "SafeRecordBatchReader::ReadNext()");
+  }
+
+  arrow::Status Close() override { return parent_->Close(); }
+
+ private:
+  std::shared_ptr<arrow::RecordBatchReader> parent_;
+};
+
+// [[arrow::export]]
+std::shared_ptr<arrow::RecordBatchReader> MakeSafeRecordBatchReader(
+    const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+  return std::make_shared<SafeRecordBatchReader>(reader);
+}
+
 // -------- RecordBatchStreamReader
 
 // [[arrow::export]]
diff --git a/r/tests/testthat/test-duckdb.R b/r/tests/testthat/test-duckdb.R
index 409e99b70f..d5bf3d9271 100644
--- a/r/tests/testthat/test-duckdb.R
+++ b/r/tests/testthat/test-duckdb.R
@@ -34,7 +34,7 @@ test_that("meaningful error message when duckdb is not installed", {
   )
 })
 
-skip_if_not_installed("duckdb", minimum_version = "0.3.1")
+skip_if_not_installed("duckdb", minimum_version = "0.3.2")
 skip_if_not_installed("dbplyr")
 
 library(duckdb, quietly = TRUE)
@@ -138,9 +138,6 @@ test_that("to_duckdb then to_arrow", {
 })
 
 test_that("to_arrow roundtrip, with dataset", {
-  # these will continue to error until 0.3.2 is released
-  # https://github.com/duckdb/duckdb/pull/2957
-  skip_if_not_installed("duckdb", minimum_version = "0.3.2")
   # With a multi-part dataset
   tf <- tempfile()
   new_ds <- rbind(
@@ -174,9 +171,6 @@ test_that("to_arrow roundtrip, with dataset", {
 })
 
 test_that("to_arrow roundtrip, with dataset (without wrapping)", {
-  # these will continue to error until 0.3.2 is released
-  # https://github.com/duckdb/duckdb/pull/2957
-  skip_if_not_installed("duckdb", minimum_version = "0.3.2")
   # With a multi-part dataset
   tf <- tempfile()
   new_ds <- rbind(
@@ -205,10 +199,6 @@ dbExecute(con, "PRAGMA threads=2")
 on.exit(dbDisconnect(con, shutdown = TRUE), add = TRUE)
 
 test_that("Joining, auto-cleanup enabled", {
-  # ARROW-17643, ARROW-17818: A change in duckdb 0.5.0 caused this test to fail
-  # TODO: ARROW-17809 Follow up with the latest duckdb release to solve the issue
-  skip("ARROW-17818: Latest DuckDB causes this test to fail")
-
   ds <- InMemoryDataset$create(example_data)
 
   table_one_name <- "my_arrow_table_1"