Posted to commits@arrow.apache.org by np...@apache.org on 2022/04/23 13:07:11 UTC

[arrow] branch master updated: ARROW-15168: [R] Add S3 generics to create main Arrow objects

This is an automated email from the ASF dual-hosted git repository.

npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c13ef50ab2 ARROW-15168: [R] Add S3 generics to create main Arrow objects
c13ef50ab2 is described below

commit c13ef50ab239247d07a8530c74d1840c1013083f
Author: Dewey Dunnington <de...@fishandwhistle.net>
AuthorDate: Sat Apr 23 09:06:59 2022 -0400

    ARROW-15168: [R] Add S3 generics to create main Arrow objects
    
    Closes #12817 from paleolimbot/r-s3-generics
    
    Lead-authored-by: Dewey Dunnington <de...@fishandwhistle.net>
    Co-authored-by: Neal Richardson <ne...@gmail.com>
    Signed-off-by: Neal Richardson <ne...@gmail.com>
---
 r/DESCRIPTION                                      |   2 +-
 r/NAMESPACE                                        |  53 +++++++-
 r/R/array.R                                        | 133 +++++++++++++++++-
 r/R/arrowExports.R                                 |   8 ++
 r/R/chunked-array.R                                |  46 ++++++-
 r/R/dplyr-funcs-datetime.R                         |   8 +-
 r/R/extension.R                                    |  16 ++-
 r/R/feather.R                                      |   6 +-
 r/R/{ipc_stream.R => ipc-stream.R}                 |   7 +-
 r/R/metadata.R                                     |  12 --
 r/R/parquet.R                                      |   7 +-
 r/R/python.R                                       |  93 +++++++++++--
 r/R/record-batch-reader.R                          |  61 ++++++++-
 r/R/record-batch-writer.R                          |   2 +-
 r/R/record-batch.R                                 |  59 +++++++-
 r/R/schema.R                                       |  31 ++++-
 r/R/table.R                                        |  66 ++++++++-
 r/R/type.R                                         |  97 ++++++++++++--
 r/R/util.R                                         |  22 ++-
 r/_pkgdown.yml                                     |   9 +-
 r/extra-tests/test-read-files.R                    |   5 +-
 r/extra-tests/write-files.R                        |   1 +
 r/man/ChunkedArray.Rd                              |   2 -
 r/man/RecordBatch.Rd                               |   2 -
 r/man/RecordBatchReader.Rd                         |   2 -
 r/man/Schema.Rd                                    |   2 -
 r/man/Table.Rd                                     |   2 -
 r/man/array.Rd                                     |   2 -
 r/man/as_arrow_array.Rd                            |  39 ++++++
 r/man/as_arrow_table.Rd                            |  43 ++++++
 r/man/as_chunked_array.Rd                          |  34 +++++
 r/man/as_data_type.Rd                              |  33 +++++
 r/man/as_record_batch.Rd                           |  40 ++++++
 r/man/as_record_batch_reader.Rd                    |  46 +++++++
 r/man/as_schema.Rd                                 |  29 ++++
 r/man/concat_arrays.Rd                             |   2 -
 r/man/infer_type.Rd                                |  35 +++++
 r/man/new_extension_type.Rd                        |   2 -
 r/man/read_ipc_stream.Rd                           |   2 +-
 r/man/type.Rd                                      |  27 ----
 r/man/unify_schemas.Rd                             |   2 -
 r/man/vctrs_extension_array.Rd                     |   4 +-
 r/man/write_ipc_stream.Rd                          |   2 +-
 r/man/write_to_raw.Rd                              |   2 +-
 r/src/arrowExports.cpp                             |  19 +++
 r/src/arrow_types.h                                |  62 ++++++++-
 r/src/r_to_arrow.cpp                               | 119 ++++++++++++----
 r/src/recordbatchreader.cpp                        |  20 +++
 r/src/table.cpp                                    |  35 +++--
 r/src/type_infer.cpp                               |  50 ++++---
 r/tests/testthat/_snaps/Array.md                   |  20 +++
 r/tests/testthat/_snaps/feather.md                 |   6 +
 r/tests/testthat/_snaps/ipc-stream.md              |   6 +
 r/tests/testthat/_snaps/parquet.md                 |   6 +
 r/tests/testthat/_snaps/type.md                    |   8 ++
 r/tests/testthat/_snaps/util.md                    |  10 ++
 .../golden-files/data-arrow-sf_7.0.0.feather       | Bin 0 -> 1946 bytes
 r/tests/testthat/helper-data.R                     |   6 +-
 r/tests/testthat/test-Array.R                      | 149 ++++++++++++++++++++-
 r/tests/testthat/test-RecordBatch.R                |  45 +++++++
 r/tests/testthat/test-Table.R                      |  42 ++++++
 r/tests/testthat/test-backwards-compatibility.R    |  26 ++++
 r/tests/testthat/test-chunked-array.R              |  41 ++++++
 r/tests/testthat/test-data-type.R                  |  15 +++
 r/tests/testthat/test-feather.R                    |   5 +-
 .../{test-ipc_stream.R => test-ipc-stream.R}       |  15 ++-
 r/tests/testthat/test-metadata.R                   |  41 ++----
 r/tests/testthat/test-parquet.R                    |   5 +-
 r/tests/testthat/test-python.R                     |  48 ++++++-
 r/tests/testthat/test-record-batch-reader.R        |  46 +++++++
 r/tests/testthat/test-schema.R                     |  10 ++
 r/tests/testthat/test-type.R                       |  90 +++++++++----
 r/tests/testthat/test-util.R                       |  41 ++++++
 73 files changed, 1817 insertions(+), 267 deletions(-)

diff --git a/r/DESCRIPTION b/r/DESCRIPTION
index 46a3eefb68..b42873355c 100644
--- a/r/DESCRIPTION
+++ b/r/DESCRIPTION
@@ -116,7 +116,7 @@ Collate:
     'filesystem.R'
     'flight.R'
     'install-arrow.R'
-    'ipc_stream.R'
+    'ipc-stream.R'
     'json.R'
     'memory-pool.R'
     'message.R'
diff --git a/r/NAMESPACE b/r/NAMESPACE
index d6e67c85a8..30e4625ca6 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -38,6 +38,45 @@ S3method(as.list,ArrowTabular)
 S3method(as.list,Schema)
 S3method(as.raw,Buffer)
 S3method(as.vector,ArrowDatum)
+S3method(as_arrow_array,Array)
+S3method(as_arrow_array,ChunkedArray)
+S3method(as_arrow_array,Scalar)
+S3method(as_arrow_array,data.frame)
+S3method(as_arrow_array,default)
+S3method(as_arrow_array,pyarrow.lib.Array)
+S3method(as_arrow_table,RecordBatch)
+S3method(as_arrow_table,Table)
+S3method(as_arrow_table,data.frame)
+S3method(as_arrow_table,default)
+S3method(as_arrow_table,pyarrow.lib.RecordBatch)
+S3method(as_arrow_table,pyarrow.lib.Table)
+S3method(as_chunked_array,Array)
+S3method(as_chunked_array,ChunkedArray)
+S3method(as_chunked_array,default)
+S3method(as_chunked_array,pyarrow.lib.ChunkedArray)
+S3method(as_data_type,DataType)
+S3method(as_data_type,Field)
+S3method(as_data_type,Schema)
+S3method(as_data_type,pyarrow.lib.DataType)
+S3method(as_data_type,pyarrow.lib.Field)
+S3method(as_record_batch,RecordBatch)
+S3method(as_record_batch,Table)
+S3method(as_record_batch,data.frame)
+S3method(as_record_batch,pyarrow.lib.RecordBatch)
+S3method(as_record_batch,pyarrow.lib.Table)
+S3method(as_record_batch_reader,Dataset)
+S3method(as_record_batch_reader,RecordBatch)
+S3method(as_record_batch_reader,RecordBatchReader)
+S3method(as_record_batch_reader,Scanner)
+S3method(as_record_batch_reader,Table)
+S3method(as_record_batch_reader,arrow_dplyr_query)
+S3method(as_record_batch_reader,data.frame)
+S3method(as_record_batch_reader,pyarrow.lib.RecordBatch)
+S3method(as_record_batch_reader,pyarrow.lib.RecordBatchReader)
+S3method(as_record_batch_reader,pyarrow.lib.Table)
+S3method(as_schema,Schema)
+S3method(as_schema,StructType)
+S3method(as_schema,pyarrow.lib.Schema)
 S3method(c,Array)
 S3method(c,ChunkedArray)
 S3method(c,Dataset)
@@ -55,6 +94,9 @@ S3method(head,Dataset)
 S3method(head,RecordBatchReader)
 S3method(head,Scanner)
 S3method(head,arrow_dplyr_query)
+S3method(infer_type,ArrowDatum)
+S3method(infer_type,Expression)
+S3method(infer_type,default)
 S3method(is.finite,ArrowDatum)
 S3method(is.infinite,ArrowDatum)
 S3method(is.na,ArrowDatum)
@@ -104,9 +146,6 @@ S3method(tail,Dataset)
 S3method(tail,RecordBatchReader)
 S3method(tail,Scanner)
 S3method(tail,arrow_dplyr_query)
-S3method(type,ArrowDatum)
-S3method(type,Expression)
-S3method(type,default)
 S3method(unique,ArrowDatum)
 S3method(vec_ptype_abbr,arrow_fixed_size_binary)
 S3method(vec_ptype_abbr,arrow_fixed_size_list)
@@ -214,6 +253,13 @@ export(arrow_with_engine)
 export(arrow_with_json)
 export(arrow_with_parquet)
 export(arrow_with_s3)
+export(as_arrow_array)
+export(as_arrow_table)
+export(as_chunked_array)
+export(as_data_type)
+export(as_record_batch)
+export(as_record_batch_reader)
+export(as_schema)
 export(binary)
 export(bool)
 export(boolean)
@@ -253,6 +299,7 @@ export(float32)
 export(float64)
 export(halffloat)
 export(hive_partition)
+export(infer_type)
 export(install_arrow)
 export(install_pyarrow)
 export(int16)
diff --git a/r/R/array.R b/r/R/array.R
index 4e7fbdab7c..d3cca9594f 100644
--- a/r/R/array.R
+++ b/r/R/array.R
@@ -85,7 +85,7 @@
 #'
 #' @rdname array
 #' @name array
-#' @examplesIf arrow_available()
+#' @examples
 #' my_array <- Array$create(1:10)
 #' my_array$type
 #' my_array$cast(int8())
@@ -217,6 +217,135 @@ Array$create <- function(x, type = NULL) {
 Array$import_from_c <- ImportArray
 
 
+#' Convert an object to an Arrow Array
+#'
+#' The `as_arrow_array()` function is identical to `Array$create()` except
+#' that it is an S3 generic, which allows methods to be defined in other
+#' packages to convert objects to [Array]. `Array$create()` is slightly faster
+#' because it tries to convert in C++ before falling back on
+#' `as_arrow_array()`.
+#'
+#' @param x An object to convert to an Arrow Array
+#' @param ... Passed to S3 methods
+#' @param type A [type][data-type] for the final Array. A value of `NULL`
+#'   will default to the type guessed by [infer_type()].
+#'
+#' @return An [Array] with type `type`.
+#' @export
+#'
+#' @examples
+#' as_arrow_array(1:5)
+#'
+as_arrow_array <- function(x, ..., type = NULL) {
+  UseMethod("as_arrow_array")
+}
+
+#' @export
+as_arrow_array.default <- function(x, ..., type = NULL, from_vec_to_array = FALSE) {
+  # If from_vec_to_array is TRUE, this is a call from C++ after trying
+  # the internal C++ conversion, when S3 dispatch has failed to find a
+  # method for the object. This call happens when creating Array,
+  # ChunkedArray, RecordBatch, and Table objects from a data.frame
+  # if the internal C++ conversion (which is faster and can usually be
+  # parallelized) is not implemented. If the C++ call has reached this
+  # default method, we error. If from_vec_to_array is FALSE, we call
+  # vec_to_Array to use the internal C++ conversion.
+  if (from_vec_to_array) {
+    # Last ditch attempt: if vctrs::vec_is(x), we can use the vctrs
+    # extension type.
+    if (vctrs::vec_is(x) && is.null(type)) {
+      vctrs_extension_array(x)
+    } else if (vctrs::vec_is(x) && inherits(type, "VctrsExtensionType")) {
+      vctrs_extension_array(
+        x,
+        ptype = type$ptype(),
+        storage_type = type$storage_type()
+      )
+    } else {
+      stop_cant_convert_array(x, type)
+    }
+  } else {
+    vec_to_Array(x, type)
+  }
+}
+
+#' @rdname as_arrow_array
+#' @export
+as_arrow_array.Array <- function(x, ..., type = NULL) {
+  if (is.null(type)) {
+    x
+  } else {
+    x$cast(type)
+  }
+}
+
+#' @rdname as_arrow_array
+#' @export
+as_arrow_array.Scalar <- function(x, ..., type = NULL) {
+  as_arrow_array(x$as_array(), ..., type = type)
+}
+
+#' @rdname as_arrow_array
+#' @export
+as_arrow_array.ChunkedArray <- function(x, ..., type = NULL) {
+  concat_arrays(!!! x$chunks, type = type)
+}
+
+# data.frame conversion can happen in C++ when all the columns can be
+# converted in C++ and when `type` is not an ExtensionType; however,
+# when calling as_arrow_array(), this method is called whether or not
+# the C++ conversion is possible.
+#' @export
+as_arrow_array.data.frame <- function(x, ..., type = NULL) {
+  type <- type %||% infer_type(x)
+
+  if (inherits(type, "VctrsExtensionType")) {
+    storage <- as_arrow_array(x, type = type$storage_type())
+    new_extension_array(storage, type)
+  } else if (inherits(type, "StructType")) {
+    fields <- type$fields()
+    names <- map_chr(fields, "name")
+    types <- map(fields, "type")
+    arrays <- Map(as_arrow_array, x, types)
+    names(arrays) <- names
+
+    # TODO(ARROW-16266): a hack because there is no StructArray$create() yet
+    batch <- record_batch(!!! arrays)
+    array_ptr <- allocate_arrow_array()
+    schema_ptr <- allocate_arrow_schema()
+    on.exit({
+      delete_arrow_array(array_ptr)
+      delete_arrow_schema(schema_ptr)
+    })
+
+    batch$export_to_c(array_ptr, schema_ptr)
+    Array$import_from_c(array_ptr, schema_ptr)
+  } else {
+    stop_cant_convert_array(x, type)
+  }
+}
+
+stop_cant_convert_array <- function(x, type) {
+  if (is.null(type)) {
+    abort(
+      sprintf(
+        "Can't create Array from object of type %s",
+        paste(class(x), collapse = " / ")
+      ),
+      call = rlang::caller_env()
+    )
+  } else {
+    abort(
+      sprintf(
+        "Can't create Array<%s> from object of type %s",
+        format(type$code()),
+        paste(class(x), collapse = " / ")
+      ),
+      call = rlang::caller_env()
+    )
+  }
+}
+
 #' Concatenate zero or more Arrays
 #'
 #' Concatenates zero or more [Array] objects into a single
@@ -231,7 +360,7 @@ Array$import_from_c <- ImportArray
 #' @return A single [Array]
 #' @export
 #'
-#' @examplesIf arrow_available()
+#' @examples
 #' concat_arrays(Array$create(1:3), Array$create(4:5))
 concat_arrays <- function(..., type = NULL) {
   dots <- lapply(list2(...), Array$create, type = type)
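
To make the extension point concrete: a downstream package can now register
its own `as_arrow_array()` method. A minimal sketch, using a made-up
"degrees" S3 class (the class and its representation are invented for
illustration; only the generic and its signature come from this patch):

    library(arrow)

    # A toy S3 vector class wrapping a double vector
    degrees <- function(x) structure(as.double(x), class = "degrees")

    # Register a conversion so Array creation can handle the class
    as_arrow_array.degrees <- function(x, ..., type = NULL) {
      if (is.null(type)) type <- float64()
      as_arrow_array(unclass(x), type = type)
    }

    as_arrow_array(degrees(c(90, 180)))
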
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 23309f70d1..214c062973 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1724,6 +1724,14 @@ RecordBatchReader__batches <- function(reader) {
   .Call(`_arrow_RecordBatchReader__batches`, reader)
 }
 
+RecordBatchReader__from_batches <- function(batches, schema_sxp) {
+  .Call(`_arrow_RecordBatchReader__from_batches`, batches, schema_sxp)
+}
+
+RecordBatchReader__from_Table <- function(table) {
+  .Call(`_arrow_RecordBatchReader__from_Table`, table)
+}
+
 Table__from_RecordBatchReader <- function(reader) {
   .Call(`_arrow_Table__from_RecordBatchReader`, reader)
 }
diff --git a/r/R/chunked-array.R b/r/R/chunked-array.R
index c91b125af4..24ca7e6e58 100644
--- a/r/R/chunked-array.R
+++ b/r/R/chunked-array.R
@@ -58,7 +58,7 @@
 #' @rdname ChunkedArray
 #' @name ChunkedArray
 #' @seealso [Array]
-#' @examplesIf arrow_available()
+#' @examples
 #' # Pass items into chunked_array as separate objects to create chunks
 #' class_scores <- chunked_array(c(87, 88, 89), c(94, 93, 92), c(71, 72, 73))
 #' class_scores$num_chunks
@@ -170,3 +170,47 @@ c.ChunkedArray <- function(...) {
 #' @rdname ChunkedArray
 #' @export
 chunked_array <- ChunkedArray$create
+
+#' Convert an object to an Arrow ChunkedArray
+#'
+#' Whereas [chunked_array()] constructs a [ChunkedArray] from zero or more
+#' [Array]s or R vectors, `as_chunked_array()` converts a single object to a
+#' [ChunkedArray].
+#'
+#' @param x An object to convert to an Arrow Chunked Array
+#' @inheritParams as_arrow_array
+#'
+#' @return A [ChunkedArray].
+#' @export
+#'
+#' @examples
+#' as_chunked_array(1:5)
+#'
+as_chunked_array <- function(x, ..., type = NULL) {
+  UseMethod("as_chunked_array")
+}
+
+#' @rdname as_chunked_array
+#' @export
+as_chunked_array.ChunkedArray <- function(x, ..., type = NULL) {
+  if (is.null(type)) {
+    x
+  } else {
+    x$cast(type)
+  }
+}
+
+#' @rdname as_chunked_array
+#' @export
+as_chunked_array.Array <- function(x, ..., type = NULL) {
+  if (is.null(type)) {
+    chunked_array(x)
+  } else {
+    chunked_array(x$cast(type))
+  }
+}
+
+#' @export
+as_chunked_array.default <- function(x, ..., type = NULL) {
+  ChunkedArray$create(x)
+}
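
A quick contrast between the constructor and the converter added above; all
calls use only the API shown in this file:

    # chunked_array() assembles one ChunkedArray from several chunks
    chunked_array(1:3, 4:5)

    # as_chunked_array() converts a single object; a `type` triggers a cast
    as_chunked_array(Array$create(1:5))
    as_chunked_array(chunked_array(1:3, 4:5), type = float64())
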
diff --git a/r/R/dplyr-funcs-datetime.R b/r/R/dplyr-funcs-datetime.R
index a6bc79ec7c..4f2517e8ef 100644
--- a/r/R/dplyr-funcs-datetime.R
+++ b/r/R/dplyr-funcs-datetime.R
@@ -179,7 +179,13 @@ register_bindings_datetime <- function() {
   })
   register_binding("tz", function(x) {
     if (!call_binding("is.POSIXct", x)) {
-      abort(paste0("timezone extraction for objects of class `", type(x)$ToString(), "` not supported in Arrow"))
+      abort(
+        paste0(
+          "timezone extraction for objects of class `",
+          infer_type(x)$ToString(),
+          "` not supported in Arrow"
+        )
+      )
     }
 
     x$type()$timezone()
diff --git a/r/R/extension.R b/r/R/extension.R
index 111a0e8620..0f7d7f1b21 100644
--- a/r/R/extension.R
+++ b/r/R/extension.R
@@ -180,11 +180,13 @@ ExtensionType <- R6Class("ExtensionType",
       } else if (inherits(extension_array, "ExtensionArray")) {
         extension_array$storage()$as_vector()
       } else {
-        classes <- paste(class(extension_array), collapse = " / ")
         abort(
           c(
             "`extension_array` must be a ChunkedArray or ExtensionArray",
-            i = glue::glue("Got object of type {classes}")
+            i = sprintf(
+              "Got object of type %s",
+              paste(class(extension_array), collapse = " / ")
+            )
           )
         )
       }
@@ -309,7 +311,7 @@ ExtensionType$create <- function(storage_type,
 #'      and `reregister_extension_type()` return `NULL`, invisibly.
 #' @export
 #'
-#' @examplesIf arrow_available()
+#' @examples
 #' # Create the R6 type whose methods control how Array objects are
 #' # converted to R objects, how equality between types is computed,
 #' # and how types are printed.
@@ -509,7 +511,7 @@ VctrsExtensionType <- R6Class("VctrsExtensionType",
 #'     extension name "arrow.r.vctrs".
 #' @export
 #'
-#' @examplesIf arrow_available()
+#' @examples
 #' (array <- vctrs_extension_array(as.POSIXlt("2022-01-02 03:45", tz = "UTC")))
 #' array$type
 #' as.vector(array)
@@ -532,9 +534,9 @@ vctrs_extension_array <- function(x, ptype = vctrs::vec_ptype(x),
 
 #' @rdname vctrs_extension_array
 #' @export
-vctrs_extension_type <- function(ptype,
-                                 storage_type = type(vctrs::vec_data(ptype))) {
-  ptype <- vctrs::vec_ptype(ptype)
+vctrs_extension_type <- function(x,
+                                 storage_type = infer_type(vctrs::vec_data(x))) {
+  ptype <- vctrs::vec_ptype(x)
 
   new_extension_type(
     storage_type = storage_type,
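
The signature change above means `vctrs_extension_type()` now takes the
vector itself rather than a prototype; the ptype is derived internally via
`vctrs::vec_ptype()`. A sketch of the new calling convention (the POSIXlt
value is borrowed from the examples above):

    # previously: vctrs_extension_type(vctrs::vec_ptype(x))
    # now the vector is passed directly:
    vctrs_extension_type(as.POSIXlt("2022-01-02 03:45", tz = "UTC"))
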
diff --git a/r/R/feather.R b/r/R/feather.R
index 6065c285e8..5fff42bd99 100644
--- a/r/R/feather.R
+++ b/r/R/feather.R
@@ -100,11 +100,7 @@ write_feather <- function(x,
   compression <- compression_from_name(compression)
 
   x_out <- x
-  if (is.data.frame(x) || inherits(x, "RecordBatch")) {
-    x <- Table$create(x)
-  }
-
-  assert_that(is_writable_table(x))
+  x <- as_writable_table(x)
 
   if (!inherits(sink, "OutputStream")) {
     sink <- make_output_stream(sink)
diff --git a/r/R/ipc_stream.R b/r/R/ipc-stream.R
similarity index 98%
rename from r/R/ipc_stream.R
rename to r/R/ipc-stream.R
index c0a8565e85..e2056360d5 100644
--- a/r/R/ipc_stream.R
+++ b/r/R/ipc-stream.R
@@ -42,10 +42,8 @@
 #' write_ipc_stream(mtcars, tf)
 write_ipc_stream <- function(x, sink, ...) {
   x_out <- x # So we can return the data we got
-  if (is.data.frame(x)) {
-    x <- Table$create(x)
-  }
-  assert_that(is_writable_table(x))
+  x <- as_writable_table(x)
+
   if (!inherits(sink, "OutputStream")) {
     sink <- make_output_stream(sink)
     on.exit(sink$close())
@@ -54,6 +52,7 @@ write_ipc_stream <- function(x, sink, ...) {
   writer <- RecordBatchStreamWriter$create(sink, x$schema)
   writer$write(x)
   writer$close()
+
   invisible(x_out)
 }
 
diff --git a/r/R/metadata.R b/r/R/metadata.R
index f0411eb54a..147318f559 100644
--- a/r/R/metadata.R
+++ b/r/R/metadata.R
@@ -187,18 +187,6 @@ arrow_attributes <- function(x, only_top_level = FALSE) {
     if (all(map_lgl(columns, is.null))) {
       columns <- NULL
     }
-  } else if (inherits(x, c("sfc", "sf"))) {
-    # Check if there are any columns that look like sf columns, warn that we will
-    # not be saving this data for now (but only if arrow.preserve_row_level_metadata
-    # is set to FALSE)
-    warning(
-      "One of the columns given appears to be an `sfc` SF column. Due to their unique ",
-      "nature, these columns do not convert to Arrow well. We are working on ",
-      "better ways to do this, but in the interim we recommend converting any `sfc` ",
-      "columns to WKB (well-known binary) columns before using them with Arrow ",
-      "(for example, with `sf::st_as_binary(col)`).",
-      call. = FALSE
-    )
   }
 
   if (length(att) || !is.null(columns)) {
diff --git a/r/R/parquet.R b/r/R/parquet.R
index 4d63791a4f..64857acfcb 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -156,12 +156,7 @@ write_parquet <- function(x,
                           properties = NULL,
                           arrow_properties = NULL) {
   x_out <- x
-
-  if (is.data.frame(x) || inherits(x, "RecordBatch")) {
-    x <- Table$create(x)
-  }
-
-  assert_that(is_writable_table(x))
+  x <- as_writable_table(x)
 
   if (!inherits(sink, "OutputStream")) {
     sink <- make_output_stream(sink)
diff --git a/r/R/python.R b/r/R/python.R
index 8c333adbb5..023d914f16 100644
--- a/r/R/python.R
+++ b/r/R/python.R
@@ -105,19 +105,27 @@ py_to_r.pyarrow.lib.ChunkedArray <- function(x, ...) {
 }
 
 r_to_py.Table <- function(x, convert = FALSE) {
-  # Import with convert = FALSE so that `_import_from_c` returns a Python object
-  pa <- reticulate::import("pyarrow", convert = FALSE)
-  out <- pa$Table$from_arrays(x$columns, schema = x$schema)
-  # But set the convert attribute on the return object to the requested value
+  # TODO(ARROW-16269): Going through RecordBatchReader maintains schema
+  # metadata (e.g., extension types) more faithfully than column-wise
+  # construction; however, it may re-chunk columns unnecessarily.
+  py_rbr <- reticulate::r_to_py(as_record_batch_reader(x), convert = FALSE)
+  out <- py_rbr$read_all()
   assign("convert", convert, out)
   out
 }
 
 py_to_r.pyarrow.lib.Table <- function(x, ...) {
-  colnames <- maybe_py_to_r(x$column_names)
-  r_cols <- maybe_py_to_r(x$columns)
-  names(r_cols) <- colnames
-  Table$create(!!!r_cols, schema = maybe_py_to_r(x$schema))
+  # TODO(ARROW-16269): Going through RecordBatchReader maintains schema
+  # metadata (e.g., extension types) more faithfully than column-wise
+  # construction; however, it may re-chunk columns unnecessarily.
+  pa <- reticulate::import("pyarrow", convert = FALSE)
+  py_rbr <- pa$lib$RecordBatchReader$from_batches(
+    x$schema,
+    x$to_batches()
+  )
+
+  r_rbr <- maybe_py_to_r(py_rbr)
+  r_rbr$read_table()
 }
 
 py_to_r.pyarrow.lib.Schema <- function(x, ...) {
@@ -235,6 +243,75 @@ maybe_py_to_r <- function(x) {
   x
 }
 
+
+#' @export
+as_arrow_array.pyarrow.lib.Array <- function(x, ..., type = NULL) {
+  as_arrow_array(py_to_r.pyarrow.lib.Array(x), type = type)
+}
+
+# nolint start
+#' @export
+as_chunked_array.pyarrow.lib.ChunkedArray <- function(x, ..., type = NULL) {
+  as_chunked_array(py_to_r.pyarrow.lib.ChunkedArray(x), type = type)
+}
+# nolint end
+
+#' @export
+as_record_batch.pyarrow.lib.RecordBatch <- function(x, ..., schema = NULL) {
+  as_record_batch(py_to_r.pyarrow.lib.RecordBatch(x), schema = schema)
+}
+
+#' @export
+as_arrow_table.pyarrow.lib.RecordBatch <- function(x, ..., schema = NULL) {
+  as_arrow_table(py_to_r.pyarrow.lib.RecordBatch(x), schema = schema)
+}
+
+# Some of these function names are longer than 40 characters
+# (but have to be named such because of S3 method naming)
+# nolint start
+#' @export
+as_record_batch_reader.pyarrow.lib.RecordBatch <- function(x, ...) {
+  as_record_batch_reader(py_to_r.pyarrow.lib.RecordBatch(x))
+}
+# nolint end
+
+#' @export
+as_record_batch.pyarrow.lib.Table <- function(x, ..., schema = NULL) {
+  as_record_batch(py_to_r.pyarrow.lib.Table(x), schema = schema)
+}
+
+#' @export
+as_arrow_table.pyarrow.lib.Table <- function(x, ..., schema = NULL) {
+  as_arrow_table(py_to_r.pyarrow.lib.Table(x), schema = schema)
+}
+
+#' @export
+as_record_batch_reader.pyarrow.lib.Table <- function(x, ...) {
+  as_record_batch_reader(py_to_r.pyarrow.lib.Table(x))
+}
+
+#' @export
+as_schema.pyarrow.lib.Schema <- function(x, ...) {
+  py_to_r.pyarrow.lib.Schema(x)
+}
+
+#' @export
+as_data_type.pyarrow.lib.Field <- function(x, ...) {
+  as_data_type(py_to_r.pyarrow.lib.Field(x))
+}
+
+#' @export
+as_data_type.pyarrow.lib.DataType <- function(x, ...) {
+  as_data_type(py_to_r.pyarrow.lib.DataType(x))
+}
+
+# nolint start
+#' @export
+as_record_batch_reader.pyarrow.lib.RecordBatchReader <- function(x, ...) {
+  py_to_r.pyarrow.lib.RecordBatchReader(x)
+}
+# nolint end
+
 #' Install pyarrow for use with reticulate
 #'
 #' `pyarrow` is the Python package for Apache Arrow. This function helps with
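
Assuming a working reticulate + pyarrow setup, the practical effect of
routing both directions through a RecordBatchReader is that schema-level
metadata such as extension types survives the round trip, per the TODO
notes above. A hedged sketch:

    library(arrow)
    library(reticulate)

    tab <- arrow_table(col = vctrs_extension_array(as.POSIXlt(Sys.time())))
    py_tab <- r_to_py(tab)   # now goes through a RecordBatchReader
    tab2 <- py_to_r(py_tab)  # and back, with the extension type intact
    tab2$schema
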
diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R
index fb61933e6b..bc28757c98 100644
--- a/r/R/record-batch-reader.R
+++ b/r/R/record-batch-reader.R
@@ -57,7 +57,7 @@
 #' @rdname RecordBatchReader
 #' @name RecordBatchReader
 #' @include arrow-package.R
-#' @examplesIf arrow_available()
+#' @examples
 #' tf <- tempfile()
 #' on.exit(unlink(tf))
 #'
@@ -176,3 +176,62 @@ RecordBatchFileReader$create <- function(file) {
   assert_is(file, "InputStream")
   ipc___RecordBatchFileReader__Open(file)
 }
+
+#' Convert an object to an Arrow RecordBatchReader
+#'
+#' @param x An object to convert to a [RecordBatchReader]
+#' @param ... Passed to S3 methods
+#'
+#' @return A [RecordBatchReader]
+#' @export
+#'
+#' @examplesIf arrow_with_dataset()
+#' reader <- as_record_batch_reader(data.frame(col1 = 1, col2 = "two"))
+#' reader$read_next_batch()
+#'
+as_record_batch_reader <- function(x, ...) {
+  UseMethod("as_record_batch_reader")
+}
+
+#' @rdname as_record_batch_reader
+#' @export
+as_record_batch_reader.RecordBatchReader <- function(x, ...) {
+  x
+}
+
+#' @rdname as_record_batch_reader
+#' @export
+as_record_batch_reader.Table <- function(x, ...) {
+  RecordBatchReader__from_Table(x)
+}
+
+#' @rdname as_record_batch_reader
+#' @export
+as_record_batch_reader.RecordBatch <- function(x, ...) {
+  RecordBatchReader__from_batches(list(x), NULL)
+}
+
+#' @rdname as_record_batch_reader
+#' @export
+as_record_batch_reader.data.frame <- function(x, ...) {
+  as_record_batch_reader(as_record_batch(x))
+}
+
+#' @rdname as_record_batch_reader
+#' @export
+as_record_batch_reader.Dataset <- function(x, ...) {
+  Scanner$create(x)$ToRecordBatchReader()
+}
+
+#' @rdname as_record_batch_reader
+#' @export
+as_record_batch_reader.arrow_dplyr_query <- function(x, ...) {
+  # TODO(ARROW-15271): make ExecPlan return RBR
+  as_record_batch_reader(collect.arrow_dplyr_query(x, as_data_frame = FALSE))
+}
+
+#' @rdname as_record_batch_reader
+#' @export
+as_record_batch_reader.Scanner <- function(x, ...) {
+  x$ToRecordBatchReader()
+}
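
Each of these methods yields the same consumable stream interface. For
example (assuming the table holds a single chunk, so exactly one batch is
produced):

    reader <- as_record_batch_reader(arrow_table(col1 = 1:3))
    reader$read_next_batch()  # a RecordBatch
    reader$read_next_batch()  # NULL once the stream is drained
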
diff --git a/r/R/record-batch-writer.R b/r/R/record-batch-writer.R
index 8675e785a4..58a66c2b9e 100644
--- a/r/R/record-batch-writer.R
+++ b/r/R/record-batch-writer.R
@@ -105,7 +105,7 @@ RecordBatchWriter <- R6Class("RecordBatchWriter",
       } else if (inherits(x, "Table")) {
         self$write_table(x)
       } else {
-        self$write_table(Table$create(x))
+        self$write_table(as_arrow_table(x))
       }
     },
     close = function() ipc___RecordBatchWriter__Close(self)
diff --git a/r/R/record-batch.R b/r/R/record-batch.R
index 03f97a5130..49f97da3ce 100644
--- a/r/R/record-batch.R
+++ b/r/R/record-batch.R
@@ -176,7 +176,7 @@ RecordBatch$import_from_c <- ImportRecordBatch
 #' @param schema a [Schema], or `NULL` (the default) to infer the schema from
 #' the data in `...`. When providing an Arrow IPC buffer, `schema` is required.
 #' @rdname RecordBatch
-#' @examplesIf arrow_available()
+#' @examples
 #' batch <- record_batch(name = rownames(mtcars), mtcars)
 #' dim(batch)
 #' dim(head(batch))
@@ -242,3 +242,60 @@ cbind.RecordBatch <- function(...) {
 
   RecordBatch$create(!!! columns)
 }
+
+#' Convert an object to an Arrow RecordBatch
+#'
+#' Whereas [record_batch()] constructs a [RecordBatch] from one or more columns,
+#' `as_record_batch()` converts a single object to an Arrow [RecordBatch].
+#'
+#' @param x An object to convert to an Arrow RecordBatch
+#' @param ... Passed to S3 methods
+#' @inheritParams record_batch
+#'
+#' @return A [RecordBatch]
+#' @export
+#'
+#' @examples
+#' # use as_record_batch() for a single object
+#' as_record_batch(data.frame(col1 = 1, col2 = "two"))
+#'
+#' # use record_batch() to create from columns
+#' record_batch(col1 = 1, col2 = "two")
+#'
+as_record_batch <- function(x, ..., schema = NULL) {
+  UseMethod("as_record_batch")
+}
+
+#' @rdname as_record_batch
+#' @export
+as_record_batch.RecordBatch <- function(x, ..., schema = NULL) {
+  if (is.null(schema)) {
+    x
+  } else {
+    x$cast(schema)
+  }
+}
+
+#' @rdname as_record_batch
+#' @export
+as_record_batch.Table <- function(x, ..., schema = NULL) {
+  if (x$num_columns == 0) {
+    batch <- record_batch(data.frame())
+    return(batch$Take(rep_len(0, x$num_rows)))
+  }
+
+  arrays_out <- lapply(x$columns, as_arrow_array)
+  names(arrays_out) <- names(x)
+  out <- RecordBatch$create(!!! arrays_out)
+  if (!is.null(schema)) {
+    out <- out$cast(schema)
+  }
+
+  out
+}
+
+#' @rdname as_record_batch
+#' @export
+as_record_batch.data.frame <- function(x, ..., schema = NULL) {
+  RecordBatch$create(x, schema = schema)
+}
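
One detail worth noting in the methods above: supplying `schema` casts
rather than erroring. For example:

    batch <- record_batch(col1 = 1:3)
    as_record_batch(batch)                                     # returned as-is
    as_record_batch(batch, schema = schema(col1 = float64()))  # cast to match
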
diff --git a/r/R/schema.R b/r/R/schema.R
index 2dab25bf79..446f24f6ad 100644
--- a/r/R/schema.R
+++ b/r/R/schema.R
@@ -77,7 +77,7 @@
 #'
 #' @rdname Schema
 #' @name Schema
-#' @examplesIf arrow_available()
+#' @examples
 #' schema(a = int32(), b = float64())
 #'
 #' schema(
@@ -333,7 +333,7 @@ read_schema <- function(stream, ...) {
 #' @return A `Schema` with the union of fields contained in the inputs, or
 #'   `NULL` if any of `schemas` is `NULL`
 #' @export
-#' @examplesIf arrow_available()
+#' @examples
 #' a <- schema(b = double(), c = bool())
 #' z <- schema(b = double(), k = utf8())
 #' unify_schemas(a, z)
@@ -350,3 +350,30 @@ print.arrow_r_metadata <- function(x, ...) {
   utils::str(.unserialize_arrow_r_metadata(x))
   invisible(x)
 }
+
+#' Convert an object to an Arrow Schema
+#'
+#' @param x An object to convert to a [schema()]
+#' @param ... Passed to S3 methods.
+#'
+#' @return A [Schema] object.
+#' @export
+#'
+#' @examples
+#' as_schema(schema(col1 = int32()))
+#'
+as_schema <- function(x, ...) {
+  UseMethod("as_schema")
+}
+
+#' @rdname as_schema
+#' @export
+as_schema.Schema <- function(x, ...) {
+  x
+}
+
+#' @rdname as_schema
+#' @export
+as_schema.StructType <- function(x, ...) {
+  schema(!!! x$fields())
+}
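
The StructType method gives an easy bridge between a nested column's type
and a Schema. For example:

    struct_type <- struct(col1 = int32(), col2 = utf8())
    as_schema(struct_type)  # Schema with fields col1 and col2
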
diff --git a/r/R/table.R b/r/R/table.R
index 102d0ecd10..1b5a83d6fd 100644
--- a/r/R/table.R
+++ b/r/R/table.R
@@ -237,7 +237,7 @@ cbind.Table <- function(...) {
 #' @param schema a [Schema], or `NULL` (the default) to infer the schema from
 #' the data in `...`. When providing an Arrow IPC buffer, `schema` is required.
 #' @rdname Table
-#' @examplesIf arrow_available()
+#' @examples
 #' tbl <- arrow_table(name = rownames(mtcars), mtcars)
 #' dim(tbl)
 #' dim(head(tbl))
@@ -247,3 +247,67 @@ cbind.Table <- function(...) {
 #' as.data.frame(tbl[4:8, c("gear", "hp", "wt")])
 #' @export
 arrow_table <- Table$create
+
+
+#' Convert an object to an Arrow Table
+#'
+#' Whereas [arrow_table()] constructs a table from one or more columns,
+#' `as_arrow_table()` converts a single object to an Arrow [Table].
+#'
+#' @param x An object to convert to an Arrow Table
+#' @param ... Passed to S3 methods
+#' @inheritParams arrow_table
+#'
+#' @return A [Table]
+#' @export
+#'
+#' @examples
+#' # use as_arrow_table() for a single object
+#' as_arrow_table(data.frame(col1 = 1, col2 = "two"))
+#'
+#' # use arrow_table() to create from columns
+#' arrow_table(col1 = 1, col2 = "two")
+#'
+as_arrow_table <- function(x, ..., schema = NULL) {
+  UseMethod("as_arrow_table")
+}
+
+#' @rdname as_arrow_table
+#' @export
+as_arrow_table.default <- function(x, ...) {
+  # throw a classed error here so that we can customize the error message
+  # in as_writable_table()
+  abort(
+    sprintf(
+      "No method for `as_arrow_table()` for object of class %s",
+      paste(class(x), collapse = " / ")
+    ),
+    class = "arrow_no_method_as_arrow_table"
+  )
+}
+
+#' @rdname as_arrow_table
+#' @export
+as_arrow_table.Table <- function(x, ..., schema = NULL) {
+  if (is.null(schema)) {
+    x
+  } else {
+    x$cast(schema)
+  }
+}
+
+#' @rdname as_arrow_table
+#' @export
+as_arrow_table.RecordBatch <- function(x, ..., schema = NULL) {
+  if (is.null(schema)) {
+    Table$create(x)
+  } else {
+    Table$create(x$cast(schema))
+  }
+}
+
+#' @rdname as_arrow_table
+#' @export
+as_arrow_table.data.frame <- function(x, ..., schema = NULL) {
+  Table$create(x, schema = schema)
+}
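
Because `as_arrow_table.default()` signals a classed condition, callers can
handle the failure specifically; `as_writable_table()` in r/R/util.R below
does exactly this. A sketch of catching it directly (the `lm` model object
is just an arbitrary non-coercible value):

    tryCatch(
      as_arrow_table(lm(mpg ~ cyl, data = mtcars)),
      arrow_no_method_as_arrow_table = function(e) conditionMessage(e)
    )
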
diff --git a/r/R/type.R b/r/R/type.R
index 38082328dc..5527212843 100644
--- a/r/R/type.R
+++ b/r/R/type.R
@@ -56,29 +56,64 @@ DataType$import_from_c <- ImportType
 INTEGER_TYPES <- as.character(outer(c("uint", "int"), c(8, 16, 32, 64), paste0))
 FLOAT_TYPES <- c("float16", "float32", "float64", "halffloat", "float", "double")
 
-#' infer the arrow Array type from an R vector
+#' Infer the arrow Array type from an R object
 #'
-#' @param x an R vector
+#' @param x an R object (usually a vector) to be converted to an [Array] or
+#'   [ChunkedArray].
+#' @param ... Passed to S3 methods
 #'
-#' @return an arrow logical type
+#' @return An arrow [data type][data-type]
 #' @examplesIf arrow_available()
-#' type(1:10)
-#' type(1L:10L)
-#' type(c(1, 1.5, 2))
-#' type(c("A", "B", "C"))
-#' type(mtcars)
-#' type(Sys.Date())
-#' @export
-type <- function(x) UseMethod("type")
+#' infer_type(1:10)
+#' infer_type(1L:10L)
+#' infer_type(c(1, 1.5, 2))
+#' infer_type(c("A", "B", "C"))
+#' infer_type(mtcars)
+#' infer_type(Sys.Date())
+#' infer_type(as.POSIXlt(Sys.Date()))
+#' infer_type(vctrs::new_vctr(1:5, class = "my_custom_vctr_class"))
+#' @export
+infer_type <- function(x, ...) UseMethod("infer_type")
+
+#' @rdname infer_type
+#' @export
+type <- function(x) {
+  .Deprecated("infer_type")
+  infer_type(x)
+}
 
 #' @export
-type.default <- function(x) Array__infer_type(x)
+infer_type.default <- function(x, ..., from_array_infer_type = FALSE) {
+  # If from_array_infer_type is TRUE, this is a call from C++ after trying
+  # the internal C++ type inference, when S3 dispatch has failed
+  # to find a method for the object. This call happens when
+  # creating Array, ChunkedArray, RecordBatch, and Table objects from
+  # data.frame. If the C++ call has reached this default method,
+  # we error. If from_array_infer_type is FALSE, we call Array__infer_type
+  # to use the internal C++ type inference.
+  if (from_array_infer_type) {
+    # Last ditch attempt: if vctrs::vec_is(x), we can use the vctrs
+    # extension type.
+    if (vctrs::vec_is(x)) {
+      vctrs_extension_type(x)
+    } else {
+      abort(
+        sprintf(
+          "Can't infer Arrow data type from object inheriting from %s",
+          paste(class(x), collapse = " / ")
+        )
+      )
+    }
+  } else {
+    Array__infer_type(x)
+  }
+}
 
 #' @export
-type.ArrowDatum <- function(x) x$type
+infer_type.ArrowDatum <- function(x, ...) x$type
 
 #' @export
-type.Expression <- function(x) x$type()
+infer_type.Expression <- function(x, ...) x$type()
 
 #----- metadata
 
@@ -714,6 +749,40 @@ canonical_type_str <- function(type_str) {
   )
 }
 
+#' Convert an object to an Arrow DataType
+#'
+#' @param x An object to convert to an Arrow [DataType][data-type]
+#' @param ... Passed to S3 methods.
+#'
+#' @return A [DataType][data-type] object.
+#' @export
+#'
+#' @examplesIf arrow_available()
+#' as_data_type(int32())
+#'
+as_data_type <- function(x, ...) {
+  UseMethod("as_data_type")
+}
+
+#' @rdname as_data_type
+#' @export
+as_data_type.DataType <- function(x, ...) {
+  x
+}
+
+#' @rdname as_data_type
+#' @export
+as_data_type.Field <- function(x, ...) {
+  x$type
+}
+
+#' @rdname as_data_type
+#' @export
+as_data_type.Schema <- function(x, ...) {
+  struct__(x$fields)
+}
+
+
 # vctrs support -----------------------------------------------------------
 duplicate_string <- function(x, times) {
   paste0(rep(x, times = times), collapse = "")
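
Taken together: `type()` is soft-deprecated in favor of `infer_type()`, and
`as_data_type()` gives DataType, Field, and Schema a common entry point. For
example:

    infer_type(c(1, 1.5, 2))           # double()
    as_data_type(field("x", int32()))  # int32()
    as_data_type(schema(x = int32()))  # a StructType: struct<x: int32>
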
diff --git a/r/R/util.R b/r/R/util.R
index 430f0332d5..ff2bb070b8 100644
--- a/r/R/util.R
+++ b/r/R/util.R
@@ -138,18 +138,16 @@ handle_parquet_io_error <- function(e, format, call) {
   abort(msg, call = call)
 }
 
-is_writable_table <- function(x) {
-  inherits(x, c("data.frame", "ArrowTabular"))
-}
-
-# This attribute is used when is_writable is passed into assert_that, and allows
-# the call to form part of the error message when is_writable is FALSE
-attr(is_writable_table, "fail") <- function(call, env) {
-  paste0(
-    deparse(call$x),
-    " must be an object of class 'data.frame', 'RecordBatch', or 'Table', not '",
-    class(env[[deparse(call$x)]])[[1]],
-    "'."
+as_writable_table <- function(x) {
+  tryCatch(
+    as_arrow_table(x),
+    arrow_no_method_as_arrow_table = function(e) {
+      abort(
+        "Object must be coercible to an Arrow Table using `as_arrow_table()`",
+        parent = e,
+        call = rlang::caller_env(2)
+      )
+    }
   )
 }
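
The net effect for the writers (`write_feather()`, `write_parquet()`,
`write_ipc_stream()` above) is a single coercion path with a uniform error.
For example:

    tf <- tempfile()
    write_feather(arrow_table(col1 = 1:3), tf)  # Tables pass straight through
    write_feather(data.frame(col1 = 1:3), tf)   # data.frames are coerced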
 
diff --git a/r/_pkgdown.yml b/r/_pkgdown.yml
index 713af8578f..dc5ed15c47 100644
--- a/r/_pkgdown.yml
+++ b/r/_pkgdown.yml
@@ -133,6 +133,7 @@ reference:
       - RecordBatchWriter
       - CsvReadOptions
       - CsvWriteOptions
+      - as_record_batch_reader
   - title: Arrow data containers
     contents:
       - array
@@ -147,11 +148,15 @@ reference:
       - concat_tables
       - ExtensionArray
       - vctrs_extension_array
+      - as_arrow_array
+      - as_chunked_array
+      - as_record_batch
+      - as_arrow_table
   - title: Arrow data types and schema
     contents:
       - Schema
       - unify_schemas
-      - type
+      - infer_type
       - dictionary
       - Field
       - read_schema
@@ -162,6 +167,8 @@ reference:
       - new_extension_type
       - vctrs_extension_type
       - ExtensionType
+      - as_data_type
+      - as_schema
   - title: Flight
     contents:
       - load_flight_server
diff --git a/r/extra-tests/test-read-files.R b/r/extra-tests/test-read-files.R
index a2453e2516..4201e00d7c 100644
--- a/r/extra-tests/test-read-files.R
+++ b/r/extra-tests/test-read-files.R
@@ -51,7 +51,6 @@ test_that("Can see the metadata (parquet)", {
   )
 
   # column-level attributes
-  expect_equal(attributes(df$a), list(class = "special_string"))
   expect_equal(
     attributes(df$c),
     list(
@@ -94,7 +93,6 @@ for (comp in c("lz4", "uncompressed", "zstd")) {
     )
 
     # column-level attributes
-    expect_equal(attributes(df$a), list(class = "special_string"))
     expect_equal(
       attributes(df$c),
       list(
@@ -115,7 +113,7 @@ test_that("Can read feather version 1", {
   expect_equal(
     attributes(df),
     list(
-      names = c("a", "b", "d"),
+      names = c("b", "d"),
       class = c("tbl_df", "tbl", "data.frame"),
       row.names = 1L
     )
@@ -153,7 +151,6 @@ test_that("Can see the metadata (stream)", {
   )
 
   # column-level attributes
-  expect_equal(attributes(df$a), list(class = "special_string"))
   expect_equal(
     attributes(df$c),
     list(
diff --git a/r/extra-tests/write-files.R b/r/extra-tests/write-files.R
index 4495507f3b..f6d780ec3d 100644
--- a/r/extra-tests/write-files.R
+++ b/r/extra-tests/write-files.R
@@ -33,6 +33,7 @@ for (comp in c("lz4", "uncompressed", "zstd")) {
 }
 
 example_with_metadata_v1 <- example_with_metadata
+example_with_metadata_v1$a <- NULL
 example_with_metadata_v1$c <- NULL
 write_feather(example_with_metadata_v1, "extra-tests/files/ex_data_v1.feather", version = 1)
 
diff --git a/r/man/ChunkedArray.Rd b/r/man/ChunkedArray.Rd
index ab5e0f73c2..5cb3c4fe74 100644
--- a/r/man/ChunkedArray.Rd
+++ b/r/man/ChunkedArray.Rd
@@ -55,7 +55,6 @@ within the array's internal data. This can be an expensive check, potentially \c
 }
 
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 # Pass items into chunked_array as separate objects to create chunks
 class_scores <- chunked_array(c(87, 88, 89), c(94, 93, 92), c(71, 72, 73))
 class_scores$num_chunks
@@ -79,7 +78,6 @@ doubles$type
 a <- chunked_array(c(1, 2), 3)
 b <- chunked_array(c(4, 5), 6)
 c(a, b)
-\dontshow{\}) # examplesIf}
 }
 \seealso{
 \link{Array}
diff --git a/r/man/RecordBatch.Rd b/r/man/RecordBatch.Rd
index 8dc0eac351..f936a6125b 100644
--- a/r/man/RecordBatch.Rd
+++ b/r/man/RecordBatch.Rd
@@ -81,7 +81,6 @@ All list elements are coerced to string. See \code{schema()} for more informatio
 }
 
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 batch <- record_batch(name = rownames(mtcars), mtcars)
 dim(batch)
 dim(head(batch))
@@ -89,5 +88,4 @@ names(batch)
 batch$mpg
 batch[["cyl"]]
 as.data.frame(batch[4:8, c("gear", "hp", "wt")])
-\dontshow{\}) # examplesIf}
 }
diff --git a/r/man/RecordBatchReader.Rd b/r/man/RecordBatchReader.Rd
index 90c796a669..08e229e1c5 100644
--- a/r/man/RecordBatchReader.Rd
+++ b/r/man/RecordBatchReader.Rd
@@ -44,7 +44,6 @@ are in the file.
 }
 
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 tf <- tempfile()
 on.exit(unlink(tf))
 
@@ -78,7 +77,6 @@ all.equal(df, chickwts, check.attributes = FALSE)
 # Unlike the Writers, we don't have to close RecordBatchReaders,
 # but we do still need to close the file connection
 read_file_obj$close()
-\dontshow{\}) # examplesIf}
 }
 \seealso{
 \code{\link[=read_ipc_stream]{read_ipc_stream()}} and \code{\link[=read_feather]{read_feather()}} provide a much simpler interface
diff --git a/r/man/Schema.Rd b/r/man/Schema.Rd
index 7a2d255190..f81f6c397d 100644
--- a/r/man/Schema.Rd
+++ b/r/man/Schema.Rd
@@ -75,7 +75,6 @@ the metadata is dropped.
 }
 
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 schema(a = int32(), b = float64())
 
 schema(
@@ -89,5 +88,4 @@ tab1 <- arrow_table(df)
 tab1$schema
 tab2 <- arrow_table(df, schema = schema(col1 = int8(), col2 = float32()))
 tab2$schema
-\dontshow{\}) # examplesIf}
 }
diff --git a/r/man/Table.Rd b/r/man/Table.Rd
index c045895c8c..0423728ef6 100644
--- a/r/man/Table.Rd
+++ b/r/man/Table.Rd
@@ -81,7 +81,6 @@ All list elements are coerced to string. See \code{schema()} for more informatio
 }
 
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 tbl <- arrow_table(name = rownames(mtcars), mtcars)
 dim(tbl)
 dim(head(tbl))
@@ -89,5 +88,4 @@ names(tbl)
 tbl$mpg
 tbl[["cyl"]]
 as.data.frame(tbl[4:8, c("gear", "hp", "wt")])
-\dontshow{\}) # examplesIf}
 }
diff --git a/r/man/array.Rd b/r/man/array.Rd
index 0b1371a568..371c53ac87 100644
--- a/r/man/array.Rd
+++ b/r/man/array.Rd
@@ -85,7 +85,6 @@ within the array's internal data. This can be an expensive check, potentially \c
 }
 
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 my_array <- Array$create(1:10)
 my_array$type
 my_array$cast(int8())
@@ -105,5 +104,4 @@ new_array$offset
 na_array2 <- na_array
 na_array2 == na_array # element-wise comparison
 na_array2$Equals(na_array) # overall comparison
-\dontshow{\}) # examplesIf}
 }
diff --git a/r/man/as_arrow_array.Rd b/r/man/as_arrow_array.Rd
new file mode 100644
index 0000000000..aba552a8c3
--- /dev/null
+++ b/r/man/as_arrow_array.Rd
@@ -0,0 +1,39 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/array.R
+\name{as_arrow_array}
+\alias{as_arrow_array}
+\alias{as_arrow_array.Array}
+\alias{as_arrow_array.Scalar}
+\alias{as_arrow_array.ChunkedArray}
+\title{Convert an object to an Arrow Array}
+\usage{
+as_arrow_array(x, ..., type = NULL)
+
+\method{as_arrow_array}{Array}(x, ..., type = NULL)
+
+\method{as_arrow_array}{Scalar}(x, ..., type = NULL)
+
+\method{as_arrow_array}{ChunkedArray}(x, ..., type = NULL)
+}
+\arguments{
+\item{x}{An object to convert to an Arrow Array}
+
+\item{...}{Passed to S3 methods}
+
+\item{type}{A \link[=data-type]{type} for the final Array. A value of \code{NULL}
+will default to the type guessed by \code{\link[=infer_type]{infer_type()}}.}
+}
+\value{
+An \link{Array} with type \code{type}.
+}
+\description{
+The \code{as_arrow_array()} function is identical to \code{Array$create()} except
+that it is an S3 generic, which allows methods to be defined in other
+packages to convert objects to \link{Array}. \code{Array$create()} is slightly faster
+because it tries to convert in C++ before falling back on
+\code{as_arrow_array()}.
+}
+\examples{
+as_arrow_array(1:5)
+
+}
diff --git a/r/man/as_arrow_table.Rd b/r/man/as_arrow_table.Rd
new file mode 100644
index 0000000000..0ba563f581
--- /dev/null
+++ b/r/man/as_arrow_table.Rd
@@ -0,0 +1,43 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/table.R
+\name{as_arrow_table}
+\alias{as_arrow_table}
+\alias{as_arrow_table.default}
+\alias{as_arrow_table.Table}
+\alias{as_arrow_table.RecordBatch}
+\alias{as_arrow_table.data.frame}
+\title{Convert an object to an Arrow Table}
+\usage{
+as_arrow_table(x, ..., schema = NULL)
+
+\method{as_arrow_table}{default}(x, ...)
+
+\method{as_arrow_table}{Table}(x, ..., schema = NULL)
+
+\method{as_arrow_table}{RecordBatch}(x, ..., schema = NULL)
+
+\method{as_arrow_table}{data.frame}(x, ..., schema = NULL)
+}
+\arguments{
+\item{x}{An object to convert to an Arrow Table}
+
+\item{...}{Passed to S3 methods}
+
+\item{schema}{a \link{Schema}, or \code{NULL} (the default) to infer the schema from
+the data in \code{...}. When providing an Arrow IPC buffer, \code{schema} is required.}
+}
+\value{
+A \link{Table}
+}
+\description{
+Whereas \code{\link[=arrow_table]{arrow_table()}} constructs a table from one or more columns,
+\code{as_arrow_table()} converts a single object to an Arrow \link{Table}.
+}
+\examples{
+# use as_arrow_table() for a single object
+as_arrow_table(data.frame(col1 = 1, col2 = "two"))
+
+# use arrow_table() to create from columns
+arrow_table(col1 = 1, col2 = "two")
+
+}
diff --git a/r/man/as_chunked_array.Rd b/r/man/as_chunked_array.Rd
new file mode 100644
index 0000000000..7ad541e267
--- /dev/null
+++ b/r/man/as_chunked_array.Rd
@@ -0,0 +1,34 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/chunked-array.R
+\name{as_chunked_array}
+\alias{as_chunked_array}
+\alias{as_chunked_array.ChunkedArray}
+\alias{as_chunked_array.Array}
+\title{Convert an object to an Arrow ChunkedArray}
+\usage{
+as_chunked_array(x, ..., type = NULL)
+
+\method{as_chunked_array}{ChunkedArray}(x, ..., type = NULL)
+
+\method{as_chunked_array}{Array}(x, ..., type = NULL)
+}
+\arguments{
+\item{x}{An object to convert to an Arrow Chunked Array}
+
+\item{...}{Passed to S3 methods}
+
+\item{type}{A \link[=data-type]{type} for the final Array. A value of \code{NULL}
+will default to the type guessed by \code{\link[=infer_type]{infer_type()}}.}
+}
+\value{
+A \link{ChunkedArray}.
+}
+\description{
+Whereas \code{\link[=chunked_array]{chunked_array()}} constructs a \link{ChunkedArray} from zero or more
+\link{Array}s or R vectors, \code{as_chunked_array()} converts a single object to a
+\link{ChunkedArray}.
+}
+\examples{
+as_chunked_array(1:5)
+
+}
diff --git a/r/man/as_data_type.Rd b/r/man/as_data_type.Rd
new file mode 100644
index 0000000000..5fa584400d
--- /dev/null
+++ b/r/man/as_data_type.Rd
@@ -0,0 +1,33 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/type.R
+\name{as_data_type}
+\alias{as_data_type}
+\alias{as_data_type.DataType}
+\alias{as_data_type.Field}
+\alias{as_data_type.Schema}
+\title{Convert an object to an Arrow DataType}
+\usage{
+as_data_type(x, ...)
+
+\method{as_data_type}{DataType}(x, ...)
+
+\method{as_data_type}{Field}(x, ...)
+
+\method{as_data_type}{Schema}(x, ...)
+}
+\arguments{
+\item{x}{An object to convert to an Arrow \link[=data-type]{DataType}}
+
+\item{...}{Passed to S3 methods.}
+}
+\value{
+A \link[=data-type]{DataType} object.
+}
+\description{
+Convert an object to an Arrow DataType
+}
+\examples{
+\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
+as_data_type(int32())
+\dontshow{\}) # examplesIf}
+}
diff --git a/r/man/as_record_batch.Rd b/r/man/as_record_batch.Rd
new file mode 100644
index 0000000000..c8830c1071
--- /dev/null
+++ b/r/man/as_record_batch.Rd
@@ -0,0 +1,40 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/record-batch.R
+\name{as_record_batch}
+\alias{as_record_batch}
+\alias{as_record_batch.RecordBatch}
+\alias{as_record_batch.Table}
+\alias{as_record_batch.data.frame}
+\title{Convert an object to an Arrow RecordBatch}
+\usage{
+as_record_batch(x, ..., schema = NULL)
+
+\method{as_record_batch}{RecordBatch}(x, ..., schema = NULL)
+
+\method{as_record_batch}{Table}(x, ..., schema = NULL)
+
+\method{as_record_batch}{data.frame}(x, ..., schema = NULL)
+}
+\arguments{
+\item{x}{An object to convert to an Arrow RecordBatch}
+
+\item{...}{Passed to S3 methods}
+
+\item{schema}{a \link{Schema}, or \code{NULL} (the default) to infer the schema from
+the data in \code{...}. When providing an Arrow IPC buffer, \code{schema} is required.}
+}
+\value{
+A \link{RecordBatch}
+}
+\description{
+Whereas \code{\link[=record_batch]{record_batch()}} constructs a \link{RecordBatch} from one or more columns,
+\code{as_record_batch()} converts a single object to an Arrow \link{RecordBatch}.
+}
+\examples{
+# use as_record_batch() for a single object
+as_record_batch(data.frame(col1 = 1, col2 = "two"))
+
+# use record_batch() to create from columns
+record_batch(col1 = 1, col2 = "two")
+
+}
diff --git a/r/man/as_record_batch_reader.Rd b/r/man/as_record_batch_reader.Rd
new file mode 100644
index 0000000000..e635c0b98b
--- /dev/null
+++ b/r/man/as_record_batch_reader.Rd
@@ -0,0 +1,46 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/record-batch-reader.R
+\name{as_record_batch_reader}
+\alias{as_record_batch_reader}
+\alias{as_record_batch_reader.RecordBatchReader}
+\alias{as_record_batch_reader.Table}
+\alias{as_record_batch_reader.RecordBatch}
+\alias{as_record_batch_reader.data.frame}
+\alias{as_record_batch_reader.Dataset}
+\alias{as_record_batch_reader.arrow_dplyr_query}
+\alias{as_record_batch_reader.Scanner}
+\title{Convert an object to an Arrow RecordBatchReader}
+\usage{
+as_record_batch_reader(x, ...)
+
+\method{as_record_batch_reader}{RecordBatchReader}(x, ...)
+
+\method{as_record_batch_reader}{Table}(x, ...)
+
+\method{as_record_batch_reader}{RecordBatch}(x, ...)
+
+\method{as_record_batch_reader}{data.frame}(x, ...)
+
+\method{as_record_batch_reader}{Dataset}(x, ...)
+
+\method{as_record_batch_reader}{arrow_dplyr_query}(x, ...)
+
+\method{as_record_batch_reader}{Scanner}(x, ...)
+}
+\arguments{
+\item{x}{An object to convert to a \link{RecordBatchReader}}
+
+\item{...}{Passed to S3 methods}
+}
+\value{
+A \link{RecordBatchReader}
+}
+\description{
+Convert an object to an Arrow RecordBatchReader
+}
+\examples{
+\dontshow{if (arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
+reader <- as_record_batch_reader(data.frame(col1 = 1, col2 = "two"))
+reader$read_next_batch()
+\dontshow{\}) # examplesIf}
+}
diff --git a/r/man/as_schema.Rd b/r/man/as_schema.Rd
new file mode 100644
index 0000000000..9a760baa2f
--- /dev/null
+++ b/r/man/as_schema.Rd
@@ -0,0 +1,29 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/schema.R
+\name{as_schema}
+\alias{as_schema}
+\alias{as_schema.Schema}
+\alias{as_schema.StructType}
+\title{Convert an object to an Arrow Schema}
+\usage{
+as_schema(x, ...)
+
+\method{as_schema}{Schema}(x, ...)
+
+\method{as_schema}{StructType}(x, ...)
+}
+\arguments{
+\item{x}{An object to convert to a \code{\link[=schema]{schema()}}}
+
+\item{...}{Passed to S3 methods.}
+}
+\value{
+A \link{Schema} object.
+}
+\description{
+Convert an object to an Arrow Schema
+}
+\examples{
+as_schema(schema(col1 = int32()))
+
+}
diff --git a/r/man/concat_arrays.Rd b/r/man/concat_arrays.Rd
index 0cbe7ba578..23a759cdcb 100644
--- a/r/man/concat_arrays.Rd
+++ b/r/man/concat_arrays.Rd
@@ -25,7 +25,5 @@ the behavior of a single Array but don't need a
 single object, use \link{ChunkedArray}.
 }
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 concat_arrays(Array$create(1:3), Array$create(4:5))
-\dontshow{\}) # examplesIf}
 }
diff --git a/r/man/infer_type.Rd b/r/man/infer_type.Rd
new file mode 100644
index 0000000000..f031f196a0
--- /dev/null
+++ b/r/man/infer_type.Rd
@@ -0,0 +1,35 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/type.R
+\name{infer_type}
+\alias{infer_type}
+\alias{type}
+\title{Infer the arrow Array type from an R object}
+\usage{
+infer_type(x, ...)
+
+type(x)
+}
+\arguments{
+\item{x}{an R object (usually a vector) to be converted to an \link{Array} or
+\link{ChunkedArray}.}
+
+\item{...}{Passed to S3 methods}
+}
+\value{
+An arrow \link[=data-type]{data type}
+}
+\description{
+Infer the arrow Array type from an R object
+}
+\examples{
+\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
+infer_type(1:10)
+infer_type(1L:10L)
+infer_type(c(1, 1.5, 2))
+infer_type(c("A", "B", "C"))
+infer_type(mtcars)
+infer_type(Sys.Date())
+infer_type(as.POSIXlt(Sys.Date()))
+infer_type(vctrs::new_vctr(1:5, class = "my_custom_vctr_class"))
+\dontshow{\}) # examplesIf}
+}
diff --git a/r/man/new_extension_type.Rd b/r/man/new_extension_type.Rd
index 96d5c10c93..99b70a3471 100644
--- a/r/man/new_extension_type.Rd
+++ b/r/man/new_extension_type.Rd
@@ -86,7 +86,6 @@ extension type that uses most of these features, see
 \code{\link[=vctrs_extension_type]{vctrs_extension_type()}}.
 }
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 # Create the R6 type whose methods control how Array objects are
 # converted to R objects, how equality between types is computed,
 # and how types are printed.
@@ -163,5 +162,4 @@ array$type$center()
 array$type$scale()
 
 as.vector(array)
-\dontshow{\}) # examplesIf}
 }
diff --git a/r/man/read_ipc_stream.Rd b/r/man/read_ipc_stream.Rd
index ba33321178..4cd1e4757e 100644
--- a/r/man/read_ipc_stream.Rd
+++ b/r/man/read_ipc_stream.Rd
@@ -1,5 +1,5 @@
 % Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/deprecated.R, R/ipc_stream.R
+% Please edit documentation in R/deprecated.R, R/ipc-stream.R
 \name{read_arrow}
 \alias{read_arrow}
 \alias{read_ipc_stream}
diff --git a/r/man/type.Rd b/r/man/type.Rd
deleted file mode 100644
index d55bbe24bd..0000000000
--- a/r/man/type.Rd
+++ /dev/null
@@ -1,27 +0,0 @@
-% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/type.R
-\name{type}
-\alias{type}
-\title{infer the arrow Array type from an R vector}
-\usage{
-type(x)
-}
-\arguments{
-\item{x}{an R vector}
-}
-\value{
-an arrow logical type
-}
-\description{
-infer the arrow Array type from an R vector
-}
-\examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
-type(1:10)
-type(1L:10L)
-type(c(1, 1.5, 2))
-type(c("A", "B", "C"))
-type(mtcars)
-type(Sys.Date())
-\dontshow{\}) # examplesIf}
-}
diff --git a/r/man/unify_schemas.Rd b/r/man/unify_schemas.Rd
index 50c80c2dda..2e538ba136 100644
--- a/r/man/unify_schemas.Rd
+++ b/r/man/unify_schemas.Rd
@@ -19,9 +19,7 @@ A \code{Schema} with the union of fields contained in the inputs, or
 Combine and harmonize schemas
 }
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 a <- schema(b = double(), c = bool())
 z <- schema(b = double(), k = utf8())
 unify_schemas(a, z)
-\dontshow{\}) # examplesIf}
 }
diff --git a/r/man/vctrs_extension_array.Rd b/r/man/vctrs_extension_array.Rd
index b80ce48dc2..a3b9d902a1 100644
--- a/r/man/vctrs_extension_array.Rd
+++ b/r/man/vctrs_extension_array.Rd
@@ -7,7 +7,7 @@
 \usage{
 vctrs_extension_array(x, ptype = vctrs::vec_ptype(x), storage_type = NULL)
 
-vctrs_extension_type(ptype, storage_type = type(vctrs::vec_data(ptype)))
+vctrs_extension_type(x, storage_type = infer_type(vctrs::vec_data(x)))
 }
 \arguments{
 \item{x}{A vctr (i.e., \code{\link[vctrs:vec_assert]{vctrs::vec_is()}} returns \code{TRUE}).}
@@ -37,7 +37,6 @@ create a \code{\link[=vctrs_extension_array]{vctrs_extension_array()}}, which pa
 converted back into an R vector.
 }
 \examples{
-\dontshow{if (arrow_available()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf}
 (array <- vctrs_extension_array(as.POSIXlt("2022-01-02 03:45", tz = "UTC")))
 array$type
 as.vector(array)
@@ -46,5 +45,4 @@ temp_feather <- tempfile()
 write_feather(arrow_table(col = array), temp_feather)
 read_feather(temp_feather)
 unlink(temp_feather)
-\dontshow{\}) # examplesIf}
 }
diff --git a/r/man/write_ipc_stream.Rd b/r/man/write_ipc_stream.Rd
index 2f215f25fd..4eece99f77 100644
--- a/r/man/write_ipc_stream.Rd
+++ b/r/man/write_ipc_stream.Rd
@@ -1,5 +1,5 @@
 % Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/deprecated.R, R/ipc_stream.R
+% Please edit documentation in R/deprecated.R, R/ipc-stream.R
 \name{write_arrow}
 \alias{write_arrow}
 \alias{write_ipc_stream}
diff --git a/r/man/write_to_raw.Rd b/r/man/write_to_raw.Rd
index a3c6e324b5..7623861eed 100644
--- a/r/man/write_to_raw.Rd
+++ b/r/man/write_to_raw.Rd
@@ -1,5 +1,5 @@
 % Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/ipc_stream.R
+% Please edit documentation in R/ipc-stream.R
 \name{write_to_raw}
 \alias{write_to_raw}
 \title{Write Arrow data to a raw vector}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 5440dd3e62..1724d5e30d 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -4459,6 +4459,23 @@ BEGIN_CPP11
 END_CPP11
 }
 // recordbatchreader.cpp
+std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__from_batches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, cpp11::sexp schema_sxp);
+extern "C" SEXP _arrow_RecordBatchReader__from_batches(SEXP batches_sexp, SEXP schema_sxp_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::vector<std::shared_ptr<arrow::RecordBatch>>&>::type batches(batches_sexp);
+	arrow::r::Input<cpp11::sexp>::type schema_sxp(schema_sxp_sexp);
+	return cpp11::as_sexp(RecordBatchReader__from_batches(batches, schema_sxp));
+END_CPP11
+}
+// recordbatchreader.cpp
+std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__from_Table(const std::shared_ptr<arrow::Table>& table);
+extern "C" SEXP _arrow_RecordBatchReader__from_Table(SEXP table_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::shared_ptr<arrow::Table>&>::type table(table_sexp);
+	return cpp11::as_sexp(RecordBatchReader__from_Table(table));
+END_CPP11
+}
+// recordbatchreader.cpp
 std::shared_ptr<arrow::Table> Table__from_RecordBatchReader(const std::shared_ptr<arrow::RecordBatchReader>& reader);
 extern "C" SEXP _arrow_Table__from_RecordBatchReader(SEXP reader_sexp){
 BEGIN_CPP11
@@ -5560,6 +5577,8 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_RecordBatchReader__schema", (DL_FUNC) &_arrow_RecordBatchReader__schema, 1}, 
 		{ "_arrow_RecordBatchReader__ReadNext", (DL_FUNC) &_arrow_RecordBatchReader__ReadNext, 1}, 
 		{ "_arrow_RecordBatchReader__batches", (DL_FUNC) &_arrow_RecordBatchReader__batches, 1}, 
+		{ "_arrow_RecordBatchReader__from_batches", (DL_FUNC) &_arrow_RecordBatchReader__from_batches, 2}, 
+		{ "_arrow_RecordBatchReader__from_Table", (DL_FUNC) &_arrow_RecordBatchReader__from_Table, 1}, 
 		{ "_arrow_Table__from_RecordBatchReader", (DL_FUNC) &_arrow_Table__from_RecordBatchReader, 1}, 
 		{ "_arrow_RecordBatchReader__Head", (DL_FUNC) &_arrow_RecordBatchReader__Head, 2}, 
 		{ "_arrow_ipc___RecordBatchStreamReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamReader__Open, 1}, 
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 0d58fe3825..fd533092d0 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -79,11 +79,38 @@ arrow::compute::ExecContext* gc_context();
 
 namespace arrow {
 
+// Most of the time we can safely call R code and assume that any evaluation
+// error will throw a cpp11::unwind_exception. There are other times (e.g.,
+// when using RTasks) that we need to wait for a background task to finish or
+// run cleanup code if execution fails. This class allows us to attach
+// the `token` required to reconstruct the cpp11::unwind_exception and throw it
+// when it is safe to do so. This is done automatically by StopIfNotOk(), which
+// checks for a .detail() inheriting from UnwindProtectDetail.
+class UnwindProtectDetail : public StatusDetail {
+ public:
+  SEXP token;
+  explicit UnwindProtectDetail(SEXP token) : token(token) {}
+  virtual const char* type_id() const { return "UnwindProtectDetail"; }
+  virtual std::string ToString() const { return "R code execution error"; }
+};
+
+static inline Status StatusUnwindProtect(SEXP token) {
+  return Status::Invalid("R code execution error")
+      .WithDetail(std::make_shared<UnwindProtectDetail>(token));
+}
+
 static inline void StopIfNotOk(const Status& status) {
   if (!status.ok()) {
-    // ARROW-13039: be careful not to interpret our error message as a %-format string
-    std::string s = status.ToString();
-    cpp11::stop("%s", s.c_str());
+    auto detail = status.detail();
+    const UnwindProtectDetail* unwind_detail =
+        dynamic_cast<const UnwindProtectDetail*>(detail.get());
+    if (unwind_detail) {
+      throw cpp11::unwind_exception(unwind_detail->token);
+    } else {
+      // ARROW-13039: be careful not to interpret our error message as a %-format string
+      std::string s = status.ToString();
+      cpp11::stop("%s", s.c_str());
+    }
   }
 }
 
@@ -100,6 +127,35 @@ std::shared_ptr<arrow::DataType> InferArrowType(SEXP x);
 std::shared_ptr<arrow::Array> vec_to_arrow__reuse_memory(SEXP x);
 bool can_reuse_memory(SEXP x, const std::shared_ptr<arrow::DataType>& type);
 
+// These are the types of objects whose conversion to Arrow Arrays is handled
+// entirely in C++. Other types of objects are converted using the
+// infer_type() S3 generic and the as_arrow_array() S3 generic.
+// For data.frame, we need to recurse because the internal conversion
+// can't accommodate calling into R. If the user specifies a target type
+// and that target type is an ExtensionType, we also can't convert
+// natively (but we check for this separately when it applies).
+static inline bool can_convert_native(SEXP x) {
+  if (!Rf_isObject(x)) {
+    return true;
+  } else if (Rf_inherits(x, "data.frame")) {
+    for (R_xlen_t i = 0; i < Rf_xlength(x); i++) {
+      if (!can_convert_native(VECTOR_ELT(x, i))) {
+        return false;
+      }
+    }
+
+    return true;
+  } else {
+    return Rf_inherits(x, "factor") || Rf_inherits(x, "Date") ||
+           Rf_inherits(x, "integer64") || Rf_inherits(x, "POSIXct") ||
+           Rf_inherits(x, "hms") || Rf_inherits(x, "difftime") ||
+           Rf_inherits(x, "data.frame") || Rf_inherits(x, "arrow_binary") ||
+           Rf_inherits(x, "arrow_large_binary") ||
+           Rf_inherits(x, "arrow_fixed_size_binary") ||
+           Rf_inherits(x, "vctrs_unspecified") || Rf_inherits(x, "AsIs");
+  }
+}
+
 Status count_fields(SEXP lst, int* out);
 
 void inspect(SEXP obj);
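
In R terms, can_convert_native() decides which path a vector takes: plain vectors and the classes listed above stay on the C++ fast path, while anything else goes through the S3 generics. A small illustration (the "my_vctr" class is hypothetical):

    arrow::infer_type(1:10)                          # handled natively: int32
    vctr <- vctrs::new_vctr(1:5, class = "my_vctr")
    arrow::infer_type(vctr)                          # dispatches to infer_type() in R
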
diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp
index 8139755ce4..b9cc81c4b8 100644
--- a/r/src/r_to_arrow.cpp
+++ b/r/src/r_to_arrow.cpp
@@ -267,6 +267,50 @@ class RConverter : public Converter<SEXP, RConversionOptions> {
   }
 };
 
+// A Converter that calls as_arrow_array(x, type = options.type)
+class AsArrowArrayConverter : public RConverter {
+ public:
+  // This is not run in parallel by default, so it's safe to call into R here;
+  // however, it is not safe to throw an exception here because the caller
+  // might be waiting for other conversions to finish in the background. To
+  // avoid this, use StatusUnwindProtect() to communicate the error back to
+  // ValueOrStop() (which reconstructs the exception and re-throws it).
+  Status Extend(SEXP values, int64_t size, int64_t offset = 0) {
+    try {
+      cpp11::sexp as_array_result = cpp11::package("arrow")["as_arrow_array"](
+          values, cpp11::named_arg("type") = cpp11::as_sexp(options().type),
+          cpp11::named_arg("from_vec_to_array") = cpp11::as_sexp<bool>(true));
+
+      // Check that the R method returned an Array
+      if (!Rf_inherits(as_array_result, "Array")) {
+        return Status::Invalid("as_arrow_array() did not return an object of type Array");
+      }
+
+      auto array = cpp11::as_cpp<std::shared_ptr<arrow::Array>>(as_array_result);
+
+      // We need the type to be equal because the schema has already been finalized
+      if (!array->type()->Equals(options().type)) {
+        return Status::Invalid(
+            "as_arrow_array() returned an Array with an incorrect type");
+      }
+
+      arrays_.push_back(std::move(array));
+      return Status::OK();
+    } catch (cpp11::unwind_exception& e) {
+      return StatusUnwindProtect(e.token);
+    }
+  }
+
+  // This is sometimes run in parallel so we can't call into R
+  Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() {
+    return std::make_shared<ChunkedArray>(std::move(arrays_));
+  }
+
+ private:
+  cpp11::writable::list objects_;
+  std::vector<std::shared_ptr<Array>> arrays_;
+};
+
 template <typename T, typename Enable = void>
 class RPrimitiveConverter;
 
@@ -1217,28 +1261,35 @@ std::shared_ptr<arrow::ChunkedArray> vec_to_arrow_ChunkedArray(
         cpp11::as_cpp<std::shared_ptr<arrow::Array>>(x));
   }
 
-  // short circuit if `x` is an altrep vector that shells a chunked Array
-  auto maybe = altrep::vec_to_arrow_altrep_bypass(x);
-  if (maybe.get()) {
-    return maybe;
-  }
-
   RConversionOptions options;
   options.strict = !type_inferred;
   options.type = type;
   options.size = vctrs::vec_size(x);
 
-  // maybe short circuit when zero-copy is possible
-  if (can_reuse_memory(x, options.type)) {
-    return std::make_shared<arrow::ChunkedArray>(vec_to_arrow__reuse_memory(x));
-  }
+  // If we can handle this in C++ we do so; otherwise we use the
+  // AsArrowArrayConverter, which calls as_arrow_array().
+  std::unique_ptr<RConverter> converter;
+  if (can_convert_native(x) && type->id() != Type::EXTENSION) {
+    // short circuit if `x` is an altrep vector that shells a chunked Array
+    auto maybe = altrep::vec_to_arrow_altrep_bypass(x);
+    if (maybe.get()) {
+      return maybe;
+    }
+
+    // maybe short circuit when zero-copy is possible
+    if (can_reuse_memory(x, type)) {
+      return std::make_shared<arrow::ChunkedArray>(vec_to_arrow__reuse_memory(x));
+    }
 
-  // otherwise go through the converter api
-  auto converter = ValueOrStop(MakeConverter<RConverter, RConverterTrait>(
-      options.type, options, gc_memory_pool()));
+    // Otherwise go through the converter API.
+    converter = ValueOrStop(MakeConverter<RConverter, RConverterTrait>(
+        options.type, options, gc_memory_pool()));
+  } else {
+    converter = std::unique_ptr<RConverter>(new AsArrowArrayConverter());
+    StopIfNotOk(converter->Construct(type, options, gc_memory_pool()));
+  }
 
   StopIfNotOk(converter->Extend(x, options.size));
-
   return ValueOrStop(converter->ToChunkedArray());
 }
 
@@ -1418,20 +1469,36 @@ std::shared_ptr<arrow::Table> Table__from_dots(SEXP lst, SEXP schema_sxp,
       options.type = schema->field(j)->type();
       options.size = vctrs::vec_size(x);
 
-      // first try to add a task to do a zero copy in parallel
-      if (arrow::r::vector_from_r_memory(x, options.type, columns, j, tasks)) {
-        continue;
-      }
+      // If we can handle this in C++ we do so; otherwise we use the
+      // AsArrowArrayConverter, which calls as_arrow_array().
+      std::unique_ptr<arrow::r::RConverter> converter;
+      if (arrow::r::can_convert_native(x) &&
+          options.type->id() != arrow::Type::EXTENSION) {
+        // first try to add a task to do a zero copy in parallel
+        if (arrow::r::vector_from_r_memory(x, options.type, columns, j, tasks)) {
+          continue;
+        }
 
-      // if unsuccessful: use RConverter api
-      auto converter_result =
-          arrow::MakeConverter<arrow::r::RConverter, arrow::r::RConverterTrait>(
-              options.type, options, gc_memory_pool());
-      if (!converter_result.ok()) {
-        status = converter_result.status();
-        break;
+        // otherwise go through the Converter API
+        auto converter_result =
+            arrow::MakeConverter<arrow::r::RConverter, arrow::r::RConverterTrait>(
+                options.type, options, gc_memory_pool());
+        if (converter_result.ok()) {
+          converter = std::move(converter_result.ValueUnsafe());
+        } else {
+          status = converter_result.status();
+          break;
+        }
+      } else {
+        converter =
+            std::unique_ptr<arrow::r::RConverter>(new arrow::r::AsArrowArrayConverter());
+        status = converter->Construct(options.type, options, gc_memory_pool());
+        if (!status.ok()) {
+          break;
+        }
       }
-      converters[j] = std::move(converter_result.ValueUnsafe());
+
+      converters[j] = std::move(converter);
     }
   }
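
From the R side, this converter means a package only needs to provide an as_arrow_array() method to participate in Array, Table, and RecordBatch creation. A minimal sketch, assuming a hypothetical "percent" class backed by a plain double vector; note that a method must return an Array whose type matches the requested `type` exactly, because the schema is finalized before the converter runs:

    as_arrow_array.percent <- function(x, ..., type = NULL) {
      if (is.null(type)) {
        type <- arrow::float64()
      }
      arrow::as_arrow_array(unclass(x), type = type)
    }
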
 
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index 0fff4e2805..7db6a99750 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -41,6 +41,26 @@ cpp11::list RecordBatchReader__batches(
   return arrow::r::to_r_list(ValueOrStop(reader->ToRecordBatches()));
 }
 
+// [[arrow::export]]
+std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__from_batches(
+    const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
+    cpp11::sexp schema_sxp) {
+  bool infer_schema = !Rf_inherits(schema_sxp, "Schema");
+
+  if (infer_schema) {
+    return ValueOrStop(arrow::RecordBatchReader::Make(std::move(batches)));
+  } else {
+    auto schema = cpp11::as_cpp<std::shared_ptr<arrow::Schema>>(schema_sxp);
+    return ValueOrStop(arrow::RecordBatchReader::Make(std::move(batches), schema));
+  }
+}
+
+// [[arrow::export]]
+std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__from_Table(
+    const std::shared_ptr<arrow::Table>& table) {
+  return std::make_shared<arrow::TableBatchReader>(table);
+}
+
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> Table__from_RecordBatchReader(
     const std::shared_ptr<arrow::RecordBatchReader>& reader) {
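
These constructors back the new as_record_batch_reader() generic, so a Table (or anything coercible to one) can be consumed batch by batch. A short usage sketch, mirroring the tests added in test-record-batch-reader.R below:

    table <- arrow::arrow_table(col1 = 1, col2 = "two")
    reader <- arrow::as_record_batch_reader(table)
    batch <- reader$read_next_batch()  # a RecordBatch with the Table's schema
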
diff --git a/r/src/table.cpp b/r/src/table.cpp
index 2d2d35b06a..2c60cb352e 100644
--- a/r/src/table.cpp
+++ b/r/src/table.cpp
@@ -193,13 +193,13 @@ SEXP arrow_attributes(SEXP x, bool only_top_level) {
   return att;
 }
 
-SEXP CollectColumnMetadata(SEXP lst, int num_fields, bool& has_metadata) {
+cpp11::writable::list CollectColumnMetadata(SEXP lst, int num_fields) {
   // Preallocate for the lambda to fill in
   cpp11::writable::list metadata_columns(num_fields);
 
   cpp11::writable::strings metadata_columns_names(num_fields);
 
-  auto extract_one_metadata = [&metadata_columns, &metadata_columns_names, &has_metadata](
+  auto extract_one_metadata = [&metadata_columns, &metadata_columns_names](
                                   int j, SEXP x, std::string name) {
     metadata_columns_names[j] = name;
 
@@ -208,10 +208,6 @@ SEXP CollectColumnMetadata(SEXP lst, int num_fields, bool& has_metadata) {
       return;
     }
     metadata_columns[j] = arrow_attributes(x, false);
-
-    if (!Rf_isNull(metadata_columns[j])) {
-      has_metadata = true;
-    }
   };
   arrow::r::TraverseDots(lst, num_fields, extract_one_metadata);
 
@@ -226,7 +222,7 @@ arrow::Status AddMetadataFromDots(SEXP lst, int num_fields,
   cpp11::writable::list metadata(2);
   metadata.names() = arrow::r::data::names_metadata;
 
-  bool has_metadata = false;
+  bool has_top_level_metadata = false;
 
   // "top level" attributes, only relevant if the first object is not named and a data
   // frame
@@ -234,12 +230,33 @@ arrow::Status AddMetadataFromDots(SEXP lst, int num_fields,
   if (names[0] == "" && Rf_inherits(VECTOR_ELT(lst, 0), "data.frame")) {
     SEXP top_level = metadata[0] = arrow_attributes(VECTOR_ELT(lst, 0), true);
     if (!Rf_isNull(top_level) && XLENGTH(top_level) > 0) {
-      has_metadata = true;
+      has_top_level_metadata = true;
     }
   }
 
   // recurse to get all columns metadata
-  metadata[1] = CollectColumnMetadata(lst, num_fields, has_metadata);
+  cpp11::writable::list metadata_columns = CollectColumnMetadata(lst, num_fields);
+
+  // Remove metadata for ExtensionType columns, because these have their own mechanism for
+  // preserving R type information
+  for (R_xlen_t i = 0; i < schema->num_fields(); i++) {
+    if (schema->field(i)->type()->id() == Type::EXTENSION) {
+      metadata_columns[i] = R_NilValue;
+    }
+  }
+
+  // has_metadata is true if there is top-level metadata or if any
+  // column has non-NULL metadata
+  bool has_metadata = has_top_level_metadata;
+  for (R_xlen_t i = 0; i < metadata_columns.size(); i++) {
+    if (metadata_columns[i] != R_NilValue) {
+      has_metadata = true;
+      break;
+    }
+  }
+
+  // Assign to the output metadata
+  metadata[1] = metadata_columns;
 
   if (has_metadata) {
     SEXP serialise_call =
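
The practical effect is that columns stored as extension types no longer duplicate their R attributes in the serialized $metadata$r payload. A small illustration, mirroring the test added in test-metadata.R below:

    tab <- arrow::Table$create(
      x = vctrs::new_vctr(1:5, class = "special_integer")
    )
    tab$metadata$r  # NULL: the extension type itself preserves the attributes
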
diff --git a/r/src/type_infer.cpp b/r/src/type_infer.cpp
index 75b1e85c42..19415add72 100644
--- a/r/src/type_infer.cpp
+++ b/r/src/type_infer.cpp
@@ -179,26 +179,38 @@ std::shared_ptr<arrow::DataType> InferArrowType(SEXP x) {
     return arrow::r::altrep::vec_to_arrow_altrep_bypass(x)->type();
   }
 
-  switch (TYPEOF(x)) {
-    case ENVSXP:
-      return InferArrowTypeFromVector<ENVSXP>(x);
-    case LGLSXP:
-      return InferArrowTypeFromVector<LGLSXP>(x);
-    case INTSXP:
-      return InferArrowTypeFromVector<INTSXP>(x);
-    case REALSXP:
-      return InferArrowTypeFromVector<REALSXP>(x);
-    case RAWSXP:
-      return uint8();
-    case STRSXP:
-      return InferArrowTypeFromVector<STRSXP>(x);
-    case VECSXP:
-      return InferArrowTypeFromVector<VECSXP>(x);
-    default:
-      break;
-  }
+  // If we can handle the conversion in C++ we do so here; otherwise we
+  // call the infer_type() S3 generic to infer the type of the object.
+  // For data.frame, this code recurses into the columns so that it
+  // correctly calls into R where can_convert_native() is false.
+  if (can_convert_native(x) || Rf_inherits(x, "data.frame")) {
+    switch (TYPEOF(x)) {
+      case ENVSXP:
+        return InferArrowTypeFromVector<ENVSXP>(x);
+      case LGLSXP:
+        return InferArrowTypeFromVector<LGLSXP>(x);
+      case INTSXP:
+        return InferArrowTypeFromVector<INTSXP>(x);
+      case REALSXP:
+        return InferArrowTypeFromVector<REALSXP>(x);
+      case RAWSXP:
+        return uint8();
+      case STRSXP:
+        return InferArrowTypeFromVector<STRSXP>(x);
+      case VECSXP:
+        return InferArrowTypeFromVector<VECSXP>(x);
+      default:
+        cpp11::stop("Cannot infer type from vector");
+    }
+  } else {
+    cpp11::sexp type_result = cpp11::package("arrow")["infer_type"](
+        x, cpp11::named_arg("from_array_infer_type") = true);
+    if (!Rf_inherits(type_result, "DataType")) {
+      cpp11::stop("type() did not return an object of type DataType");
+    }
 
-  cpp11::stop("Cannot infer type from vector");
+    return cpp11::as_cpp<std::shared_ptr<arrow::DataType>>(type_result);
+  }
 }
 
 }  // namespace r
diff --git a/r/tests/testthat/_snaps/Array.md b/r/tests/testthat/_snaps/Array.md
index 3f8ebe966d..f6ec523510 100644
--- a/r/tests/testthat/_snaps/Array.md
+++ b/r/tests/testthat/_snaps/Array.md
@@ -1,3 +1,23 @@
+# as_arrow_array() works for vctrs_vctr types
+
+    Can't create Array<float64()> from object of type custom_vctr / vctrs_vctr
+
+# as_arrow_array() default method errors
+
+    Can't create Array from object of type class_not_supported
+
+---
+
+    Can't create Array<float64()> from object of type class_not_supported
+
+---
+
+    Can't create Array<float64()> from object of type class_not_supported
+
+---
+
+    Can't create Array<float64()> from object of type class_not_supported
+
 # Array doesn't support c()
 
     Use `concat_arrays()` or `ChunkedArray$create()` instead.
diff --git a/r/tests/testthat/_snaps/feather.md b/r/tests/testthat/_snaps/feather.md
new file mode 100644
index 0000000000..a8268571de
--- /dev/null
+++ b/r/tests/testthat/_snaps/feather.md
@@ -0,0 +1,6 @@
+# write_feather with invalid input type
+
+    Object must be coercible to an Arrow Table using `as_arrow_table()`
+    Caused by error in `as_arrow_table()`:
+    ! No method for `as_arrow_table()` for object of class Array / ArrowDatum / ArrowObject / R6
+
diff --git a/r/tests/testthat/_snaps/ipc-stream.md b/r/tests/testthat/_snaps/ipc-stream.md
new file mode 100644
index 0000000000..5334817f72
--- /dev/null
+++ b/r/tests/testthat/_snaps/ipc-stream.md
@@ -0,0 +1,6 @@
+# write_ipc_stream errors for invalid input type
+
+    Object must be coercible to an Arrow Table using `as_arrow_table()`
+    Caused by error in `as_arrow_table()`:
+    ! No method for `as_arrow_table()` for object of class Array / ArrowDatum / ArrowObject / R6
+
diff --git a/r/tests/testthat/_snaps/parquet.md b/r/tests/testthat/_snaps/parquet.md
new file mode 100644
index 0000000000..bb4f18cfde
--- /dev/null
+++ b/r/tests/testthat/_snaps/parquet.md
@@ -0,0 +1,6 @@
+# write_parquet() with invalid input type
+
+    Object must be coercible to an Arrow Table using `as_arrow_table()`
+    Caused by error in `as_arrow_table()`:
+    ! No method for `as_arrow_table()` for object of class Array / ArrowDatum / ArrowObject / R6
+
diff --git a/r/tests/testthat/_snaps/type.md b/r/tests/testthat/_snaps/type.md
new file mode 100644
index 0000000000..cb15d2d8a9
--- /dev/null
+++ b/r/tests/testthat/_snaps/type.md
@@ -0,0 +1,8 @@
+# infer_type() default method errors for unknown classes
+
+    Can't infer Arrow data type from object inheriting from class_not_supported
+
+---
+
+    Can't infer Arrow data type from object inheriting from class_not_supported
+
diff --git a/r/tests/testthat/_snaps/util.md b/r/tests/testthat/_snaps/util.md
new file mode 100644
index 0000000000..afa4589a6b
--- /dev/null
+++ b/r/tests/testthat/_snaps/util.md
@@ -0,0 +1,10 @@
+# as_writable_table() errors for invalid input
+
+    Object must be coercible to an Arrow Table using `as_arrow_table()`
+    Caused by error in `as_arrow_table()`:
+    ! No method for `as_arrow_table()` for object of class character
+
+---
+
+    Invalid: cannot convert
+
diff --git a/r/tests/testthat/golden-files/data-arrow-sf_7.0.0.feather b/r/tests/testthat/golden-files/data-arrow-sf_7.0.0.feather
new file mode 100644
index 0000000000..c2bb880d3f
Binary files /dev/null and b/r/tests/testthat/golden-files/data-arrow-sf_7.0.0.feather differ
diff --git a/r/tests/testthat/helper-data.R b/r/tests/testthat/helper-data.R
index c693e84b2a..1088be6850 100644
--- a/r/tests/testthat/helper-data.R
+++ b/r/tests/testthat/helper-data.R
@@ -120,9 +120,6 @@ make_string_of_size <- function(size = 1) {
   paste(rep(letters, length.out = 1000 * size), collapse = "")
 }
 
-example_with_extra_metadata <- example_with_metadata
-attributes(example_with_extra_metadata$b) <- list(lots = rep(make_string_of_size(1), 100))
-
 example_with_logical_factors <- tibble::tibble(
   starting_a_fight = factor(c(FALSE, TRUE, TRUE, TRUE)),
   consoling_a_child = factor(c(TRUE, FALSE, TRUE, TRUE)),
@@ -135,6 +132,9 @@ example_with_logical_factors <- tibble::tibble(
   )
 )
 
+example_with_extra_metadata <- example_with_metadata
+attributes(example_with_extra_metadata$b) <- list(lots = rep(make_string_of_size(1), 100))
+
 # The values in each column of this tibble are in ascending order. There are
 # some ties, so tests should use two or more columns to ensure deterministic
 # sort order. The Arrow C++ library orders strings lexicographically as byte
diff --git a/r/tests/testthat/test-Array.R b/r/tests/testthat/test-Array.R
index 1e774d7fb6..3d95ad117c 100644
--- a/r/tests/testthat/test-Array.R
+++ b/r/tests/testthat/test-Array.R
@@ -502,7 +502,7 @@ test_that("Array$create() handles data frame -> struct arrays (ARROW-3811)", {
   expect_as_vector(a, df)
 
   df <- structure(
-    list(col = structure(list(structure(list(list(structure(1))), class = "inner")), class = "outer")),
+    list(col = list(list(list(1)))),
     class = "data.frame", row.names = c(NA, -1L)
   )
   a <- Array$create(df)
@@ -1010,6 +1010,153 @@ test_that("auto int64 conversion to int can be disabled (ARROW-10093)", {
   })
 })
 
+test_that("as_arrow_array() default method calls Array$create()", {
+  expect_equal(
+    as_arrow_array(1:10),
+    Array$create(1:10)
+  )
+
+  expect_equal(
+    as_arrow_array(1:10, type = float64()),
+    Array$create(1:10, type = float64())
+  )
+})
+
+test_that("as_arrow_array() works for Array", {
+  array <- Array$create(logical(), type = null())
+  expect_identical(as_arrow_array(array), array)
+  expect_equal(
+    as_arrow_array(array, type = int32()),
+    Array$create(integer())
+  )
+})
+
+test_that("as_arrow_array() works for Array", {
+  scalar <- Scalar$create(TRUE)
+  expect_equal(as_arrow_array(scalar), Array$create(TRUE))
+  expect_equal(
+    as_arrow_array(scalar, type = int32()),
+    Array$create(1L)
+  )
+})
+
+test_that("as_arrow_array() works for ChunkedArray", {
+  expect_equal(
+    as_arrow_array(chunked_array(type = null())),
+    Array$create(logical(), type = null())
+  )
+
+  expect_equal(
+    as_arrow_array(chunked_array(1:3, 4:6)),
+    Array$create(1:6)
+  )
+
+  expect_equal(
+    as_arrow_array(chunked_array(1:3, 4:6), type = float64()),
+    Array$create(1:6, type = float64())
+  )
+})
+
+test_that("as_arrow_array() works for vctrs_vctr types", {
+  vctr <- vctrs::new_vctr(1:5, class = "custom_vctr")
+  expect_equal(
+    as_arrow_array(vctr),
+    vctrs_extension_array(vctr)
+  )
+
+  # with explicit type
+  expect_equal(
+    as_arrow_array(
+      vctr,
+      type = vctrs_extension_type(
+        vctrs::vec_ptype(vctr),
+        storage_type = float64()
+      )
+    ),
+    vctrs_extension_array(
+      vctr,
+      storage_type = float64()
+    )
+  )
+
+  # with impossible type
+  expect_snapshot_error(as_arrow_array(vctr, type = float64()))
+})
+
+test_that("as_arrow_array() works for nested extension types", {
+  vctr <- vctrs::new_vctr(1:5, class = "custom_vctr")
+
+  nested <- tibble::tibble(x = vctr)
+  type <- infer_type(nested)
+
+  # with type = NULL
+  nested_array <- as_arrow_array(nested)
+  expect_identical(as.vector(nested_array), nested)
+
+  # with explicit type
+  expect_equal(as_arrow_array(nested, type = type), nested_array)
+
+  # with extension type
+  extension_array <- vctrs_extension_array(nested)
+  expect_equal(
+    as_arrow_array(nested, type = extension_array$type),
+    extension_array
+  )
+
+  # with an extension type for the data.frame but no extension columns
+  nested_plain <- tibble::tibble(x = 1:5)
+  extension_array <- vctrs_extension_array(nested_plain)
+  expect_equal(
+    as_arrow_array(nested, type = extension_array$type),
+    extension_array
+  )
+})
+
+test_that("Array$create() calls as_arrow_array() for nested extension types", {
+  vctr <- vctrs::new_vctr(1:5, class = "custom_vctr")
+
+  nested <- tibble::tibble(x = vctr)
+  type <- infer_type(nested)
+
+  # with type = NULL
+  nested_array <- Array$create(nested)
+  expect_identical(as.vector(nested_array), nested)
+
+  # with explicit type
+  expect_equal(Array$create(nested, type = type), nested_array)
+
+  # with extension type
+  extension_array <- vctrs_extension_array(nested)
+  expect_equal(
+    Array$create(nested, type = extension_array$type),
+    extension_array
+  )
+
+  # with an extension type for the data.frame but no extension columns
+  nested_plain <- tibble::tibble(x = 1:5)
+  extension_array <- vctrs_extension_array(nested_plain)
+  expect_equal(
+    Array$create(nested, type = extension_array$type),
+    extension_array
+  )
+})
+
+test_that("as_arrow_array() default method errors", {
+  vec <- structure(list(), class = "class_not_supported")
+
+  # check errors simulating a call from C++
+  expect_snapshot_error(as_arrow_array(vec, from_vec_to_array = TRUE))
+  expect_snapshot_error(
+    as_arrow_array(vec, type = float64(), from_vec_to_array = TRUE)
+  )
+
+  # check errors actually coming through C++
+  expect_snapshot_error(Array$create(vec, type = float64()))
+  expect_snapshot_error(
+    RecordBatch$create(col = vec, schema = schema(col = float64()))
+  )
+})
+
 test_that("concat_arrays works", {
   concat_empty <- concat_arrays()
   expect_true(concat_empty$type == null())
diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R
index c284b7b1d5..7ea18a2f42 100644
--- a/r/tests/testthat/test-RecordBatch.R
+++ b/r/tests/testthat/test-RecordBatch.R
@@ -785,3 +785,48 @@ test_that("RecordBatchReader to C-interface to arrow_dplyr_query", {
   # must clean up the pointer or we leak
   delete_arrow_array_stream(stream_ptr)
 })
+
+
+test_that("as_record_batch() works for RecordBatch", {
+  batch <- record_batch(col1 = 1L, col2 = "two")
+  expect_identical(as_record_batch(batch), batch)
+  expect_equal(
+    as_record_batch(batch, schema = schema(col1 = float64(), col2 = string())),
+    record_batch(col1 = Array$create(1, type = float64()), col2 = "two")
+  )
+})
+
+test_that("as_record_batch() works for Table", {
+  batch <- record_batch(col1 = 1L, col2 = "two")
+  table <- arrow_table(col1 = 1L, col2 = "two")
+
+  expect_equal(as_record_batch(table), batch)
+  expect_equal(
+    as_record_batch(table, schema = schema(col1 = float64(), col2 = string())),
+    record_batch(col1 = Array$create(1, type = float64()), col2 = "two")
+  )
+
+  # also check zero column table and make sure row count is preserved
+  table0 <- table[integer()]
+  expect_identical(table0$num_columns, 0L)
+  expect_identical(table0$num_rows, 1L)
+
+  batch0 <- as_record_batch(table0)
+  expect_identical(batch0$num_columns, 0L)
+  expect_identical(batch0$num_rows, 1L)
+})
+
+test_that("as_record_batch() works for data.frame()", {
+  batch <- record_batch(col1 = 1L, col2 = "two")
+  tbl <- tibble::tibble(col1 = 1L, col2 = "two")
+
+  expect_equal(as_record_batch(tbl), batch)
+
+  expect_equal(
+    as_record_batch(
+      tbl,
+      schema = schema(col1 = float64(), col2 = string())
+    ),
+    record_batch(col1 = Array$create(1, type = float64()), col2 = "two")
+  )
+})
diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R
index 89c22b97e1..523b6a1f16 100644
--- a/r/tests/testthat/test-Table.R
+++ b/r/tests/testthat/test-Table.R
@@ -688,3 +688,45 @@ test_that("ARROW-12729 - length returns number of columns in Table", {
 
   expect_identical(length(tab), 3L)
 })
+
+test_that("as_arrow_table() works for Table", {
+  table <- arrow_table(col1 = 1L, col2 = "two")
+  expect_identical(as_arrow_table(table), table)
+  expect_equal(
+    as_arrow_table(table, schema = schema(col1 = float64(), col2 = string())),
+    arrow_table(col1 = Array$create(1, type = float64()), col2 = "two")
+  )
+})
+
+test_that("as_arrow_table() works for RecordBatch", {
+  table <- arrow_table(col1 = 1L, col2 = "two")
+  batch <- record_batch(col1 = 1L, col2 = "two")
+
+  expect_equal(as_arrow_table(batch), table)
+  expect_equal(
+    as_arrow_table(batch, schema = schema(col1 = float64(), col2 = string())),
+    arrow_table(col1 = Array$create(1, type = float64()), col2 = "two")
+  )
+})
+
+test_that("as_arrow_table() works for data.frame()", {
+  table <- arrow_table(col1 = 1L, col2 = "two")
+  tbl <- tibble::tibble(col1 = 1L, col2 = "two")
+
+  expect_equal(as_arrow_table(tbl), table)
+
+  expect_equal(
+    as_arrow_table(
+      tbl,
+      schema = schema(col1 = float64(), col2 = string())
+    ),
+    arrow_table(col1 = Array$create(1, type = float64()), col2 = "two")
+  )
+})
+
+test_that("as_arrow_table() errors for invalid input", {
+  expect_error(
+    as_arrow_table("no as_arrow_table() method"),
+    class = "arrow_no_method_as_arrow_table"
+  )
+})
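
Because write_feather(), write_parquet(), and write_ipc_stream() now coerce their input with as_arrow_table() (see the snapshot files above), a package can make its own objects writable by supplying a method. A minimal sketch for a hypothetical "results" class that wraps a data frame in its `data` field:

    as_arrow_table.results <- function(x, ...) {
      arrow::as_arrow_table(x$data, ...)
    }
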
diff --git a/r/tests/testthat/test-backwards-compatibility.R b/r/tests/testthat/test-backwards-compatibility.R
index 32e86d5f68..8210bd2e78 100644
--- a/r/tests/testthat/test-backwards-compatibility.R
+++ b/r/tests/testthat/test-backwards-compatibility.R
@@ -118,4 +118,30 @@ for (comp in c("lz4", "uncompressed", "zstd")) {
   })
 }
 
+test_that("sfc columns written by arrow <= 7.0.0 can be re-read", {
+  # nolint start
+  # df <- data.frame(x = I(list(structure(1, foo = "bar"), structure(2, baz = "qux"))))
+  # class(df$x) <- c("sfc_MULTIPOLYGON", "sfc", "list")
+  # withr::with_options(
+  #   list("arrow.preserve_row_level_metadata" = TRUE), {
+  #     arrow::write_feather(
+  #       df,
+  #       "tests/testthat/golden-files/data-arrow-sf_7.0.0.feather",
+  #       compression = "uncompressed"
+  #     )
+  #   })
+  # nolint end
+
+  df <- read_feather(
+    test_path("golden-files/data-arrow-sf_7.0.0.feather")
+  )
+
+  # make sure the class was restored
+  expect_s3_class(df$x, c("sfc_MULTIPOLYGON", "sfc", "list"))
+
+  # make sure the row-level metadata was restored
+  expect_identical(attr(df$x[[1]], "foo"), "bar")
+  expect_identical(attr(df$x[[2]], "baz"), "qux")
+})
+
 # TODO: streams(?)
diff --git a/r/tests/testthat/test-chunked-array.R b/r/tests/testthat/test-chunked-array.R
index 87d2a9d92a..1ea3a2bd7b 100644
--- a/r/tests/testthat/test-chunked-array.R
+++ b/r/tests/testthat/test-chunked-array.R
@@ -505,3 +505,44 @@ test_that("Handling string data with embedded nuls", {
     )
   })
 })
+
+test_that("as_chunked_array() default method calls chunked_array()", {
+  expect_equal(
+    as_chunked_array(chunked_array(1:3, 4:5)),
+    chunked_array(1:3, 4:5)
+  )
+
+  expect_equal(
+    as_chunked_array(chunked_array(1:3, 4:5), type = float64()),
+    chunked_array(
+      Array$create(1:3, type = float64()),
+      Array$create(4:5, type = float64())
+    )
+  )
+})
+
+test_that("as_chunked_array() works for ChunkedArray", {
+  array <- chunked_array(type = null())
+  expect_identical(as_chunked_array(array), array)
+  expect_equal(
+    as_chunked_array(array, type = int32()),
+    chunked_array(type = int32())
+  )
+})
+
+test_that("as_chunked_array() works for Array", {
+  expect_equal(
+    as_chunked_array(Array$create(logical(), type = null())),
+    chunked_array(type = null())
+  )
+
+  expect_equal(
+    as_chunked_array(Array$create(1:6)),
+    chunked_array(Array$create(1:6))
+  )
+
+  expect_equal(
+    as_chunked_array(Array$create(1:6), type = float64()),
+    chunked_array(Array$create(1:6, type = float64()))
+  )
+})
diff --git a/r/tests/testthat/test-data-type.R b/r/tests/testthat/test-data-type.R
index 430043ce05..73995edd3f 100644
--- a/r/tests/testthat/test-data-type.R
+++ b/r/tests/testthat/test-data-type.R
@@ -600,3 +600,18 @@ test_that("DataType$code()", {
   })
 
 })
+
+test_that("as_data_type() works for DataType", {
+  expect_equal(as_data_type(int32()), int32())
+})
+
+test_that("as_data_type() works for Field", {
+  expect_equal(as_data_type(field("a field", int32())), int32())
+})
+
+test_that("as_data_type() works for Schema", {
+  expect_equal(
+    as_data_type(schema(col1 = int32(), col2 = string())),
+    struct(col1 = int32(), col2 = string())
+  )
+})
diff --git a/r/tests/testthat/test-feather.R b/r/tests/testthat/test-feather.R
index 65fb2f0375..ed02c2c7de 100644
--- a/r/tests/testthat/test-feather.R
+++ b/r/tests/testthat/test-feather.R
@@ -105,10 +105,7 @@ test_that("write_feather option error handling", {
 
 test_that("write_feather with invalid input type", {
   bad_input <- Array$create(1:5)
-  expect_error(
-    write_feather(bad_input, feather_file),
-    regexp = "x must be an object of class 'data.frame', 'RecordBatch', or 'Table', not 'Array'."
-  )
+  expect_snapshot_error(write_feather(bad_input, feather_file))
 })
 
 test_that("read_feather supports col_select = <names>", {
diff --git a/r/tests/testthat/test-ipc_stream.R b/r/tests/testthat/test-ipc-stream.R
similarity index 74%
rename from r/tests/testthat/test-ipc_stream.R
rename to r/tests/testthat/test-ipc-stream.R
index 905a22f679..c59c39ad90 100644
--- a/r/tests/testthat/test-ipc_stream.R
+++ b/r/tests/testthat/test-ipc-stream.R
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
 test_that("read_ipc_stream() and write_ipc_stream() accept connection objects", {
   tf <- tempfile()
   on.exit(unlink(tf))
@@ -30,3 +29,17 @@ test_that("read_ipc_stream() and write_ipc_stream() accept connection objects",
   expect_identical(read_ipc_stream(tf), test_tbl)
   expect_identical(read_ipc_stream(file(tf)), read_ipc_stream(tf))
 })
+
+test_that("write_ipc_stream can write Table", {
+  table <- arrow_table(col1 = 1, col2 = "two")
+  tf <- tempfile()
+  on.exit(unlink(tf))
+
+  expect_identical(write_ipc_stream(table, tf), table)
+  expect_equal(read_ipc_stream(tf, as_data_frame = FALSE), table)
+})
+
+test_that("write_ipc_stream errors for invalid input type", {
+  bad_input <- Array$create(1:5)
+  expect_snapshot_error(write_ipc_stream(bad_input, tempfile()))
+})
diff --git a/r/tests/testthat/test-metadata.R b/r/tests/testthat/test-metadata.R
index 4db20d04df..4421c81673 100644
--- a/r/tests/testthat/test-metadata.R
+++ b/r/tests/testthat/test-metadata.R
@@ -63,6 +63,13 @@ test_that("R metadata is not stored for types that map to Arrow types (factor, D
   expect_null(Table$create(example_with_times[1:3])$metadata$r)
 })
 
+test_that("R metadata is not stored for ExtensionType columns", {
+  tab <- Table$create(
+    x = vctrs::new_vctr(1:5, class = "special_integer")
+  )
+  expect_null(tab$metadata$r)
+})
+
 test_that("classes are not stored for arrow_binary/arrow_large_binary/arrow_fixed_size_binary (ARROW-14140)", {
   raws <- charToRaw("bonjour")
 
@@ -152,6 +159,7 @@ test_that("RecordBatch metadata", {
 })
 
 test_that("RecordBatch R metadata", {
+
   expect_identical(as.data.frame(record_batch(example_with_metadata)), example_with_metadata)
 })
 
@@ -182,7 +190,7 @@ test_that("haven types roundtrip via feather", {
 
 test_that("Date/time type roundtrip", {
   rb <- record_batch(example_with_times)
-  expect_r6_class(rb$schema$posixlt$type, "StructType")
+  expect_r6_class(rb$schema$posixlt$type, "VctrsExtensionType")
   expect_identical(as.data.frame(rb), example_with_times)
 })
 
@@ -310,31 +318,6 @@ test_that("Dataset writing does handle other metadata", {
   )
 })
 
-test_that("When we encounter SF cols, we warn", {
-  df <- data.frame(x = I(list(structure(1, foo = "bar"), structure(2, baz = "qux"))))
-  class(df$x) <- c("sfc_MULTIPOLYGON", "sfc", "list")
-
-  expect_warning(
-    tab <- Table$create(df),
-    "One of the columns given appears to be an"
-  )
-
-  # but the table was read fine, just sans (row-level) metadata
-  r_metadata <- .unserialize_arrow_r_metadata(tab$metadata$r)
-  expect_null(r_metadata$columns$x$columns)
-
-  # But we can re-enable this / read data that has already been written with
-  # row-level metadata without a warning
-  withr::with_options(
-    list("arrow.preserve_row_level_metadata" = TRUE),
-    {
-      expect_warning(tab <- Table$create(df), NA)
-      expect_identical(attr(as.data.frame(tab)$x[[1]], "foo"), "bar")
-      expect_identical(attr(as.data.frame(tab)$x[[2]], "baz"), "qux")
-    }
-  )
-})
-
 test_that("dplyr with metadata", {
   skip_if_not_available("dataset")
 
@@ -358,7 +341,7 @@ test_that("dplyr with metadata", {
   )
   compare_dplyr_binding(
     .input %>%
-      mutate(z = nchar(a)) %>%
+      mutate(z = nchar(d)) %>%
       select(z, a) %>%
       collect(),
     example_with_metadata
@@ -367,7 +350,7 @@ test_that("dplyr with metadata", {
   # of grouping columns appear to come through
   compare_dplyr_binding(
     .input %>%
-      group_by(a) %>%
+      group_by(d) %>%
       summarize(n()) %>%
       collect(),
     example_with_metadata
@@ -376,7 +359,7 @@ test_that("dplyr with metadata", {
   # carry through
   compare_dplyr_binding(
     .input %>%
-      mutate(a = nchar(a)) %>%
+      mutate(a = b) %>%
       select(a) %>%
       collect(),
     example_with_metadata
diff --git a/r/tests/testthat/test-parquet.R b/r/tests/testthat/test-parquet.R
index 3f43c3d894..dbafd5d62c 100644
--- a/r/tests/testthat/test-parquet.R
+++ b/r/tests/testthat/test-parquet.R
@@ -113,10 +113,7 @@ test_that("write_parquet() handles grouped_df", {
 
 test_that("write_parquet() with invalid input type", {
   bad_input <- Array$create(1:5)
-  expect_error(
-    write_parquet(bad_input, tempfile()),
-    regexp = "x must be an object of class 'data.frame', 'RecordBatch', or 'Table', not 'Array'."
-  )
+  expect_snapshot_error(write_parquet(bad_input, tempfile()))
 })
 
 test_that("write_parquet() can truncate timestamps", {
diff --git a/r/tests/testthat/test-python.R b/r/tests/testthat/test-python.R
index 9d969b6e8d..1a83ebd85f 100644
--- a/r/tests/testthat/test-python.R
+++ b/r/tests/testthat/test-python.R
@@ -37,6 +37,8 @@ test_that("Array from Python", {
   pa <- reticulate::import("pyarrow")
   py <- pa$array(c(1, 2, 3))
   expect_equal(py, Array$create(c(1, 2, 3)))
+
+  expect_equal(as_arrow_array(py), Array$create(c(1, 2, 3)))
 })
 
 test_that("Array to Python", {
@@ -53,6 +55,11 @@ test_that("RecordBatch to/from Python", {
   py <- reticulate::r_to_py(batch)
   expect_s3_class(py, "pyarrow.lib.RecordBatch")
   expect_equal(reticulate::py_to_r(py), batch)
+
+  expect_equal(as_record_batch(py), batch)
+  expect_equal(as_arrow_table(py), as_arrow_table(batch))
+  reader <- as_record_batch_reader(py)
+  expect_equal(reader$read_next_batch(), batch)
 })
 
 test_that("Table and ChunkedArray from Python", {
@@ -65,6 +72,11 @@ test_that("Table and ChunkedArray from Python", {
   expect_s3_class(pytab[0], "pyarrow.lib.ChunkedArray")
   expect_equal(reticulate::py_to_r(pytab[0]), tab$col1)
   expect_equal(reticulate::py_to_r(pytab), tab)
+
+  expect_equal(as_arrow_table(pytab), tab)
+  expect_equal(as_record_batch(pytab), as_record_batch(tab))
+  reader <- as_record_batch_reader(pytab)
+  expect_equal(reader$read_table(), tab)
 })
 
 test_that("Table and ChunkedArray to Python", {
@@ -81,19 +93,33 @@ test_that("Table and ChunkedArray to Python", {
 })
 
 test_that("RecordBatch with metadata roundtrip", {
-  batch <- RecordBatch$create(example_with_times)
+  batch <- RecordBatch$create(example_with_metadata)
   pybatch <- reticulate::r_to_py(batch)
   expect_s3_class(pybatch, "pyarrow.lib.RecordBatch")
-  expect_equal(reticulate::py_to_r(pybatch), batch)
-  expect_identical(as.data.frame(reticulate::py_to_r(pybatch)), example_with_times)
+
+  # Because batch$a is a VctrsExtensionType (which pyarrow doesn't know
+  # about), we don't quite have equality; however, we can still
+  # roundtrip while preserving the extension type.
+  rbatch <- reticulate::py_to_r(pybatch)
+  expect_identical(rbatch$metadata, batch$metadata)
+  expect_equal(rbatch$a, batch$a)
+  expect_equal(rbatch[c("b", "c", "d")], batch[c("b", "c", "d")])
+  expect_identical(as.data.frame(rbatch), example_with_metadata)
 })
 
 test_that("Table with metadata roundtrip", {
-  tab <- Table$create(example_with_times)
+  tab <- Table$create(example_with_metadata)
   pytab <- reticulate::r_to_py(tab)
   expect_s3_class(pytab, "pyarrow.lib.Table")
-  expect_equal(reticulate::py_to_r(pytab), tab)
-  expect_identical(as.data.frame(reticulate::py_to_r(pytab)), example_with_times)
+
+  # Because tab$a is a VctrsExtensionType (which pyarrow doesn't know
+  # about), we don't quite have equality; however, we can still
+  # roundtrip while preserving the extension type.
+  rtab <- reticulate::py_to_r(pytab)
+  expect_identical(rtab$metadata, tab$metadata)
+  expect_equal(rtab$a, tab$a)
+  expect_equal(rtab[c("b", "c", "d")], tab[c("b", "c", "d")])
+  expect_identical(as.data.frame(rtab), example_with_metadata)
 })
 
 test_that("DataType roundtrip", {
@@ -101,6 +127,8 @@ test_that("DataType roundtrip", {
   py <- reticulate::r_to_py(r)
   expect_s3_class(py, "pyarrow.lib.DataType")
   expect_equal(reticulate::py_to_r(py), r)
+
+  expect_equal(as_data_type(py), r)
 })
 
 test_that("Field roundtrip", {
@@ -108,6 +136,8 @@ test_that("Field roundtrip", {
   py <- reticulate::r_to_py(r)
   expect_s3_class(py, "pyarrow.lib.Field")
   expect_equal(reticulate::py_to_r(py), r)
+
+  expect_equal(as_data_type(py), as_data_type(r))
 })
 
 test_that("RecordBatchReader to python", {
@@ -145,4 +175,10 @@ test_that("RecordBatchReader from python", {
   rt_table <- back_to_r$read_table()
   expect_r6_class(rt_table, "Table")
   expect_identical(as.data.frame(rt_table), example_data)
+
+  scan <- Scanner$create(tab)
+  reader <- scan$ToRecordBatchReader()
+  pyreader <- reticulate::r_to_py(reader)
+  back_to_r <- as_record_batch_reader(pyreader)
+  expect_equal(back_to_r$read_table(), rt_table)
 })
diff --git a/r/tests/testthat/test-record-batch-reader.R b/r/tests/testthat/test-record-batch-reader.R
index f9b80d2c53..2fe22685cd 100644
--- a/r/tests/testthat/test-record-batch-reader.R
+++ b/r/tests/testthat/test-record-batch-reader.R
@@ -185,3 +185,49 @@ y: string"
     rbind(as.data.frame(batch), as.data.frame(batch))
   )
 })
+
+test_that("as_record_batch_reader() works for RecordBatchReader", {
+  skip_if_not_available("dataset")
+
+  batch <- record_batch(a = 1, b = "two")
+  reader <- Scanner$create(batch)$ToRecordBatchReader()
+  expect_identical(as_record_batch_reader(reader), reader)
+})
+
+test_that("as_record_batch_reader() works for Scanner", {
+  skip_if_not_available("dataset")
+
+  batch <- record_batch(a = 1, b = "two")
+  scanner <- Scanner$create(batch)
+  reader <- as_record_batch_reader(scanner)
+  expect_equal(reader$read_next_batch(), batch)
+})
+
+test_that("as_record_batch_reader() works for Dataset", {
+  skip_if_not_available("dataset")
+
+  dataset <- InMemoryDataset$create(arrow_table(a = 1, b = "two"))
+  reader <- as_record_batch_reader(dataset)
+  expect_equal(
+    reader$read_next_batch(),
+    record_batch(a = 1, b = "two")
+  )
+})
+
+test_that("as_record_batch_reader() works for Table", {
+  table <- arrow_table(a = 1, b = "two")
+  reader <- as_record_batch_reader(table)
+  expect_equal(reader$read_next_batch(), record_batch(a = 1, b = "two"))
+})
+
+test_that("as_record_batch_reader() works for RecordBatch", {
+  batch <- record_batch(a = 1, b = "two")
+  reader <- as_record_batch_reader(batch)
+  expect_equal(reader$read_next_batch(), batch)
+})
+
+test_that("as_record_batch_reader() works for data.frame", {
+  df <- tibble::tibble(a = 1, b = "two")
+  reader <- as_record_batch_reader(df)
+  expect_equal(reader$read_next_batch(), record_batch(a = 1, b = "two"))
+})
diff --git a/r/tests/testthat/test-schema.R b/r/tests/testthat/test-schema.R
index 8301fd5158..c7046de3cb 100644
--- a/r/tests/testthat/test-schema.R
+++ b/r/tests/testthat/test-schema.R
@@ -251,3 +251,13 @@ test_that("Schemas from lists", {
   expect_equal(name_list_schema, schema(b = double(), c = string(), d = int8()))
   expect_equal(field_list_schema, schema(b = double(), c = bool(), d = string()))
 })
+
+test_that("as_schema() works for Schema objects", {
+  schema <- schema(col1 = int32())
+  expect_identical(as_schema(schema), schema)
+})
+
+test_that("as_schema() works for StructType objects", {
+  struct_type <- struct(col1 = int32())
+  expect_equal(as_schema(struct_type), schema(col1 = int32()))
+})
diff --git a/r/tests/testthat/test-type.R b/r/tests/testthat/test-type.R
index 71c2302116..46da8c9f8c 100644
--- a/r/tests/testthat/test-type.R
+++ b/r/tests/testthat/test-type.R
@@ -16,47 +16,73 @@
 # under the License.
 
 
-test_that("type() gets the right type for arrow::Array", {
+test_that("infer_type() gets the right type for arrow::Array", {
   a <- Array$create(1:10)
-  expect_equal(type(a), a$type)
+  expect_equal(infer_type(a), a$type)
 })
 
-test_that("type() gets the right type for ChunkedArray", {
+test_that("infer_type() gets the right type for ChunkedArray", {
   a <- chunked_array(1:10, 1:10)
-  expect_equal(type(a), a$type)
+  expect_equal(infer_type(a), a$type)
 })
 
-test_that("type() infers from R type", {
-  expect_equal(type(1:10), int32())
-  expect_equal(type(1), float64())
-  expect_equal(type(TRUE), boolean())
-  expect_equal(type(raw()), uint8())
-  expect_equal(type(""), utf8())
+test_that("infer_type() infers from R type", {
+  expect_equal(infer_type(1:10), int32())
+  expect_equal(infer_type(1), float64())
+  expect_equal(infer_type(TRUE), boolean())
+  expect_equal(infer_type(raw()), uint8())
+  expect_equal(infer_type(""), utf8())
   expect_equal(
-    type(example_data$fct),
+    infer_type(example_data$fct),
     dictionary(int8(), utf8(), FALSE)
   )
   expect_equal(
-    type(lubridate::ymd_hms("2019-02-14 13:55:05")),
+    infer_type(lubridate::ymd_hms("2019-02-14 13:55:05")),
     timestamp(TimeUnit$MICRO, "UTC")
   )
   expect_equal(
-    type(hms::hms(56, 34, 12)),
+    infer_type(hms::hms(56, 34, 12)),
     time32(unit = TimeUnit$SECOND)
   )
   expect_equal(
-    type(as.difftime(123, units = "days")),
+    infer_type(as.difftime(123, units = "days")),
     duration(unit = TimeUnit$SECOND)
   )
   expect_equal(
-    type(bit64::integer64()),
+    infer_type(bit64::integer64()),
     int64()
   )
 })
 
-test_that("type() can infer struct types from data frames", {
+test_that("infer_type() default method errors for unknown classes", {
+  vec <- structure(list(), class = "class_not_supported")
+
+  # check simulating a call from C++
+  expect_snapshot_error(infer_type(vec, from_array_infer_type = TRUE))
+
+  # also check the error when infer_type() is called from Array__infer_type()
+  expect_snapshot_error(infer_type(vec))
+})
+
+test_that("infer_type() can infer struct types from data frames", {
   df <- tibble::tibble(x = 1:10, y = rnorm(10), z = letters[1:10])
-  expect_equal(type(df), struct(x = int32(), y = float64(), z = utf8()))
+  expect_equal(infer_type(df), struct(x = int32(), y = float64(), z = utf8()))
+})
+
+test_that("infer_type() can infer type for vctr_vctr subclasses", {
+  vctr <- vctrs::new_vctr(1:5, class = "custom_vctr")
+  expect_equal(
+    infer_type(vctr),
+    vctrs_extension_type(vctrs::vec_ptype(vctr))
+  )
+})
+
+test_that("infer_type() can infer nested extension types", {
+  vctr <- vctrs::new_vctr(1:5, class = "custom_vctr")
+  expect_equal(
+    infer_type(tibble::tibble(x = vctr)),
+    struct(x = infer_type(vctr))
+  )
 })
 
 test_that("DataType$Equals", {
@@ -230,15 +256,31 @@ test_that("Type strings are correctly canonicalized", {
   )
 })
 
-test_that("type() gets the right type for Expression", {
+test_that("infer_type() gets the right type for Expression", {
   x <- Expression$scalar(32L)
   y <- Expression$scalar(10)
   add_xy <- Expression$create("add", x, y)
 
-  expect_equal(x$type(), type(x))
-  expect_equal(type(x), int32())
-  expect_equal(y$type(), type(y))
-  expect_equal(type(y), float64())
-  expect_equal(add_xy$type(), type(add_xy))
-  expect_equal(type(add_xy), float64())
+  expect_equal(x$type(), infer_type(x))
+  expect_equal(infer_type(x), int32())
+  expect_equal(y$type(), infer_type(y))
+  expect_equal(infer_type(y), float64())
+  expect_equal(add_xy$type(), infer_type(add_xy))
+  expect_equal(infer_type(add_xy), float64())
+})
+
+test_that("infer_type() infers type for POSIXlt", {
+  posix_lt <- as.POSIXlt("2021-01-01 01:23:45", tz = "UTC")
+  expect_equal(
+    infer_type(posix_lt),
+    vctrs_extension_type(posix_lt[integer(0)])
+  )
+})
+
+test_that("infer_type() infers type for vctrs", {
+  vec <- vctrs::new_vctr(1:5, class = "special_integer")
+  expect_equal(
+    infer_type(vec),
+    vctrs_extension_type(vec[integer(0)])
+  )
 })
diff --git a/r/tests/testthat/test-util.R b/r/tests/testthat/test-util.R
new file mode 100644
index 0000000000..20fdedf3e1
--- /dev/null
+++ b/r/tests/testthat/test-util.R
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+test_that("as_writable_table() works for data.frame, RecordBatch, and Table", {
+  table <- arrow_table(col1 = 1, col2 = "two")
+  expect_identical(as_writable_table(table), table)
+
+  batch <- record_batch(col1 = 1, col2 = "two")
+  expect_equal(as_writable_table(batch), table)
+
+  tbl <- tibble::tibble(col1 = 1, col2 = "two")
+  # because of metadata
+  table_from_tbl <- as_writable_table(tbl)
+  table_from_tbl$metadata <- NULL
+  expect_equal(table_from_tbl, table)
+})
+
+test_that("as_writable_table() errors for invalid input", {
+  # check errors from a wrapper function (i.e., simulate write_*() functions)
+  wrapper_fun <- function(x) as_writable_table(x)
+
+  # make sure we get the custom error message
+  expect_snapshot_error(wrapper_fun("not a table"))
+
+  # make sure other errors make it through
+  expect_snapshot_error(wrapper_fun(data.frame(x = I(list(1, "a")))))
+})