You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2023/12/19 17:17:57 UTC

(arrow-adbc) branch main updated: feat(r): Reference count child objects (#1334)

This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 595df253 feat(r): Reference count child objects (#1334)
595df253 is described below

commit 595df25383d6b2ab94d919899dfdaf95023a0836
Author: Dewey Dunnington <de...@voltrondata.com>
AuthorDate: Tue Dec 19 13:17:51 2023 -0400

    feat(r): Reference count child objects (#1334)
    
    This PR adds a `.child_count` field to all database, connection, and
    statement objects to ensure that we do not call the C-level release
    callback while a child object is still alive.
    
    Since very early commits, we have set each object's protected field
    (`R_ExternalPtrProtected()`) to its parent object, which means that if
    you never call `adbc_XXX_release()`, R sorts out the object dependencies
    and calls the release callbacks in the correct order.
    
    The one major exception was the `ArrowArrayStream` wrappers returned by
    ADBC functions, for which there was no mechanism to attach a dependent
    SEXP until I added one (in nanoarrow 0.2, I think); however, that
    mechanism wasn't actually used for anything except the stream returned
    by `read_adbc()`.
    
    @nbenn / @krlmlr, you are probably the best candidates to review this
    and confirm that it fixes (or at least makes it easier to debug) the
    issues you have been having in adbi! Note that
    https://github.com/apache/arrow-nanoarrow/pull/333 may have been the
    primary culprit (I will push that out to CRAN ASAP as a tweak release).
    
    Closes https://github.com/apache/arrow-nanoarrow/issues/323 ,
    https://github.com/apache/arrow-adbc/issues/1128 . Perhaps related:
    https://github.com/apache/arrow-adbc/issues/1348 .
    
    I triggered some extended checks as well (as they include valgrind):
    https://github.com/paleolimbot/arrow-adbc/actions/runs/7133874110
---
 r/adbcdrivermanager/DESCRIPTION                    |  2 +-
 r/adbcdrivermanager/NAMESPACE                      |  1 -
 r/adbcdrivermanager/R/adbc.R                       | 40 ++++++++---
 r/adbcdrivermanager/R/helpers.R                    | 79 ++++++++++++----------
 r/adbcdrivermanager/R/utils.R                      | 34 +++++++++-
 r/adbcdrivermanager/man/adbc_connection_join.Rd    |  8 ---
 .../man/adbc_statement_set_sql_query.Rd            |  9 ++-
 r/adbcdrivermanager/man/adbc_xptr_move.Rd          |  5 +-
 r/adbcdrivermanager/man/with_adbc.Rd               |  7 +-
 r/adbcdrivermanager/src/init.c                     |  2 +
 r/adbcdrivermanager/src/radbc.cc                   | 34 ++++++++++
 r/adbcdrivermanager/src/radbc.h                    |  2 +
 r/adbcdrivermanager/src/utils.c                    | 11 +++
 .../tests/testthat/test-driver_monkey.R            |  5 ++
 r/adbcdrivermanager/tests/testthat/test-helpers.R  |  5 +-
 r/adbcdrivermanager/tests/testthat/test-utils.R    |  4 +-
 16 files changed, 183 insertions(+), 65 deletions(-)

diff --git a/r/adbcdrivermanager/DESCRIPTION b/r/adbcdrivermanager/DESCRIPTION
index 17c70324..a5b69b81 100644
--- a/r/adbcdrivermanager/DESCRIPTION
+++ b/r/adbcdrivermanager/DESCRIPTION
@@ -24,4 +24,4 @@ Config/build/bootstrap: TRUE
 URL: https://github.com/apache/arrow-adbc
 BugReports: https://github.com/apache/arrow-adbc/issues
 Imports:
-    nanoarrow
+    nanoarrow (>= 0.3.0)
diff --git a/r/adbcdrivermanager/NAMESPACE b/r/adbcdrivermanager/NAMESPACE
index 894851a0..5c671a2b 100644
--- a/r/adbcdrivermanager/NAMESPACE
+++ b/r/adbcdrivermanager/NAMESPACE
@@ -83,7 +83,6 @@ export(adbc_statement_release)
 export(adbc_statement_set_options)
 export(adbc_statement_set_sql_query)
 export(adbc_statement_set_substrait_plan)
-export(adbc_stream_join)
 export(adbc_xptr_is_valid)
 export(adbc_xptr_move)
 export(execute_adbc)
diff --git a/r/adbcdrivermanager/R/adbc.R b/r/adbcdrivermanager/R/adbc.R
index 60e271d9..65a4e6eb 100644
--- a/r/adbcdrivermanager/R/adbc.R
+++ b/r/adbcdrivermanager/R/adbc.R
@@ -70,6 +70,8 @@ adbc_database_init_default <- function(driver, options = NULL, subclass = charac
 #' @rdname adbc_database_init
 #' @export
 adbc_database_release <- function(database) {
+  stop_for_nonzero_child_count(database)
+
   error <- adbc_allocate_error()
   status <- .Call(RAdbcDatabaseRelease, database, error)
   stop_for_error(status, error)
@@ -119,6 +121,8 @@ adbc_connection_init_default <- function(database, options = NULL, subclass = ch
 #' @rdname adbc_connection_init
 #' @export
 adbc_connection_release <- function(connection) {
+  stop_for_nonzero_child_count(connection)
+
   if (isTRUE(connection$.release_database)) {
     database <- connection$database
     on.exit(adbc_database_release(database))
@@ -188,7 +192,7 @@ adbc_connection_get_info <- function(connection, info_codes = NULL) {
   )
   stop_for_error(status, error)
 
-  out_stream
+  adbc_child_stream(connection, out_stream)
 }
 
 #' @rdname adbc_connection_get_info
@@ -211,7 +215,7 @@ adbc_connection_get_objects <- function(connection, depth = 0L, catalog = NULL,
   )
   stop_for_error(status, error)
 
-  out_stream
+  adbc_child_stream(connection, out_stream)
 }
 
 #' @rdname adbc_connection_get_info
@@ -241,7 +245,7 @@ adbc_connection_get_table_types <- function(connection) {
   status <- .Call(RAdbcConnectionGetTableTypes, connection, out_stream, error)
   stop_for_error(status, error)
 
-  out_stream
+  adbc_child_stream(connection, out_stream)
 }
 
 #' @rdname adbc_connection_get_info
@@ -258,7 +262,7 @@ adbc_connection_read_partition <- function(connection, serialized_partition) {
   )
   stop_for_error(status, error)
 
-  out_stream
+  adbc_child_stream(connection, out_stream)
 }
 
 #' @rdname adbc_connection_get_info
@@ -297,7 +301,7 @@ adbc_connection_get_statistic_names <- function(connection) {
   status <- .Call(RAdbcConnectionGetStatisticNames, connection, out_stream, error)
   stop_for_error(status, error)
 
-  out_stream
+  adbc_child_stream(connection, out_stream)
 }
 
 #' @rdname adbc_connection_get_info
@@ -319,7 +323,7 @@ adbc_connection_get_statistics <- function(connection, catalog, db_schema,
   )
   stop_for_error(status, error)
 
-  out_stream
+  adbc_child_stream(connection, out_stream)
 }
 
 #' @rdname adbc_connection_get_info
@@ -384,6 +388,8 @@ adbc_statement_init_default <- function(connection, options = NULL, subclass = c
 #' @rdname adbc_statement_init
 #' @export
 adbc_statement_release <- function(statement) {
+  stop_for_nonzero_child_count(statement)
+
   if (isTRUE(statement$.release_connection)) {
     connection <- statement$connection
     on.exit(adbc_connection_release(connection))
@@ -407,6 +413,8 @@ adbc_statement_release <- function(statement) {
 #'   or object that can be coerced to one.
 #' @param schema A [nanoarrow_schema][nanoarrow::as_nanoarrow_schema] or object
 #'   that can be coerced to one.
+#' @param stream_join_parent Use `TRUE` to invalidate `statement` and tie its
+#'   lifecycle to `stream`.
 #'
 #' @return
 #'   - `adbc_statement_set_sql_query()`, `adbc_statement_set_substrait_plan()`,
@@ -483,9 +491,25 @@ adbc_statement_bind_stream <- function(statement, stream, schema = NULL) {
 
 #' @rdname adbc_statement_set_sql_query
 #' @export
-adbc_statement_execute_query <- function(statement, stream = NULL) {
+adbc_statement_execute_query <- function(statement, stream = NULL,
+                                         stream_join_parent = FALSE) {
   error <- adbc_allocate_error()
-  result <- .Call(RAdbcStatementExecuteQuery, statement, stream, error)
+
+  if (is.null(stream)) {
+    result <- .Call(RAdbcStatementExecuteQuery, statement, NULL, error)
+  } else {
+    stream_tmp <- nanoarrow::nanoarrow_allocate_array_stream()
+    result <- .Call(RAdbcStatementExecuteQuery, statement, stream_tmp, error)
+    if (identical(result$status, 0L)) {
+      stream_tmp <- adbc_child_stream(
+        statement,
+        stream_tmp,
+        release_parent = stream_join_parent
+      )
+      nanoarrow::nanoarrow_pointer_export(stream_tmp, stream)
+    }
+  }
+
   stop_for_error(result$status, error)
   result$rows_affected
 }
diff --git a/r/adbcdrivermanager/R/helpers.R b/r/adbcdrivermanager/R/helpers.R
index 26aadd1e..eea3a3ee 100644
--- a/r/adbcdrivermanager/R/helpers.R
+++ b/r/adbcdrivermanager/R/helpers.R
@@ -104,11 +104,7 @@ execute_adbc.default <- function(db_or_con, query, ..., bind = NULL, stream = NU
     adbc_statement_prepare(stmt)
   }
 
-  adbc_statement_execute_query(stmt, stream)
-
-  if (!is.null(stream)) {
-    adbc_stream_join(stream, stmt)
-  }
+  adbc_statement_execute_query(stmt, stream, stream_join_parent = TRUE)
 
   invisible(db_or_con)
 }
@@ -150,12 +146,13 @@ write_adbc.default <- function(tbl, db_or_con, target_table, ...,
 #' it is good practice to explicitly clean up these objects. These helpers
 #' are designed to make explicit and predictable cleanup easy to accomplish.
 #'
-#' Note that you can use [adbc_connection_join()],
-#' [adbc_statement_join()], and [adbc_stream_join()]
+#' Note that you can use [adbc_connection_join()] and [adbc_statement_join()]
 #' to tie the lifecycle of the parent object to that of the child object.
 #' These functions mark any previous references to the parent object as
 #' released so you can still use local and with helpers to manage the parent
-#' object before it is joined.
+#' object before it is joined. Use `stream_join_parent = TRUE` in
+#' [adbc_statement_execute_query()] to tie the lifecycle of a statement to
+#' the output stream.
 #'
 #' @param x An ADBC database, ADBC connection, ADBC statement, or
 #'   nanoarrow_array_stream returned from calls to an ADBC function.
@@ -217,8 +214,6 @@ local_adbc <- function(x, .local_envir = parent.frame()) {
 #' @param database A database created with [adbc_database_init()]
 #' @param connection A connection created with [adbc_connection_init()]
 #' @param statement A statement created with [adbc_statement_init()]
-#' @param stream A [nanoarrow_array_stream][nanoarrow::as_nanoarrow_array_stream]
-#' @inheritParams with_adbc
 #'
 #' @return The input, invisibly.
 #' @export
@@ -244,10 +239,15 @@ local_adbc <- function(x, .local_envir = parent.frame()) {
 #'
 adbc_connection_join <- function(connection, database) {
   assert_adbc(connection, "adbc_connection")
-  assert_adbc(database, "adbc_database")
+
+  stopifnot(
+    identical(database, connection$database),
+    identical(database$.child_count, 1L)
+  )
 
   connection$.release_database <- TRUE
-  connection$database <- adbc_xptr_move(database)
+  connection$database <- adbc_xptr_move(database, check_child_count = FALSE)
+  xptr_set_protected(connection, connection$database)
   invisible(connection)
 }
 
@@ -255,42 +255,53 @@ adbc_connection_join <- function(connection, database) {
 #' @export
 adbc_statement_join <- function(statement, connection) {
   assert_adbc(statement, "adbc_statement")
-  assert_adbc(connection, "adbc_connection")
+
+  stopifnot(
+    identical(connection, statement$connection),
+    identical(connection$.child_count, 1L)
+  )
 
   statement$.release_connection <- TRUE
-  statement$connection <- adbc_xptr_move(connection)
+  statement$connection <- adbc_xptr_move(connection, check_child_count = FALSE)
+  xptr_set_protected(statement, statement$connection)
   invisible(statement)
 }
 
-#' @rdname adbc_connection_join
-#' @export
-adbc_stream_join <- function(stream, x) {
-  if (utils::packageVersion("nanoarrow") < "0.1.0.9000") {
-    stop("adbc_stream_join_statement() requires nanoarrow >= 0.2.0")
-  }
-
-  assert_adbc(stream, "nanoarrow_array_stream")
-  assert_adbc(x)
+adbc_child_stream <- function(parent, stream, release_parent = FALSE) {
+  assert_adbc(parent)
 
+  # This finalizer will run immediately on release (if released explicitly
+  # on the main R thread) or on garbage collection otherwise.
   self_contained_finalizer <- function() {
-    try(adbc_release_non_null(x))
+    try({
+      parent$.child_count <- parent$.child_count - 1L
+      if (release_parent) {
+        adbc_release_non_null(parent)
+      }
+    })
   }
 
   # Make sure we don't keep any variables around that aren't needed
-  # for the finalizer and make sure we invalidate the original statement
+  # for the finalizer and make sure we do keep around a strong reference
+  # to parent.
   self_contained_finalizer_env <- as.environment(
-    list(x = adbc_xptr_move(x))
+    list(
+      parent = if (release_parent) adbc_xptr_move(parent) else parent,
+      release_parent = release_parent
+    )
   )
   parent.env(self_contained_finalizer_env) <- asNamespace("adbcdrivermanager")
   environment(self_contained_finalizer) <- self_contained_finalizer_env
 
-  # This finalizer will run immediately on release (if released explicitly
-  # on the main R thread) or on garbage collection otherwise.
-
-  # Until the release version of nanoarrow contains this we will get a check
-  # warning for nanoarrow::array_stream_set_finalizer()
-  set_finalizer <- asNamespace("nanoarrow")[["array_stream_set_finalizer"]]
-  set_finalizer(stream, self_contained_finalizer)
+  # Set the finalizer using nanoarrow's method for this
+  stream_out <- nanoarrow::array_stream_set_finalizer(
+    stream,
+    self_contained_finalizer
+  )
 
-  invisible(stream)
+  # Once we're sure this will succeed, increment the parent child count
+  # Use whatever version is in the finalizer env (we might have moved parent)
+  self_contained_finalizer_env$parent$.child_count <-
+    self_contained_finalizer_env$parent$.child_count + 1L
+  stream_out
 }
diff --git a/r/adbcdrivermanager/R/utils.R b/r/adbcdrivermanager/R/utils.R
index 1d7a01be..c747a612 100644
--- a/r/adbcdrivermanager/R/utils.R
+++ b/r/adbcdrivermanager/R/utils.R
@@ -16,13 +16,23 @@
 # under the License.
 
 new_env <- function() {
-  new.env(parent = emptyenv())
+  env <- new.env(parent = emptyenv())
+  # A previous version of this just did env$.child_count <- 0L,
+  # which, perhaps because of compilation, results in env$.child_count
+  # referring to the exact same SEXP for every ADBC object! Use vector()
+  # to ensure a fresh allocation.
+  env$.child_count <- vector("integer", length = 1L)
+  env
 }
 
 xptr_env <- function(xptr) {
   .Call(RAdbcXptrEnv, xptr)
 }
 
+xptr_set_protected <- function(xptr, prot) {
+  .Call(RAdbcXptrSetProtected, xptr, prot)
+}
+
 #' @export
 length.adbc_xptr <- function(x) {
   length(xptr_env(x))
@@ -80,6 +90,20 @@ str.adbc_xptr <- function(object, ...) {
   invisible(object)
 }
 
+stop_for_nonzero_child_count <- function(obj) {
+  child_count <- obj$.child_count
+  if (!identical(child_count, 0L)) {
+    msg <- sprintf(
+      "<%s> has %d unreleased child object%s",
+      paste(class(obj), collapse = "/"),
+      child_count,
+      if (child_count != 1) "s" else ""
+    )
+    cnd <- simpleError(msg, call = sys.call(-1))
+    class(cnd) <- union("adbc_error_child_count_not_zero", class(cnd))
+    stop(cnd)
+  }
+}
 
 #' Low-level pointer details
 #'
@@ -93,6 +117,8 @@ str.adbc_xptr <- function(object, ...) {
 #'
 #' @param x An 'adbc_database', 'adbc_connection', 'adbc_statement', or
 #'   'nanoarrow_array_stream'
+#' @param check_child_count Ensures that `x` has a zero child count before
+#'   performing the move. This should almost always be `TRUE`.
 #'
 #' @return
 #' - `adbc_xptr_move()`: A freshly-allocated R object identical to `x`
@@ -107,7 +133,11 @@ str.adbc_xptr <- function(object, ...) {
 #' adbc_xptr_is_valid(db)
 #' adbc_xptr_is_valid(db_new)
 #'
-adbc_xptr_move <- function(x) {
+adbc_xptr_move <- function(x, check_child_count = TRUE) {
+  if (check_child_count && (".child_count" %in% names(x))) {
+    stop_for_nonzero_child_count(x)
+  }
+
   if (inherits(x, "adbc_database")) {
     .Call(RAdbcMoveDatabase, x)
   } else if (inherits(x, "adbc_connection")) {
diff --git a/r/adbcdrivermanager/man/adbc_connection_join.Rd b/r/adbcdrivermanager/man/adbc_connection_join.Rd
index 823a33b7..418f9b1d 100644
--- a/r/adbcdrivermanager/man/adbc_connection_join.Rd
+++ b/r/adbcdrivermanager/man/adbc_connection_join.Rd
@@ -3,14 +3,11 @@
 \name{adbc_connection_join}
 \alias{adbc_connection_join}
 \alias{adbc_statement_join}
-\alias{adbc_stream_join}
 \title{Join the lifecycle of a unique parent to its child}
 \usage{
 adbc_connection_join(connection, database)
 
 adbc_statement_join(statement, connection)
-
-adbc_stream_join(stream, x)
 }
 \arguments{
 \item{connection}{A connection created with \code{\link[=adbc_connection_init]{adbc_connection_init()}}}
@@ -18,11 +15,6 @@ adbc_stream_join(stream, x)
 \item{database}{A database created with \code{\link[=adbc_database_init]{adbc_database_init()}}}
 
 \item{statement}{A statement created with \code{\link[=adbc_statement_init]{adbc_statement_init()}}}
-
-\item{stream}{A \link[nanoarrow:as_nanoarrow_array_stream]{nanoarrow_array_stream}}
-
-\item{x}{An ADBC database, ADBC connection, ADBC statement, or
-nanoarrow_array_stream returned from calls to an ADBC function.}
 }
 \value{
 The input, invisibly.
diff --git a/r/adbcdrivermanager/man/adbc_statement_set_sql_query.Rd b/r/adbcdrivermanager/man/adbc_statement_set_sql_query.Rd
index f83f61c7..71a85751 100644
--- a/r/adbcdrivermanager/man/adbc_statement_set_sql_query.Rd
+++ b/r/adbcdrivermanager/man/adbc_statement_set_sql_query.Rd
@@ -24,7 +24,11 @@ adbc_statement_bind(statement, values, schema = NULL)
 
 adbc_statement_bind_stream(statement, stream, schema = NULL)
 
-adbc_statement_execute_query(statement, stream = NULL)
+adbc_statement_execute_query(
+  statement,
+  stream = NULL,
+  stream_join_parent = FALSE
+)
 
 adbc_statement_execute_schema(statement)
 
@@ -45,6 +49,9 @@ that can be coerced to one.}
 
 \item{stream}{A \link[nanoarrow:as_nanoarrow_array_stream]{nanoarrow_array_stream}
 or object that can be coerced to one.}
+
+\item{stream_join_parent}{Use \code{TRUE} to invalidate \code{statement} and tie its
+lifecycle to \code{stream}.}
 }
 \value{
 \itemize{
diff --git a/r/adbcdrivermanager/man/adbc_xptr_move.Rd b/r/adbcdrivermanager/man/adbc_xptr_move.Rd
index 2dc2ceab..bc7a92e7 100644
--- a/r/adbcdrivermanager/man/adbc_xptr_move.Rd
+++ b/r/adbcdrivermanager/man/adbc_xptr_move.Rd
@@ -5,13 +5,16 @@
 \alias{adbc_xptr_is_valid}
 \title{Low-level pointer details}
 \usage{
-adbc_xptr_move(x)
+adbc_xptr_move(x, check_child_count = TRUE)
 
 adbc_xptr_is_valid(x)
 }
 \arguments{
 \item{x}{An 'adbc_database', 'adbc_connection', 'adbc_statement', or
 'nanoarrow_array_stream'}
+
+\item{check_child_count}{Ensures that \code{x} has a zero child count before
+performing the move. This should almost always be \code{TRUE}.}
 }
 \value{
 \itemize{
diff --git a/r/adbcdrivermanager/man/with_adbc.Rd b/r/adbcdrivermanager/man/with_adbc.Rd
index 5e16a9ca..ba05dbef 100644
--- a/r/adbcdrivermanager/man/with_adbc.Rd
+++ b/r/adbcdrivermanager/man/with_adbc.Rd
@@ -33,12 +33,13 @@ it is good practice to explicitly clean up these objects. These helpers
 are designed to make explicit and predictable cleanup easy to accomplish.
 }
 \details{
-Note that you can use \code{\link[=adbc_connection_join]{adbc_connection_join()}},
-\code{\link[=adbc_statement_join]{adbc_statement_join()}}, and \code{\link[=adbc_stream_join]{adbc_stream_join()}}
+Note that you can use \code{\link[=adbc_connection_join]{adbc_connection_join()}} and \code{\link[=adbc_statement_join]{adbc_statement_join()}}
 to tie the lifecycle of the parent object to that of the child object.
 These functions mark any previous references to the parent object as
 released so you can still use local and with helpers to manage the parent
-object before it is joined.
+object before it is joined. Use \code{stream_join_parent = TRUE} in
+\code{\link[=adbc_statement_execute_query]{adbc_statement_execute_query()}} to tie the lifecycle of a statement to
+the output stream.
 }
 \examples{
 # Using with_adbc():
diff --git a/r/adbcdrivermanager/src/init.c b/r/adbcdrivermanager/src/init.c
index 77c097b8..ad7ff6dc 100644
--- a/r/adbcdrivermanager/src/init.c
+++ b/r/adbcdrivermanager/src/init.c
@@ -99,6 +99,7 @@ SEXP RAdbcStatementExecutePartitions(SEXP statement_xptr, SEXP out_schema_xptr,
                                      SEXP partitions_xptr, SEXP error_xptr);
 SEXP RAdbcStatementCancel(SEXP statement_xptr, SEXP error_xptr);
 SEXP RAdbcXptrEnv(SEXP xptr);
+SEXP RAdbcXptrSetProtected(SEXP xptr, SEXP prot);
 
 static const R_CallMethodDef CallEntries[] = {
     {"RAdbcVoidDriverInitFunc", (DL_FUNC)&RAdbcVoidDriverInitFunc, 0},
@@ -160,6 +161,7 @@ static const R_CallMethodDef CallEntries[] = {
     {"RAdbcStatementExecutePartitions", (DL_FUNC)&RAdbcStatementExecutePartitions, 4},
     {"RAdbcStatementCancel", (DL_FUNC)&RAdbcStatementCancel, 2},
     {"RAdbcXptrEnv", (DL_FUNC)&RAdbcXptrEnv, 1},
+    {"RAdbcXptrSetProtected", (DL_FUNC)&RAdbcXptrSetProtected, 2},
     {NULL, NULL, 0}};
 /* end generated by tools/make-callentries.R */
 
diff --git a/r/adbcdrivermanager/src/radbc.cc b/r/adbcdrivermanager/src/radbc.cc
index fe09de13..da27e4d0 100644
--- a/r/adbcdrivermanager/src/radbc.cc
+++ b/r/adbcdrivermanager/src/radbc.cc
@@ -41,6 +41,24 @@ static void adbc_error_warn(int code, AdbcError* error, const char* context) {
   }
 }
 
+static int adbc_update_parent_child_count(SEXP xptr, int delta) {
+  SEXP parent_xptr = R_ExternalPtrProtected(xptr);
+  if (parent_xptr == R_NilValue) {
+    return NA_INTEGER;
+  }
+
+  SEXP parent_env = R_ExternalPtrTag(parent_xptr);
+  if (parent_env == R_NilValue) {
+    return NA_INTEGER;
+  }
+
+  SEXP child_count_sexp = Rf_findVarInFrame(parent_env, Rf_install(".child_count"));
+  int* child_count = INTEGER(child_count_sexp);
+  int old_value = child_count[0];
+  child_count[0] = child_count[0] + delta;
+  return old_value;
+}
+
 static void finalize_driver_xptr(SEXP driver_xptr) {
   auto driver = reinterpret_cast<AdbcDriver*>(R_ExternalPtrAddr(driver_xptr));
   if (driver == nullptr) {
@@ -186,6 +204,9 @@ static void finalize_connection_xptr(SEXP connection_xptr) {
     AdbcError error = ADBC_ERROR_INIT;
     int status = AdbcConnectionRelease(connection, &error);
     adbc_error_warn(status, &error, "finalize_connection_xptr()");
+    if (status == ADBC_STATUS_OK) {
+      adbc_update_parent_child_count(connection_xptr, -1);
+    }
   }
 
   adbc_xptr_default_finalize<AdbcConnection>(connection_xptr);
@@ -236,6 +257,7 @@ extern "C" SEXP RAdbcConnectionInit(SEXP connection_xptr, SEXP database_xptr,
     // Keep the database pointer alive for as long as the connection pointer
     // is alive
     R_SetExternalPtrProtected(connection_xptr, database_xptr);
+    adbc_update_parent_child_count(connection_xptr, 1);
   }
 
   return adbc_wrap_status(result);
@@ -245,6 +267,10 @@ extern "C" SEXP RAdbcConnectionRelease(SEXP connection_xptr, SEXP error_xptr) {
   auto connection = adbc_from_xptr<AdbcConnection>(connection_xptr);
   auto error = adbc_from_xptr<AdbcError>(error_xptr);
   int status = AdbcConnectionRelease(connection, error);
+  if (status == ADBC_STATUS_OK) {
+    adbc_update_parent_child_count(connection_xptr, -1);
+  }
+
   return adbc_wrap_status(status);
 }
 
@@ -384,6 +410,9 @@ static void finalize_statement_xptr(SEXP statement_xptr) {
     AdbcError error = ADBC_ERROR_INIT;
     int status = AdbcStatementRelease(statement, &error);
     adbc_error_warn(status, &error, "finalize_statement_xptr()");
+    if (status == ADBC_STATUS_OK) {
+      adbc_update_parent_child_count(statement_xptr, -1);
+    }
   }
 
   adbc_xptr_default_finalize<AdbcStatement>(statement_xptr);
@@ -401,6 +430,7 @@ extern "C" SEXP RAdbcStatementNew(SEXP connection_xptr) {
   adbc_error_stop(status, &error);
 
   R_SetExternalPtrProtected(statement_xptr, connection_xptr);
+  adbc_update_parent_child_count(statement_xptr, 1);
 
   UNPROTECT(1);
   return statement_xptr;
@@ -430,6 +460,10 @@ extern "C" SEXP RAdbcStatementRelease(SEXP statement_xptr, SEXP error_xptr) {
   auto statement = adbc_from_xptr<AdbcStatement>(statement_xptr);
   auto error = adbc_from_xptr<AdbcError>(error_xptr);
   int status = AdbcStatementRelease(statement, error);
+  if (status == ADBC_STATUS_OK) {
+    adbc_update_parent_child_count(statement_xptr, -1);
+  }
+
   return adbc_wrap_status(status);
 }
 
diff --git a/r/adbcdrivermanager/src/radbc.h b/r/adbcdrivermanager/src/radbc.h
index 27772802..4f1ec283 100644
--- a/r/adbcdrivermanager/src/radbc.h
+++ b/r/adbcdrivermanager/src/radbc.h
@@ -22,6 +22,8 @@
 
 #include <utility>
 
+#include <adbc.h>
+
 template <typename T>
 static inline const char* adbc_xptr_class();
 
diff --git a/r/adbcdrivermanager/src/utils.c b/r/adbcdrivermanager/src/utils.c
index 71258fc0..74e5c90c 100644
--- a/r/adbcdrivermanager/src/utils.c
+++ b/r/adbcdrivermanager/src/utils.c
@@ -26,3 +26,14 @@ SEXP RAdbcXptrEnv(SEXP xptr) {
 
   return R_ExternalPtrTag(xptr);
 }
+
+SEXP RAdbcXptrSetProtected(SEXP xptr, SEXP prot) {
+  if (TYPEOF(xptr) != EXTPTRSXP) {
+    Rf_error("object is not an external pointer");
+  }
+
+  SEXP old_prot = PROTECT(R_ExternalPtrProtected(xptr));
+  R_SetExternalPtrProtected(xptr, prot);
+  UNPROTECT(1);
+  return old_prot;
+}
diff --git a/r/adbcdrivermanager/tests/testthat/test-driver_monkey.R b/r/adbcdrivermanager/tests/testthat/test-driver_monkey.R
index d3ed532a..5275b408 100644
--- a/r/adbcdrivermanager/tests/testthat/test-driver_monkey.R
+++ b/r/adbcdrivermanager/tests/testthat/test-driver_monkey.R
@@ -27,6 +27,11 @@ test_that("the monkey driver sees, and the monkey driver does", {
   stream <- nanoarrow::nanoarrow_allocate_array_stream()
   expect_identical(adbc_statement_execute_query(stmt, stream), -1)
   expect_identical(as.data.frame(stream$get_next()), input)
+  expect_error(
+    adbc_statement_release(stmt),
+    class = "adbc_error_child_count_not_zero"
+  )
+  stream$release()
   adbc_statement_release(stmt)
 
   stmt <- adbc_statement_init(con, input)
diff --git a/r/adbcdrivermanager/tests/testthat/test-helpers.R b/r/adbcdrivermanager/tests/testthat/test-helpers.R
index 3e65053e..44448359 100644
--- a/r/adbcdrivermanager/tests/testthat/test-helpers.R
+++ b/r/adbcdrivermanager/tests/testthat/test-helpers.R
@@ -129,8 +129,6 @@ test_that("joiners work for databases, connections, and statements", {
 })
 
 test_that("joiners work with streams", {
-  skip_if_not(packageVersion("nanoarrow") >= "0.1.0.9000")
-
   stream <- local({
     db <- local_adbc(adbc_database_init(adbc_driver_monkey()))
 
@@ -143,8 +141,7 @@ test_that("joiners work with streams", {
     expect_false(adbc_xptr_is_valid(con))
 
     stream <- local_adbc(nanoarrow::nanoarrow_allocate_array_stream())
-    adbc_statement_execute_query(stmt, stream)
-    adbc_stream_join(stream, stmt)
+    adbc_statement_execute_query(stmt, stream, stream_join_parent = TRUE)
     expect_false(adbc_xptr_is_valid(stmt))
 
     adbc_xptr_move(stream)
diff --git a/r/adbcdrivermanager/tests/testthat/test-utils.R b/r/adbcdrivermanager/tests/testthat/test-utils.R
index 0a9c7a49..e99209c9 100644
--- a/r/adbcdrivermanager/tests/testthat/test-utils.R
+++ b/r/adbcdrivermanager/tests/testthat/test-utils.R
@@ -17,8 +17,8 @@
 
 test_that("external pointer embedded environment works", {
   db <- adbc_database_init(adbc_driver_void())
-  expect_identical(names(db), "driver")
-  expect_identical(length(db), 1L)
+  expect_setequal(names(db), c("driver", ".child_count"))
+  expect_identical(db$.child_count, 0L)
 
   db$key <- "value"
   expect_identical(db$key, "value")