Posted to commits@arrow.apache.org by pa...@apache.org on 2023/01/13 17:58:05 UTC

[arrow] branch master updated: GH-14474: Opportunistically delete R references to shared pointers where possible (#15278)

This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e17cfb411a GH-14474: Opportunistically delete R references to shared pointers where possible (#15278)
e17cfb411a is described below

commit e17cfb411a9aecafb671311c2a5a1e78ba797e78
Author: Dewey Dunnington <de...@voltrondata.com>
AuthorDate: Fri Jan 13 13:57:59 2023 -0400

    GH-14474: Opportunistically delete R references to shared pointers where possible (#15278)
    
    (Still trying to set up a Windows environment so I can check whether this helps at all)
    * Closes: #14474
    
    Lead-authored-by: Dewey Dunnington <de...@voltrondata.com>
    Co-authored-by: Dewey Dunnington <de...@fishandwhistle.net>
    Signed-off-by: Dewey Dunnington <de...@fishandwhistle.net>
---
 r/R/arrow-object.R                    | 13 +++++++++++++
 r/R/arrowExports.R                    |  8 ++++++++
 r/R/dataset-write.R                   |  2 ++
 r/R/dplyr.R                           |  9 +++++++--
 r/R/query-engine.R                    | 10 +++++++++-
 r/R/record-batch-reader.R             |  8 +++++++-
 r/R/table.R                           |  5 ++++-
 r/src/arrowExports.cpp                | 20 ++++++++++++++++++++
 r/src/arrow_cpp11.h                   |  8 +++++++-
 r/src/compute-exec.cpp                |  6 ++++++
 r/src/recordbatchreader.cpp           |  7 +++++++
 r/tests/testthat/test-dataset-write.R | 18 ++++++++++++++++++
 r/tests/testthat/test-dataset.R       | 33 +++++++++++++++++++++++++++++++++
 13 files changed, 141 insertions(+), 6 deletions(-)
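
For context, the new `.unsafe_delete()` method lets internal code drop the
R-side reference to the underlying C++ shared pointer deterministically,
instead of waiting for R's garbage collector to collect the R6 wrapper. A
minimal sketch of the calling pattern the diff below adopts (a hedged
illustration; `ExecPlan$create()` is the existing constructor):

    plan <- ExecPlan$create()
    # Release the R reference to the shared_ptr<ExecPlan> as soon as the
    # calling function exits, rather than whenever the R6 wrapper happens
    # to be garbage-collected
    on.exit(plan$.unsafe_delete())

This matters most on Windows, where files held open by a lingering reader
or plan cannot be deleted; the new tests at the end of the diff exercise
exactly that case.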

diff --git a/r/R/arrow-object.R b/r/R/arrow-object.R
index ac067d4aa5..516f407aaf 100644
--- a/r/R/arrow-object.R
+++ b/r/R/arrow-object.R
@@ -45,6 +45,19 @@ ArrowObject <- R6Class("ArrowObject",
         cat(self$ToString(), "\n", sep = "")
       }
       invisible(self)
+    },
+    .unsafe_delete = function() {
+      # The best we can do in a generic way is to set the underlying
+      # pointer to NULL. Subclasses specialize this so that we can actually
+      # call the underlying shared pointer's reset() method for the
+      # shared_ptr<SubclassType> in C++.
+      self$`.:xp:.` <- NULL
+
+      # Return NULL, because keeping this R6 object in scope is not a good idea.
+      # This lets the rare caller that actually needs it write
+      # `object <- object$.unsafe_delete()` and reduces the chance that an
+      # IDE like RStudio will try to call other methods, which would error.
+      invisible(NULL)
     }
   )
 )
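
The base-class method above can only NULL out the external pointer field;
ExecPlan and RecordBatchReader layer a C++-side reset() on top (see the
query-engine.R, record-batch-reader.R, compute-exec.cpp, and
recordbatchreader.cpp changes below). A hypothetical illustration of the
resulting behavior, assuming a reader built from a data.frame:

    reader <- as_record_batch_reader(data.frame(x = 1:3))
    reader$.unsafe_delete()
    # Further method calls on the deleted object now fail cleanly with
    # "Invalid: self$`.:xp:.` is NULL" (the check added in arrow_cpp11.h
    # below) instead of dereferencing a stale pointer
    try(reader$read_table())
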
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index c1b90b4341..38f1ecfb97 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -452,6 +452,10 @@ ExecPlan_ToString <- function(plan) {
   .Call(`_arrow_ExecPlan_ToString`, plan)
 }
 
+ExecPlan_UnsafeDelete <- function(plan) {
+  invisible(.Call(`_arrow_ExecPlan_UnsafeDelete`, plan))
+}
+
 ExecNode_output_schema <- function(node) {
   .Call(`_arrow_ExecNode_output_schema`, node)
 }
@@ -1764,6 +1768,10 @@ RecordBatchReader__Close <- function(reader) {
   invisible(.Call(`_arrow_RecordBatchReader__Close`, reader))
 }
 
+RecordBatchReader__UnsafeDelete <- function(reader) {
+  invisible(.Call(`_arrow_RecordBatchReader__UnsafeDelete`, reader))
+}
+
 RecordBatchReader__ReadNext <- function(reader) {
   .Call(`_arrow_RecordBatchReader__ReadNext`, reader)
 }
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index e0181ee74f..e5cd76363a 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -151,6 +151,8 @@ write_dataset <- function(dataset,
   }
 
   plan <- ExecPlan$create()
+  on.exit(plan$.unsafe_delete())
+
   final_node <- plan$Build(dataset)
   if (!is.null(final_node$extras$sort %||% final_node$extras$head %||% final_node$extras$tail)) {
     # Because sorting and topK are only handled in the SinkNode (or in R!),
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index 5b620c6cc4..72e7480968 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -295,8 +295,13 @@ show_exec_plan <- function(x) {
   }
 
   result <- as_record_batch_reader(adq)
-  cat(result$Plan()$ToString())
-  result$Close()
+  plan <- result$Plan()
+  on.exit({
+    plan$.unsafe_delete()
+    result$.unsafe_delete()
+  })
+
+  cat(plan$ToString())
 
   invisible(x)
 }
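
show_exec_plan() is user-facing, so the on.exit() handler above now
releases both the plan and the reader references before it returns,
replacing the previous result$Close() call. A usage sketch (hedged;
assumes dplyr is attached):

    library(arrow)
    library(dplyr)

    arrow_table(x = 1:10) %>%
      filter(x > 5) %>%
      show_exec_plan()
    # prints the ExecPlan, then frees the plan and reader references
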
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 7e737af3ff..2f0b421fae 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -83,7 +83,9 @@ ExecPlan <- R6Class("ExecPlan",
           # SinkNode, so if there are any steps done after head/tail, we need to
           # evaluate the query up to then and then do a new query for the rest.
           # as_record_batch_reader() will build and run an ExecPlan
-          node <- self$SourceNode(as_record_batch_reader(.data$.data))
+          reader <- as_record_batch_reader(.data$.data)
+          on.exit(reader$.unsafe_delete())
+          node <- self$SourceNode(reader)
         } else {
           # Recurse
           node <- self$Build(.data$.data)
@@ -260,6 +262,10 @@ ExecPlan <- R6Class("ExecPlan",
     },
     ToString = function() {
       ExecPlan_ToString(self)
+    },
+    .unsafe_delete = function() {
+      ExecPlan_UnsafeDelete(self)
+      super$.unsafe_delete()
     }
   )
 )
@@ -379,6 +385,8 @@ do_exec_plan_substrait <- function(substrait_plan) {
   }
 
   plan <- ExecPlan$create()
+  on.exit(plan$.unsafe_delete())
+
   ExecPlan_run_substrait(plan, substrait_plan)
 }
 
diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R
index 40dd2f516d..f02b4e93e8 100644
--- a/r/R/record-batch-reader.R
+++ b/r/R/record-batch-reader.R
@@ -100,7 +100,11 @@ RecordBatchReader <- R6Class("RecordBatchReader",
     read_table = function() Table__from_RecordBatchReader(self),
     Close = function() RecordBatchReader__Close(self),
     export_to_c = function(stream_ptr) ExportRecordBatchReader(self, stream_ptr),
-    ToString = function() self$schema$ToString()
+    ToString = function() self$schema$ToString(),
+    .unsafe_delete = function() {
+      RecordBatchReader__UnsafeDelete(self)
+      super$.unsafe_delete()
+    }
   ),
   active = list(
     schema = function() RecordBatchReader__schema(self)
@@ -255,6 +259,8 @@ as_record_batch_reader.arrow_dplyr_query <- function(x, ...) {
   # See query-engine.R for ExecPlan/Nodes
   plan <- ExecPlan$create()
   final_node <- plan$Build(x)
+  on.exit(plan$.unsafe_delete())
+
   plan$Run(final_node)
 }
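
Note the ordering in as_record_batch_reader.arrow_dplyr_query() above:
plan$Run(final_node) returns the reader before the on.exit() handler
fires, so the reader stays usable while the R-side reference to the plan
is released. A hedged end-to-end sketch of what this enables
(`dataset_dir` and the `int` column are hypothetical):

    library(arrow)
    library(dplyr)

    ds <- open_dataset(dataset_dir)
    reader <- as_record_batch_reader(ds %>% filter(int > 5))
    tbl <- reader$read_table()    # plan reference already released
    reader$.unsafe_delete()       # drop the reader's pointer too

    # With no lingering references, the files can now be deleted, even
    # on Windows (see the new tests below)
    unlink(dataset_dir, recursive = TRUE)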
 
diff --git a/r/R/table.R b/r/R/table.R
index 3318060ce0..aac2f914af 100644
--- a/r/R/table.R
+++ b/r/R/table.R
@@ -339,7 +339,10 @@ as_arrow_table.Dataset <- function(x, ...) {
 #' @rdname as_arrow_table
 #' @export
 as_arrow_table.arrow_dplyr_query <- function(x, ...) {
-  out <- as_arrow_table(as_record_batch_reader(x))
+  reader <- as_record_batch_reader(x)
+  on.exit(reader$.unsafe_delete())
+
+  out <- as_arrow_table(reader)
   # arrow_dplyr_query holds group_by information. Set it on the table metadata.
   set_group_attributes(
     out,
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 05303a36e3..b7bda1870f 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -972,6 +972,15 @@ BEGIN_CPP11
 END_CPP11
 }
 // compute-exec.cpp
+void ExecPlan_UnsafeDelete(const std::shared_ptr<compute::ExecPlan>& plan);
+extern "C" SEXP _arrow_ExecPlan_UnsafeDelete(SEXP plan_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
+	ExecPlan_UnsafeDelete(plan);
+	return R_NilValue;
+END_CPP11
+}
+// compute-exec.cpp
 std::shared_ptr<arrow::Schema> ExecNode_output_schema(const std::shared_ptr<compute::ExecNode>& node);
 extern "C" SEXP _arrow_ExecNode_output_schema(SEXP node_sexp){
 BEGIN_CPP11
@@ -4537,6 +4546,15 @@ BEGIN_CPP11
 END_CPP11
 }
 // recordbatchreader.cpp
+void RecordBatchReader__UnsafeDelete(const std::shared_ptr<arrow::RecordBatchReader>& reader);
+extern "C" SEXP _arrow_RecordBatchReader__UnsafeDelete(SEXP reader_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::shared_ptr<arrow::RecordBatchReader>&>::type reader(reader_sexp);
+	RecordBatchReader__UnsafeDelete(reader);
+	return R_NilValue;
+END_CPP11
+}
+// recordbatchreader.cpp
 std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(const std::shared_ptr<arrow::RecordBatchReader>& reader);
 extern "C" SEXP _arrow_RecordBatchReader__ReadNext(SEXP reader_sexp){
 BEGIN_CPP11
@@ -5393,6 +5411,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_ExecPlanReader__PlanStatus", (DL_FUNC) &_arrow_ExecPlanReader__PlanStatus, 1}, 
 		{ "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 5}, 
 		{ "_arrow_ExecPlan_ToString", (DL_FUNC) &_arrow_ExecPlan_ToString, 1}, 
+		{ "_arrow_ExecPlan_UnsafeDelete", (DL_FUNC) &_arrow_ExecPlan_UnsafeDelete, 1}, 
 		{ "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, 
 		{ "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, 
 		{ "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14}, 
@@ -5721,6 +5740,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_RecordBatch__ReferencedBufferSize", (DL_FUNC) &_arrow_RecordBatch__ReferencedBufferSize, 1}, 
 		{ "_arrow_RecordBatchReader__schema", (DL_FUNC) &_arrow_RecordBatchReader__schema, 1}, 
 		{ "_arrow_RecordBatchReader__Close", (DL_FUNC) &_arrow_RecordBatchReader__Close, 1}, 
+		{ "_arrow_RecordBatchReader__UnsafeDelete", (DL_FUNC) &_arrow_RecordBatchReader__UnsafeDelete, 1}, 
 		{ "_arrow_RecordBatchReader__ReadNext", (DL_FUNC) &_arrow_RecordBatchReader__ReadNext, 1}, 
 		{ "_arrow_RecordBatchReader__batches", (DL_FUNC) &_arrow_RecordBatchReader__batches, 1}, 
 		{ "_arrow_RecordBatchReader__from_batches", (DL_FUNC) &_arrow_RecordBatchReader__from_batches, 2}, 
diff --git a/r/src/arrow_cpp11.h b/r/src/arrow_cpp11.h
index 956cb2b2a2..d8c4b719d1 100644
--- a/r/src/arrow_cpp11.h
+++ b/r/src/arrow_cpp11.h
@@ -209,7 +209,13 @@ Pointer r6_to_pointer(SEXP self) {
         cpp11::decay_t<typename std::remove_pointer<Pointer>::type>>();
     cpp11::stop("Invalid R object for %s, must be an ArrowObject", type_name.c_str());
   }
-  void* p = R_ExternalPtrAddr(Rf_findVarInFrame(self, arrow::r::symbols::xp));
+
+  SEXP xp = Rf_findVarInFrame(self, arrow::r::symbols::xp);
+  if (xp == R_NilValue) {
+    cpp11::stop("Invalid: self$`.:xp:.` is NULL");
+  }
+
+  void* p = R_ExternalPtrAddr(xp);
   if (p == nullptr) {
     SEXP klass = Rf_getAttrib(self, R_ClassSymbol);
     cpp11::stop("Invalid <%s>, external pointer to null", CHAR(STRING_ELT(klass, 0)));
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index ac5b8d2feb..e33dcdc2d8 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -269,6 +269,12 @@ std::string ExecPlan_ToString(const std::shared_ptr<compute::ExecPlan>& plan) {
   return plan->ToString();
 }
 
+// [[arrow::export]]
+void ExecPlan_UnsafeDelete(const std::shared_ptr<compute::ExecPlan>& plan) {
+  auto& plan_unsafe = const_cast<std::shared_ptr<compute::ExecPlan>&>(plan);
+  plan_unsafe.reset();
+}
+
 // [[arrow::export]]
 std::shared_ptr<arrow::Schema> ExecNode_output_schema(
     const std::shared_ptr<compute::ExecNode>& node) {
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index 9ea4d91701..6294205131 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -32,6 +32,13 @@ void RecordBatchReader__Close(const std::shared_ptr<arrow::RecordBatchReader>& r
   return arrow::StopIfNotOk(reader->Close());
 }
 
+// [[arrow::export]]
+void RecordBatchReader__UnsafeDelete(
+    const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+  auto& reader_unsafe = const_cast<std::shared_ptr<arrow::RecordBatchReader>&>(reader);
+  reader_unsafe.reset();
+}
+
 // [[arrow::export]]
 std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(
     const std::shared_ptr<arrow::RecordBatchReader>& reader) {
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index b13660be7c..8dd66614fd 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -788,3 +788,21 @@ test_that("Dataset write max rows per group", {
 
   expect_equal(row_group_sizes, c(12, 18))
 })
+
+test_that("Can delete filesystem dataset after write_dataset", {
+  # While this test should pass on all platforms, this is primarily
+  # a test for Windows because that platform won't allow open files
+  # to be deleted.
+  dataset_dir2 <- tempfile()
+  ds0 <- open_dataset(hive_dir)
+  write_dataset(ds0, dataset_dir2)
+
+  dataset_dir3 <- tempfile()
+  on.exit(unlink(dataset_dir3, recursive = TRUE))
+
+  ds <- open_dataset(dataset_dir2)
+  write_dataset(ds, dataset_dir3)
+
+  unlink(dataset_dir2, recursive = TRUE)
+  expect_false(dir.exists(dataset_dir2))
+})
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 5a587e9590..91b405fc01 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -948,6 +948,39 @@ test_that("Dataset and query print methods", {
   )
 })
 
+test_that("Can delete filesystem dataset files after collection", {
+  # While this test should pass on all platforms, this is primarily
+  # a test for Windows because that platform won't allow open files
+  # to be deleted.
+  dataset_dir2 <- tempfile()
+  ds0 <- open_dataset(dataset_dir)
+  write_dataset(ds0, dataset_dir2)
+
+  ds <- open_dataset(dataset_dir2)
+  collected <- ds %>% arrange(int) %>% collect()
+  unlink(dataset_dir2, recursive = TRUE)
+  expect_false(dir.exists(dataset_dir2))
+
+  expect_identical(
+    collected,
+    ds0 %>% arrange(int) %>% collect()
+  )
+
+  # Also try with head(), since this creates a nested query whose interior
+  # components should also be cleaned up to allow deleting the original
+  # dataset
+  write_dataset(ds0, dataset_dir2)
+  ds <- open_dataset(dataset_dir2)
+  collected <- ds %>% arrange(int) %>% head() %>% arrange(int) %>% collect()
+  unlink(dataset_dir2, recursive = TRUE)
+  expect_false(dir.exists(dataset_dir2))
+
+  expect_identical(
+    collected,
+    ds0 %>% arrange(int) %>% head() %>% arrange(int) %>% collect()
+  )
+})
+
 test_that("Scanner$ScanBatches", {
   ds <- open_dataset(ipc_dir, format = "feather")
   batches <- ds$NewScan()$Finish()$ScanBatches()