You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2023/01/13 17:58:05 UTC
[arrow] branch master updated: GH-14474: Opportunistically delete R references to shared pointers where possible (#15278)
This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e17cfb411a GH-14474: Opportunistically delete R references to shared pointers where possible (#15278)
e17cfb411a is described below
commit e17cfb411a9aecafb671311c2a5a1e78ba797e78
Author: Dewey Dunnington <de...@voltrondata.com>
AuthorDate: Fri Jan 13 13:57:59 2023 -0400
GH-14474: Opportunistically delete R references to shared pointers where possible (#15278)
(Still trying to set up a Windows environment so I can check to see if this helps at all)
* Closes: #14474
Lead-authored-by: Dewey Dunnington <de...@voltrondata.com>
Co-authored-by: Dewey Dunnington <de...@fishandwhistle.net>
Signed-off-by: Dewey Dunnington <de...@fishandwhistle.net>
---
r/R/arrow-object.R | 13 +++++++++++++
r/R/arrowExports.R | 8 ++++++++
r/R/dataset-write.R | 2 ++
r/R/dplyr.R | 9 +++++++--
r/R/query-engine.R | 10 +++++++++-
r/R/record-batch-reader.R | 8 +++++++-
r/R/table.R | 5 ++++-
r/src/arrowExports.cpp | 20 ++++++++++++++++++++
r/src/arrow_cpp11.h | 8 +++++++-
r/src/compute-exec.cpp | 6 ++++++
r/src/recordbatchreader.cpp | 7 +++++++
r/tests/testthat/test-dataset-write.R | 18 ++++++++++++++++++
r/tests/testthat/test-dataset.R | 33 +++++++++++++++++++++++++++++++++
13 files changed, 141 insertions(+), 6 deletions(-)
diff --git a/r/R/arrow-object.R b/r/R/arrow-object.R
index ac067d4aa5..516f407aaf 100644
--- a/r/R/arrow-object.R
+++ b/r/R/arrow-object.R
@@ -45,6 +45,19 @@ ArrowObject <- R6Class("ArrowObject",
cat(self$ToString(), "\n", sep = "")
}
invisible(self)
+ },
+ .unsafe_delete = function() {
+ # The best we can do in a generic way is to set the underlying
+ # pointer to NULL. Subclasses specialize this so that we can actually
+ # call the underlying shared pointer's reset() method for the
+ # shared_ptr<SubclassType> in C++.
+ self$`.:xp:.` <- NULL
+
+ # Return NULL, because keeping this R6 object in scope is not a good idea.
+ # This syntax would allow the rare user that has to actually do this to
+ # do `object <- object$.unsafe_delete()` and reduce the chance that an
+ # IDE like RStudio will try to call other methods which will error
+ invisible(NULL)
}
)
)
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index c1b90b4341..38f1ecfb97 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -452,6 +452,10 @@ ExecPlan_ToString <- function(plan) {
.Call(`_arrow_ExecPlan_ToString`, plan)
}
+ExecPlan_UnsafeDelete <- function(plan) {
+ invisible(.Call(`_arrow_ExecPlan_UnsafeDelete`, plan))
+}
+
ExecNode_output_schema <- function(node) {
.Call(`_arrow_ExecNode_output_schema`, node)
}
@@ -1764,6 +1768,10 @@ RecordBatchReader__Close <- function(reader) {
invisible(.Call(`_arrow_RecordBatchReader__Close`, reader))
}
+RecordBatchReader__UnsafeDelete <- function(reader) {
+ invisible(.Call(`_arrow_RecordBatchReader__UnsafeDelete`, reader))
+}
+
RecordBatchReader__ReadNext <- function(reader) {
.Call(`_arrow_RecordBatchReader__ReadNext`, reader)
}
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index e0181ee74f..e5cd76363a 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -151,6 +151,8 @@ write_dataset <- function(dataset,
}
plan <- ExecPlan$create()
+ on.exit(plan$.unsafe_delete())
+
final_node <- plan$Build(dataset)
if (!is.null(final_node$extras$sort %||% final_node$extras$head %||% final_node$extras$tail)) {
# Because sorting and topK are only handled in the SinkNode (or in R!),
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index 5b620c6cc4..72e7480968 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -295,8 +295,13 @@ show_exec_plan <- function(x) {
}
result <- as_record_batch_reader(adq)
- cat(result$Plan()$ToString())
- result$Close()
+ plan <- result$Plan()
+ on.exit({
+ plan$.unsafe_delete()
+ result$.unsafe_delete()
+ })
+
+ cat(plan$ToString())
invisible(x)
}
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 7e737af3ff..2f0b421fae 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -83,7 +83,9 @@ ExecPlan <- R6Class("ExecPlan",
# SinkNode, so if there are any steps done after head/tail, we need to
# evaluate the query up to then and then do a new query for the rest.
# as_record_batch_reader() will build and run an ExecPlan
- node <- self$SourceNode(as_record_batch_reader(.data$.data))
+ reader <- as_record_batch_reader(.data$.data)
+ on.exit(reader$.unsafe_delete())
+ node <- self$SourceNode(reader)
} else {
# Recurse
node <- self$Build(.data$.data)
@@ -260,6 +262,10 @@ ExecPlan <- R6Class("ExecPlan",
},
ToString = function() {
ExecPlan_ToString(self)
+ },
+ .unsafe_delete = function() {
+ ExecPlan_UnsafeDelete(self)
+ super$.unsafe_delete()
}
)
)
@@ -379,6 +385,8 @@ do_exec_plan_substrait <- function(substrait_plan) {
}
plan <- ExecPlan$create()
+ on.exit(plan$.unsafe_delete())
+
ExecPlan_run_substrait(plan, substrait_plan)
}
diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R
index 40dd2f516d..f02b4e93e8 100644
--- a/r/R/record-batch-reader.R
+++ b/r/R/record-batch-reader.R
@@ -100,7 +100,11 @@ RecordBatchReader <- R6Class("RecordBatchReader",
read_table = function() Table__from_RecordBatchReader(self),
Close = function() RecordBatchReader__Close(self),
export_to_c = function(stream_ptr) ExportRecordBatchReader(self, stream_ptr),
- ToString = function() self$schema$ToString()
+ ToString = function() self$schema$ToString(),
+ .unsafe_delete = function() {
+ RecordBatchReader__UnsafeDelete(self)
+ super$.unsafe_delete()
+ }
),
active = list(
schema = function() RecordBatchReader__schema(self)
@@ -255,6 +259,8 @@ as_record_batch_reader.arrow_dplyr_query <- function(x, ...) {
# See query-engine.R for ExecPlan/Nodes
plan <- ExecPlan$create()
final_node <- plan$Build(x)
+ on.exit(plan$.unsafe_delete())
+
plan$Run(final_node)
}
diff --git a/r/R/table.R b/r/R/table.R
index 3318060ce0..aac2f914af 100644
--- a/r/R/table.R
+++ b/r/R/table.R
@@ -339,7 +339,10 @@ as_arrow_table.Dataset <- function(x, ...) {
#' @rdname as_arrow_table
#' @export
as_arrow_table.arrow_dplyr_query <- function(x, ...) {
- out <- as_arrow_table(as_record_batch_reader(x))
+ reader <- as_record_batch_reader(x)
+ on.exit(reader$.unsafe_delete())
+
+ out <- as_arrow_table(reader)
# arrow_dplyr_query holds group_by information. Set it on the table metadata.
set_group_attributes(
out,
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 05303a36e3..b7bda1870f 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -972,6 +972,15 @@ BEGIN_CPP11
END_CPP11
}
// compute-exec.cpp
+void ExecPlan_UnsafeDelete(const std::shared_ptr<compute::ExecPlan>& plan);
+extern "C" SEXP _arrow_ExecPlan_UnsafeDelete(SEXP plan_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
+ ExecPlan_UnsafeDelete(plan);
+ return R_NilValue;
+END_CPP11
+}
+// compute-exec.cpp
std::shared_ptr<arrow::Schema> ExecNode_output_schema(const std::shared_ptr<compute::ExecNode>& node);
extern "C" SEXP _arrow_ExecNode_output_schema(SEXP node_sexp){
BEGIN_CPP11
@@ -4537,6 +4546,15 @@ BEGIN_CPP11
END_CPP11
}
// recordbatchreader.cpp
+void RecordBatchReader__UnsafeDelete(const std::shared_ptr<arrow::RecordBatchReader>& reader);
+extern "C" SEXP _arrow_RecordBatchReader__UnsafeDelete(SEXP reader_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<arrow::RecordBatchReader>&>::type reader(reader_sexp);
+ RecordBatchReader__UnsafeDelete(reader);
+ return R_NilValue;
+END_CPP11
+}
+// recordbatchreader.cpp
std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(const std::shared_ptr<arrow::RecordBatchReader>& reader);
extern "C" SEXP _arrow_RecordBatchReader__ReadNext(SEXP reader_sexp){
BEGIN_CPP11
@@ -5393,6 +5411,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_ExecPlanReader__PlanStatus", (DL_FUNC) &_arrow_ExecPlanReader__PlanStatus, 1},
{ "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 5},
{ "_arrow_ExecPlan_ToString", (DL_FUNC) &_arrow_ExecPlan_ToString, 1},
+ { "_arrow_ExecPlan_UnsafeDelete", (DL_FUNC) &_arrow_ExecPlan_UnsafeDelete, 1},
{ "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1},
{ "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4},
{ "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14},
@@ -5721,6 +5740,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_RecordBatch__ReferencedBufferSize", (DL_FUNC) &_arrow_RecordBatch__ReferencedBufferSize, 1},
{ "_arrow_RecordBatchReader__schema", (DL_FUNC) &_arrow_RecordBatchReader__schema, 1},
{ "_arrow_RecordBatchReader__Close", (DL_FUNC) &_arrow_RecordBatchReader__Close, 1},
+ { "_arrow_RecordBatchReader__UnsafeDelete", (DL_FUNC) &_arrow_RecordBatchReader__UnsafeDelete, 1},
{ "_arrow_RecordBatchReader__ReadNext", (DL_FUNC) &_arrow_RecordBatchReader__ReadNext, 1},
{ "_arrow_RecordBatchReader__batches", (DL_FUNC) &_arrow_RecordBatchReader__batches, 1},
{ "_arrow_RecordBatchReader__from_batches", (DL_FUNC) &_arrow_RecordBatchReader__from_batches, 2},
diff --git a/r/src/arrow_cpp11.h b/r/src/arrow_cpp11.h
index 956cb2b2a2..d8c4b719d1 100644
--- a/r/src/arrow_cpp11.h
+++ b/r/src/arrow_cpp11.h
@@ -209,7 +209,13 @@ Pointer r6_to_pointer(SEXP self) {
cpp11::decay_t<typename std::remove_pointer<Pointer>::type>>();
cpp11::stop("Invalid R object for %s, must be an ArrowObject", type_name.c_str());
}
- void* p = R_ExternalPtrAddr(Rf_findVarInFrame(self, arrow::r::symbols::xp));
+
+ SEXP xp = Rf_findVarInFrame(self, arrow::r::symbols::xp);
+ if (xp == R_NilValue) {
+ cpp11::stop("Invalid: self$`.:xp:.` is NULL");
+ }
+
+ void* p = R_ExternalPtrAddr(xp);
if (p == nullptr) {
SEXP klass = Rf_getAttrib(self, R_ClassSymbol);
cpp11::stop("Invalid <%s>, external pointer to null", CHAR(STRING_ELT(klass, 0)));
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index ac5b8d2feb..e33dcdc2d8 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -269,6 +269,12 @@ std::string ExecPlan_ToString(const std::shared_ptr<compute::ExecPlan>& plan) {
return plan->ToString();
}
+// [[arrow::export]]
+void ExecPlan_UnsafeDelete(const std::shared_ptr<compute::ExecPlan>& plan) {
+ auto& plan_unsafe = const_cast<std::shared_ptr<compute::ExecPlan>&>(plan);
+ plan_unsafe.reset();
+}
+
// [[arrow::export]]
std::shared_ptr<arrow::Schema> ExecNode_output_schema(
const std::shared_ptr<compute::ExecNode>& node) {
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index 9ea4d91701..6294205131 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -32,6 +32,13 @@ void RecordBatchReader__Close(const std::shared_ptr<arrow::RecordBatchReader>& r
return arrow::StopIfNotOk(reader->Close());
}
+// [[arrow::export]]
+void RecordBatchReader__UnsafeDelete(
+ const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+ auto& reader_unsafe = const_cast<std::shared_ptr<arrow::RecordBatchReader>&>(reader);
+ reader_unsafe.reset();
+}
+
// [[arrow::export]]
std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(
const std::shared_ptr<arrow::RecordBatchReader>& reader) {
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index b13660be7c..8dd66614fd 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -788,3 +788,21 @@ test_that("Dataset write max rows per group", {
expect_equal(row_group_sizes, c(12, 18))
})
+
+test_that("Can delete filesystem dataset after write_dataset", {
+ # While this test should pass on all platforms, this is primarily
+ # a test for Windows because that platform won't allow open files
+ # to be deleted.
+ dataset_dir2 <- tempfile()
+ ds0 <- open_dataset(hive_dir)
+ write_dataset(ds0, dataset_dir2)
+
+ dataset_dir3 <- tempfile()
+ on.exit(unlink(dataset_dir3, recursive = TRUE))
+
+ ds <- open_dataset(dataset_dir2)
+ write_dataset(ds, dataset_dir3)
+
+ unlink(dataset_dir2, recursive = TRUE)
+ expect_false(dir.exists(dataset_dir2))
+})
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 5a587e9590..91b405fc01 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -948,6 +948,39 @@ test_that("Dataset and query print methods", {
)
})
+test_that("Can delete filesystem dataset files after collection", {
+ # While this test should pass on all platforms, this is primarily
+ # a test for Windows because that platform won't allow open files
+ # to be deleted.
+ dataset_dir2 <- tempfile()
+ ds0 <- open_dataset(dataset_dir)
+ write_dataset(ds0, dataset_dir2)
+
+ ds <- open_dataset(dataset_dir2)
+ collected <- ds %>% arrange(int) %>% collect()
+ unlink(dataset_dir2, recursive = TRUE)
+ expect_false(dir.exists(dataset_dir2))
+
+ expect_identical(
+ collected,
+ ds0 %>% arrange(int) %>% collect()
+ )
+
+ # Also try with head(), since this creates a nested query whose interior
+ # components should also be cleaned up to allow deleting the original
+ # dataset
+ write_dataset(ds0, dataset_dir2)
+ ds <- open_dataset(dataset_dir2)
+ collected <- ds %>% arrange(int) %>% head() %>% arrange(int) %>% collect()
+ unlink(dataset_dir2, recursive = TRUE)
+ expect_false(dir.exists(dataset_dir2))
+
+ expect_identical(
+ collected,
+ ds0 %>% arrange(int) %>% head() %>% arrange(int) %>% collect()
+ )
+})
+
test_that("Scanner$ScanBatches", {
ds <- open_dataset(ipc_dir, format = "feather")
batches <- ds$NewScan()$Finish()$ScanBatches()