You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by th...@apache.org on 2023/06/29 09:54:09 UTC
[arrow] branch main updated: GH-35649: [R] Always call `RecordBatchReader::ReadNext()` from DuckDB from the main R thread (#36307)
This is an automated email from the ASF dual-hosted git repository.
thisisnic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new a75299d8cb GH-35649: [R] Always call `RecordBatchReader::ReadNext()` from DuckDB from the main R thread (#36307)
a75299d8cb is described below
commit a75299d8cb221ccd9ef954c2cd44cd5b19b0779b
Author: Dewey Dunnington <de...@voltrondata.com>
AuthorDate: Thu Jun 29 06:54:00 2023 -0300
GH-35649: [R] Always call `RecordBatchReader::ReadNext()` from DuckDB from the main R thread (#36307)
### Rationale for this change
When passing a DuckDB result to Arrow via `to_arrow()` whose input was an Arrow dataset, calls to R code from other threads can occur in some DuckDB operations. This caused a crash or hang on Linux when attempting to combine `pivot_longer()` and `write_dataset()`.
### What changes are included in this PR?
- Added a wrapper class around the `RecordBatchReader` that routes calls to `ReadNext()` through `SafeCallIntoR()`.
### Are these changes tested?
I can't find a new case that isn't covered by our existing tests, although I did remove a skip that was causing a similar problem at one point (#33033). Because it's difficult to predict/test where duckdb evaluates R code, it's hard to know exactly what to test here (I would have expected R code to be evaluated/a crash to occur with many of our existing tests, but even the `pivot_longer()` example does not crash on MacOS and Windows 🤷 ).
I did verify on Ubuntu 22.04 that the reprex kindly provided by @PMassicotte errors before this PR and does not error after this PR:
```r
library(tidyverse)
library(arrow)
one_level_tree <- tempfile()
mtcars |>
to_duckdb() |>
pivot_longer(everything()) |>
to_arrow() |>
# collect() |> # collecting makes it work; otherwise, it hangs on write_dataset()
write_dataset(one_level_tree, partitioning = "name")
list.files(one_level_tree, recursive = TRUE)
```
### Are there any user-facing changes?
There are no user-facing changes.
* Closes: #35649
Authored-by: Dewey Dunnington <de...@voltrondata.com>
Signed-off-by: Nic Crane <th...@gmail.com>
---
r/R/arrowExports.R | 4 ++++
r/R/duckdb.R | 3 ++-
r/src/arrowExports.cpp | 9 +++++++++
r/src/recordbatchreader.cpp | 28 ++++++++++++++++++++++++++++
r/tests/testthat/test-duckdb.R | 12 +-----------
5 files changed, 44 insertions(+), 12 deletions(-)
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 2d935ff871..10732100cd 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1852,6 +1852,10 @@ RecordBatchReader__Head <- function(reader, num_rows) {
.Call(`_arrow_RecordBatchReader__Head`, reader, num_rows)
}
+MakeSafeRecordBatchReader <- function(reader) {
+ .Call(`_arrow_MakeSafeRecordBatchReader`, reader)
+}
+
ipc___RecordBatchStreamReader__Open <- function(stream) {
.Call(`_arrow_ipc___RecordBatchStreamReader__Open`, stream)
}
diff --git a/r/R/duckdb.R b/r/R/duckdb.R
index 84c738359d..eee777ba83 100644
--- a/r/R/duckdb.R
+++ b/r/R/duckdb.R
@@ -155,5 +155,6 @@ to_arrow <- function(.data) {
# Run the query
res <- DBI::dbSendQuery(dbplyr::remote_con(.data), dbplyr::remote_query(.data), arrow = TRUE)
- duckdb::duckdb_fetch_record_batch(res)
+ reader <- duckdb::duckdb_fetch_record_batch(res)
+ MakeSafeRecordBatchReader(reader)
}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index e5fcf217d9..1d617b252e 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -4876,6 +4876,14 @@ BEGIN_CPP11
END_CPP11
}
// recordbatchreader.cpp
+std::shared_ptr<arrow::RecordBatchReader> MakeSafeRecordBatchReader(const std::shared_ptr<arrow::RecordBatchReader>& reader);
+extern "C" SEXP _arrow_MakeSafeRecordBatchReader(SEXP reader_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<arrow::RecordBatchReader>&>::type reader(reader_sexp);
+ return cpp11::as_sexp(MakeSafeRecordBatchReader(reader));
+END_CPP11
+}
+// recordbatchreader.cpp
std::shared_ptr<arrow::ipc::RecordBatchStreamReader> ipc___RecordBatchStreamReader__Open(const std::shared_ptr<arrow::io::InputStream>& stream);
extern "C" SEXP _arrow_ipc___RecordBatchStreamReader__Open(SEXP stream_sexp){
BEGIN_CPP11
@@ -6042,6 +6050,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_RecordBatchReader__from_Table", (DL_FUNC) &_arrow_RecordBatchReader__from_Table, 1},
{ "_arrow_Table__from_RecordBatchReader", (DL_FUNC) &_arrow_Table__from_RecordBatchReader, 1},
{ "_arrow_RecordBatchReader__Head", (DL_FUNC) &_arrow_RecordBatchReader__Head, 2},
+ { "_arrow_MakeSafeRecordBatchReader", (DL_FUNC) &_arrow_MakeSafeRecordBatchReader, 1},
{ "_arrow_ipc___RecordBatchStreamReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamReader__Open, 1},
{ "_arrow_ipc___RecordBatchFileReader__schema", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__schema, 1},
{ "_arrow_ipc___RecordBatchFileReader__num_record_batches", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__num_record_batches, 1},
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index 6294205131..235fd84818 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -197,6 +197,34 @@ std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__Head(
}
}
+// Some types of RecordBatchReader input (e.g., DuckDB output) will crash or hang
+// if scanned from another thread (e.g., by an ExecPlan). This RecordBatchReader
+// wrapper ensures that ReadNext() is always called from the R thread.
+class SafeRecordBatchReader : public arrow::RecordBatchReader {
+ public:
+ explicit SafeRecordBatchReader(std::shared_ptr<arrow::RecordBatchReader> parent)
+ : parent_(parent) {}
+
+ std::shared_ptr<arrow::Schema> schema() const override { return parent_->schema(); }
+
+ arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) override {
+ return SafeCallIntoRVoid(
+ [batch_out, this] { return this->parent_->ReadNext(batch_out); },
+ "SafeRecordBatchReader::ReadNext()");
+ }
+
+ arrow::Status Close() override { return parent_->Close(); }
+
+ private:
+ std::shared_ptr<arrow::RecordBatchReader> parent_;
+};
+
+// [[arrow::export]]
+std::shared_ptr<arrow::RecordBatchReader> MakeSafeRecordBatchReader(
+ const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+ return std::make_shared<SafeRecordBatchReader>(reader);
+}
+
// -------- RecordBatchStreamReader
// [[arrow::export]]
diff --git a/r/tests/testthat/test-duckdb.R b/r/tests/testthat/test-duckdb.R
index 409e99b70f..d5bf3d9271 100644
--- a/r/tests/testthat/test-duckdb.R
+++ b/r/tests/testthat/test-duckdb.R
@@ -34,7 +34,7 @@ test_that("meaningful error message when duckdb is not installed", {
)
})
-skip_if_not_installed("duckdb", minimum_version = "0.3.1")
+skip_if_not_installed("duckdb", minimum_version = "0.3.2")
skip_if_not_installed("dbplyr")
library(duckdb, quietly = TRUE)
@@ -138,9 +138,6 @@ test_that("to_duckdb then to_arrow", {
})
test_that("to_arrow roundtrip, with dataset", {
- # these will continue to error until 0.3.2 is released
- # https://github.com/duckdb/duckdb/pull/2957
- skip_if_not_installed("duckdb", minimum_version = "0.3.2")
# With a multi-part dataset
tf <- tempfile()
new_ds <- rbind(
@@ -174,9 +171,6 @@ test_that("to_arrow roundtrip, with dataset", {
})
test_that("to_arrow roundtrip, with dataset (without wrapping)", {
- # these will continue to error until 0.3.2 is released
- # https://github.com/duckdb/duckdb/pull/2957
- skip_if_not_installed("duckdb", minimum_version = "0.3.2")
# With a multi-part dataset
tf <- tempfile()
new_ds <- rbind(
@@ -205,10 +199,6 @@ dbExecute(con, "PRAGMA threads=2")
on.exit(dbDisconnect(con, shutdown = TRUE), add = TRUE)
test_that("Joining, auto-cleanup enabled", {
- # ARROW-17643, ARROW-17818: A change in duckdb 0.5.0 caused this test to fail
- # TODO: ARROW-17809 Follow up with the latest duckdb release to solve the issue
- skip("ARROW-17818: Latest DuckDB causes this test to fail")
-
ds <- InMemoryDataset$create(example_data)
table_one_name <- "my_arrow_table_1"