You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by jo...@apache.org on 2022/04/19 21:41:08 UTC

[arrow] branch master updated: ARROW-15517: [R] Use WriteNode in write_dataset()

This is an automated email from the ASF dual-hosted git repository.

jonkeane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b3f4677b9 ARROW-15517: [R] Use WriteNode in write_dataset()
4b3f4677b9 is described below

commit 4b3f4677b995cb7263e4a4e65daf00189f638617
Author: Neal Richardson <ne...@gmail.com>
AuthorDate: Tue Apr 19 16:40:57 2022 -0500

    ARROW-15517: [R] Use WriteNode in write_dataset()
    
    This should allow streaming writes in more cases, e.g. with a join.
    
    Closes #12316 from nealrichardson/write-node
    
    Authored-by: Neal Richardson <ne...@gmail.com>
    Signed-off-by: Jonathan Keane <jk...@gmail.com>
---
 r/R/arrowExports.R                    |  8 ++--
 r/R/dataset-format.R                  |  4 +-
 r/R/dataset-write.R                   | 87 +++++++++++++++++++++++++++--------
 r/R/dplyr.R                           | 11 ++++-
 r/R/metadata.R                        | 22 ++++++++-
 r/R/parquet.R                         | 38 ++++++++-------
 r/R/query-engine.R                    | 29 +++++-------
 r/src/arrowExports.cpp                | 62 +++++++++++++------------
 r/src/compute-exec.cpp                | 49 ++++++++++++++++++--
 r/src/dataset.cpp                     | 24 ----------
 r/tests/testthat/test-dataset-write.R | 70 ++++++++++++++++++++--------
 r/tests/testthat/test-metadata.R      | 36 ++++++++++++---
 12 files changed, 291 insertions(+), 149 deletions(-)

diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 7bf77f1e66..6b969336c9 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -420,6 +420,10 @@ ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) {
   .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, materialized_field_names)
 }
 
+ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
+  invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
+}
+
 ExecNode_Filter <- function(input, filter) {
   .Call(`_arrow_ExecNode_Filter`, input, filter)
 }
@@ -748,10 +752,6 @@ dataset___Scanner__schema <- function(sc) {
   .Call(`_arrow_dataset___Scanner__schema`, sc)
 }
 
-dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
-  invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
-}
-
 dataset___Scanner__TakeRows <- function(scanner, indices) {
   .Call(`_arrow_dataset___Scanner__TakeRows`, scanner, indices)
 }
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index f00efd0350..acc1a41b02 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -390,7 +390,7 @@ ParquetFragmentScanOptions$create <- function(use_buffered_stream = FALSE,
 FileWriteOptions <- R6Class("FileWriteOptions",
   inherit = ArrowObject,
   public = list(
-    update = function(table, ...) {
+    update = function(column_names, ...) {
       check_additional_args <- function(format, passed_args) {
         if (format == "parquet") {
           supported_args <- names(formals(write_parquet))
@@ -437,7 +437,7 @@ FileWriteOptions <- R6Class("FileWriteOptions",
       if (self$type == "parquet") {
         dataset___ParquetFileWriteOptions__update(
           self,
-          ParquetWriterProperties$create(table, ...),
+          ParquetWriterProperties$create(column_names, ...),
           ParquetArrowWriterProperties$create(...)
         )
       } else if (self$type == "ipc") {
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index d7c73908e7..09b3ebdbe6 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -136,41 +136,88 @@ write_dataset <- function(dataset,
   if (inherits(dataset, "arrow_dplyr_query")) {
     # partitioning vars need to be in the `select` schema
     dataset <- ensure_group_vars(dataset)
-  } else if (inherits(dataset, "grouped_df")) {
-    force(partitioning)
-    # Drop the grouping metadata before writing; we've already consumed it
-    # now to construct `partitioning` and don't want it in the metadata$r
-    dataset <- dplyr::ungroup(dataset)
+  } else {
+    if (inherits(dataset, "grouped_df")) {
+      force(partitioning)
+      # Drop the grouping metadata before writing; we've already consumed it
+      # now to construct `partitioning` and don't want it in the metadata$r
+      dataset <- dplyr::ungroup(dataset)
+    }
+    dataset <- tryCatch(
+      as_adq(dataset),
+      error = function(e) {
+        supported <- c(
+          "Dataset", "RecordBatch", "Table", "arrow_dplyr_query", "data.frame"
+        )
+        stop(
+          "'dataset' must be a ",
+          oxford_paste(supported, "or", quote = FALSE),
+          ", not ",
+          deparse(class(dataset)),
+          call. = FALSE
+        )
+      }
+    )
+  }
+
+  plan <- ExecPlan$create()
+  final_node <- plan$Build(dataset)
+  if (!is.null(final_node$sort %||% final_node$head %||% final_node$tail)) {
+    # Because sorting and topK are only handled in the SinkNode (or in R!),
+    # they wouldn't get picked up in the WriteNode. So let's Run this ExecPlan
+    # to capture those, and then create a new plan for writing
+    # TODO(ARROW-15681): do sorting in WriteNode in C++
+    dataset <- as_adq(plan$Run(final_node))
+    plan <- ExecPlan$create()
+    final_node <- plan$Build(dataset)
   }
 
-  scanner <- Scanner$create(dataset)
   if (!inherits(partitioning, "Partitioning")) {
-    partition_schema <- scanner$schema[partitioning]
+    partition_schema <- final_node$schema[partitioning]
     if (isTRUE(hive_style)) {
-      partitioning <- HivePartitioning$create(partition_schema, null_fallback = list(...)$null_fallback)
+      partitioning <- HivePartitioning$create(
+        partition_schema,
+        null_fallback = list(...)$null_fallback
+      )
     } else {
       partitioning <- DirectoryPartitioning$create(partition_schema)
     }
   }
 
-  if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
-    max_rows_per_group <- max_rows_per_file
-  }
-
   path_and_fs <- get_path_and_filesystem(path)
-  options <- FileWriteOptions$create(format, table = scanner, ...)
+  output_schema <- final_node$schema
+  options <- FileWriteOptions$create(
+    format,
+    column_names = names(output_schema),
+    ...
+  )
 
+  # TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
+  # and encapsulate this logic better
   existing_data_behavior_opts <- c("delete_matching", "overwrite", "error")
   existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L
 
-  validate_positive_int_value(max_partitions, "max_partitions must be a positive, non-missing integer")
-  validate_positive_int_value(max_open_files, "max_open_files must be a positive, non-missing integer")
-  validate_positive_int_value(min_rows_per_group, "min_rows_per_group must be a positive, non-missing integer")
-  validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer")
+  if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
+    max_rows_per_group <- max_rows_per_file
+  }
 
-  dataset___Dataset__Write(
+  validate_positive_int_value(max_partitions)
+  validate_positive_int_value(max_open_files)
+  validate_positive_int_value(min_rows_per_group)
+  validate_positive_int_value(max_rows_per_group)
+
+  new_r_meta <- get_r_metadata_from_old_schema(
+    output_schema,
+    source_data(dataset)$schema,
+    drop_attributes = has_aggregation(dataset)
+  )
+  if (!is.null(new_r_meta)) {
+    output_schema$r_metadata <- new_r_meta
+  }
+  plan$Write(
+    final_node, prepare_key_value_metadata(output_schema$metadata),
     options, path_and_fs$fs, path_and_fs$path,
-    partitioning, basename_template, scanner,
+    partitioning, basename_template,
     existing_data_behavior, max_partitions,
     max_open_files, max_rows_per_file,
     min_rows_per_group, max_rows_per_group
@@ -179,6 +226,6 @@ write_dataset <- function(dataset,
 
 validate_positive_int_value <- function(value, msg) {
   if (!is_integerish(value, n = 1) || is.na(value) || value < 0) {
-    abort(msg)
+    abort(paste(substitute(value), "must be a positive, non-missing integer"))
   }
 }
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index e6d7889078..c9650fb065 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -24,7 +24,12 @@ arrow_dplyr_query <- function(.data) {
   # RecordBatch, or Dataset) and the state of the user's dplyr query--things
   # like selected columns, filters, and group vars.
   # An arrow_dplyr_query can contain another arrow_dplyr_query in .data
-  gv <- dplyr::group_vars(.data) %||% character()
+  gv <- tryCatch(
+    # If dplyr is not available, or if the input doesn't have a group_vars
+    # method, assume no group vars
+    dplyr::group_vars(.data) %||% character(),
+    error = function(e) character()
+  )
 
   if (inherits(.data, "data.frame")) {
     .data <- Table$create(.data)
@@ -247,7 +252,9 @@ abandon_ship <- function(call, .data, msg) {
 query_on_dataset <- function(x) inherits(source_data(x), c("Dataset", "RecordBatchReader"))
 
 source_data <- function(x) {
-  if (is_collapsed(x)) {
+  if (!inherits(x, "arrow_dplyr_query")) {
+    x
+  } else if (is_collapsed(x)) {
     source_data(x$.data)
   } else {
     x$.data
diff --git a/r/R/metadata.R b/r/R/metadata.R
index d88297dd92..f0411eb54a 100644
--- a/r/R/metadata.R
+++ b/r/R/metadata.R
@@ -133,7 +133,6 @@ remove_attributes <- function(x) {
 }
 
 arrow_attributes <- function(x, only_top_level = FALSE) {
-
   att <- attributes(x)
   removed_attributes <- remove_attributes(x)
 
@@ -208,3 +207,24 @@ arrow_attributes <- function(x, only_top_level = FALSE) {
     NULL
   }
 }
+
+get_r_metadata_from_old_schema <- function(new_schema,
+                                           old_schema,
+                                           drop_attributes = FALSE) {
+  # TODO: do we care about other (non-R) metadata preservation?
+  # How would we know if it were meaningful?
+  r_meta <- old_schema$r_metadata
+  if (!is.null(r_meta)) {
+    # Filter r_metadata$columns on columns with name _and_ type match
+    common_names <- intersect(names(r_meta$columns), names(new_schema))
+    keep <- common_names[
+      map_lgl(common_names, ~ old_schema[[.]] == new_schema[[.]])
+    ]
+    r_meta$columns <- r_meta$columns[keep]
+    if (drop_attributes) {
+      # dplyr drops top-level attributes if you do summarize
+      r_meta$attributes <- NULL
+    }
+  }
+  r_meta
+}
diff --git a/r/R/parquet.R b/r/R/parquet.R
index 3a07c224ed..c6c00ed3a4 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -186,7 +186,7 @@ write_parquet <- function(x,
     x$schema,
     sink,
     properties = properties %||% ParquetWriterProperties$create(
-      x,
+      names(x),
       version = version,
       compression = compression,
       compression_level = compression_level,
@@ -307,33 +307,33 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
     set_version = function(version) {
       parquet___WriterProperties___Builder__version(self, make_valid_version(version))
     },
-    set_compression = function(table, compression) {
+    set_compression = function(column_names, compression) {
       compression <- compression_from_name(compression)
       assert_that(is.integer(compression))
       private$.set(
-        table, compression,
+        column_names, compression,
         parquet___ArrowWriterProperties___Builder__set_compressions
       )
     },
-    set_compression_level = function(table, compression_level) {
+    set_compression_level = function(column_names, compression_level) {
       # cast to integer but keep names
       compression_level <- set_names(as.integer(compression_level), names(compression_level))
       private$.set(
-        table, compression_level,
+        column_names, compression_level,
         parquet___ArrowWriterProperties___Builder__set_compression_levels
       )
     },
-    set_dictionary = function(table, use_dictionary) {
+    set_dictionary = function(column_names, use_dictionary) {
       assert_that(is.logical(use_dictionary))
       private$.set(
-        table, use_dictionary,
+        column_names, use_dictionary,
         parquet___ArrowWriterProperties___Builder__set_use_dictionary
       )
     },
-    set_write_statistics = function(table, write_statistics) {
+    set_write_statistics = function(column_names, write_statistics) {
       assert_that(is.logical(write_statistics))
       private$.set(
-        table, write_statistics,
+        column_names, write_statistics,
         parquet___ArrowWriterProperties___Builder__set_write_statistics
       )
     },
@@ -342,9 +342,8 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
     }
   ),
   private = list(
-    .set = function(table, value, FUN) {
+    .set = function(column_names, value, FUN) {
       msg <- paste0("unsupported ", substitute(value), "= specification")
-      column_names <- names(table)
       given_names <- names(value)
       if (is.null(given_names)) {
         if (length(value) %in% c(1L, length(column_names))) {
@@ -364,7 +363,7 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
   )
 )
 
-ParquetWriterProperties$create <- function(table,
+ParquetWriterProperties$create <- function(column_names,
                                            version = NULL,
                                            compression = default_parquet_compression(),
                                            compression_level = NULL,
@@ -377,16 +376,16 @@ ParquetWriterProperties$create <- function(table,
     builder$set_version(version)
   }
   if (!is.null(compression)) {
-    builder$set_compression(table, compression = compression)
+    builder$set_compression(column_names, compression = compression)
   }
   if (!is.null(compression_level)) {
-    builder$set_compression_level(table, compression_level = compression_level)
+    builder$set_compression_level(column_names, compression_level = compression_level)
   }
   if (!is.null(use_dictionary)) {
-    builder$set_dictionary(table, use_dictionary)
+    builder$set_dictionary(column_names, use_dictionary)
   }
   if (!is.null(write_statistics)) {
-    builder$set_write_statistics(table, write_statistics)
+    builder$set_write_statistics(column_names, write_statistics)
   }
   if (!is.null(data_page_size)) {
     builder$set_data_page_size(data_page_size)
@@ -600,10 +599,9 @@ ParquetArrowReaderProperties$create <- function(use_threads = option_use_threads
   parquet___arrow___ArrowReaderProperties__Make(isTRUE(use_threads))
 }
 
-calculate_chunk_size <- function(rows,  columns,
-  target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8),
-  max_chunks = getOption("arrow.parquet_max_chunks", 200)
-  ) {
+calculate_chunk_size <- function(rows, columns,
+                                 target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8),
+                                 max_chunks = getOption("arrow.parquet_max_chunks", 200)) {
 
   # Ensure is a float to prevent integer overflow issues
   num_cells <- as.numeric(rows) * as.numeric(columns)
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 6c1b14036f..c794bc9de6 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -33,26 +33,15 @@ do_exec_plan <- function(.data) {
 
   if (ncol(tab)) {
     # Apply any column metadata from the original schema, where appropriate
-    original_schema <- source_data(.data)$schema
-    # TODO: do we care about other (non-R) metadata preservation?
-    # How would we know if it were meaningful?
-    r_meta <- original_schema$r_metadata
-    if (!is.null(r_meta)) {
-      # Filter r_metadata$columns on columns with name _and_ type match
-      new_schema <- tab$schema
-      common_names <- intersect(names(r_meta$columns), names(tab))
-      keep <- common_names[
-        map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]])
-      ]
-      r_meta$columns <- r_meta$columns[keep]
-      if (has_aggregation(.data)) {
-        # dplyr drops top-level attributes if you do summarize
-        r_meta$attributes <- NULL
-      }
-      tab$r_metadata <- r_meta
+    new_r_metadata <- get_r_metadata_from_old_schema(
+      tab$schema,
+      source_data(.data)$schema,
+      drop_attributes = has_aggregation(.data)
+    )
+    if (!is.null(new_r_metadata)) {
+      tab$r_metadata <- new_r_metadata
     }
   }
-
   tab
 }
 
@@ -244,6 +233,10 @@ ExecPlan <- R6Class("ExecPlan",
       }
       out
     },
+    Write = function(node, ...) {
+      # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
+      ExecPlan_Write(self, node, ...)
+    },
     Stop = function() ExecPlan_StopProducing(self)
   )
 )
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 81dcc0dddc..fb9f3b94d1 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -898,11 +898,11 @@ END_CPP11
 }
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<compute::ExecNode> ExecNode_Scan(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<arrow::dataset::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, std::vector<std::string> materialized_field_names);
+std::shared_ptr<compute::ExecNode> ExecNode_Scan(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, std::vector<std::string> materialized_field_names);
 extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
-	arrow::r::Input<const std::shared_ptr<arrow::dataset::Dataset>&>::type dataset(dataset_sexp);
+	arrow::r::Input<const std::shared_ptr<ds::Dataset>&>::type dataset(dataset_sexp);
 	arrow::r::Input<const std::shared_ptr<compute::Expression>&>::type filter(filter_sexp);
 	arrow::r::Input<std::vector<std::string>>::type materialized_field_names(materialized_field_names_sexp);
 	return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, materialized_field_names));
@@ -914,6 +914,35 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP fil
 }
 #endif
 
+// compute-exec.cpp
+#if defined(ARROW_R_WITH_DATASET)
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<compute::ExecNode>& final_node, cpp11::strings metadata, const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group);
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
+	arrow::r::Input<const std::shared_ptr<compute::ExecNode>&>::type final_node(final_node_sexp);
+	arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp);
+	arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
+	arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
+	arrow::r::Input<std::string>::type base_dir(base_dir_sexp);
+	arrow::r::Input<const std::shared_ptr<ds::Partitioning>&>::type partitioning(partitioning_sexp);
+	arrow::r::Input<std::string>::type basename_template(basename_template_sexp);
+	arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp);
+	arrow::r::Input<int>::type max_partitions(max_partitions_sexp);
+	arrow::r::Input<uint32_t>::type max_open_files(max_open_files_sexp);
+	arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp);
+	arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp);
+	arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp);
+	ExecPlan_Write(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
+	return R_NilValue;
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+	Rf_error("Cannot call ExecPlan_Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
+}
+#endif
+
 // compute-exec.cpp
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(const std::shared_ptr<compute::ExecNode>& input, const std::shared_ptr<compute::Expression>& filter);
 extern "C" SEXP _arrow_ExecNode_Filter(SEXP input_sexp, SEXP filter_sexp){
@@ -2041,33 +2070,6 @@ extern "C" SEXP _arrow_dataset___Scanner__schema(SEXP sc_sexp){
 }
 #endif
 
-// dataset.cpp
-#if defined(ARROW_R_WITH_DATASET)
-void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group);
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
-BEGIN_CPP11
-	arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
-	arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
-	arrow::r::Input<std::string>::type base_dir(base_dir_sexp);
-	arrow::r::Input<const std::shared_ptr<ds::Partitioning>&>::type partitioning(partitioning_sexp);
-	arrow::r::Input<std::string>::type basename_template(basename_template_sexp);
-	arrow::r::Input<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp);
-	arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp);
-	arrow::r::Input<int>::type max_partitions(max_partitions_sexp);
-	arrow::r::Input<uint32_t>::type max_open_files(max_open_files_sexp);
-	arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp);
-	arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp);
-	arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp);
-	dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
-	return R_NilValue;
-END_CPP11
-}
-#else
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
-	Rf_error("Cannot call dataset___Dataset__Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
-}
-#endif
-
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
 std::shared_ptr<arrow::Table> dataset___Scanner__TakeRows(const std::shared_ptr<ds::Scanner>& scanner, const std::shared_ptr<arrow::Array>& indices);
@@ -5197,6 +5199,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, 
 		{ "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, 
 		{ "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, 
+		{ "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14}, 
 		{ "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2}, 
 		{ "_arrow_ExecNode_Project", (DL_FUNC) &_arrow_ExecNode_Project, 3}, 
 		{ "_arrow_ExecNode_Aggregate", (DL_FUNC) &_arrow_ExecNode_Aggregate, 5}, 
@@ -5279,7 +5282,6 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_dataset___Scanner__ToRecordBatchReader", (DL_FUNC) &_arrow_dataset___Scanner__ToRecordBatchReader, 1}, 
 		{ "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2}, 
 		{ "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1}, 
-		{ "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 12}, 
 		{ "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2}, 
 		{ "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1}, 
 		{ "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0}, 
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index e7d8df55bb..4c3cc92257 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -121,19 +121,20 @@ std::shared_ptr<arrow::Schema> ExecNode_output_schema(
 
 #if defined(ARROW_R_WITH_DATASET)
 
+#include <arrow/dataset/file_base.h>
 #include <arrow/dataset/plan.h>
 #include <arrow/dataset/scanner.h>
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Scan(
     const std::shared_ptr<compute::ExecPlan>& plan,
-    const std::shared_ptr<arrow::dataset::Dataset>& dataset,
+    const std::shared_ptr<ds::Dataset>& dataset,
     const std::shared_ptr<compute::Expression>& filter,
     std::vector<std::string> materialized_field_names) {
   arrow::dataset::internal::Initialize();
 
   // TODO: pass in FragmentScanOptions
-  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  auto options = std::make_shared<ds::ScanOptions>();
 
   options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true);
 
@@ -154,7 +155,49 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
                       .Bind(*dataset->schema()));
 
   return MakeExecNodeOrStop("scan", plan.get(), {},
-                            arrow::dataset::ScanNodeOptions{dataset, options});
+                            ds::ScanNodeOptions{dataset, options});
+}
+
+// [[dataset::export]]
+void ExecPlan_Write(
+    const std::shared_ptr<compute::ExecPlan>& plan,
+    const std::shared_ptr<compute::ExecNode>& final_node, cpp11::strings metadata,
+    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+    const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
+    const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
+    arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
+    uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
+    uint64_t max_rows_per_group) {
+  arrow::dataset::internal::Initialize();
+
+  // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
+  // and encapsulate this logic better
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+  opts.max_open_files = max_open_files;
+  opts.max_rows_per_file = max_rows_per_file;
+  opts.min_rows_per_group = min_rows_per_group;
+  opts.max_rows_per_group = max_rows_per_group;
+
+  // TODO: factor this out to a strings_to_KVM() helper
+  auto values = cpp11::as_cpp<std::vector<std::string>>(metadata);
+  auto names = cpp11::as_cpp<std::vector<std::string>>(metadata.attr("names"));
+
+  auto kv =
+      std::make_shared<arrow::KeyValueMetadata>(std::move(names), std::move(values));
+
+  MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()},
+                     ds::WriteNodeOptions{std::move(opts), std::move(kv)});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
 }
 
 #endif
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 4881830560..4ff30d9d94 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -511,30 +511,6 @@ std::shared_ptr<arrow::Schema> dataset___Scanner__schema(
   return sc->options()->projected_schema;
 }
 
-// [[dataset::export]]
-void dataset___Dataset__Write(
-    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
-    const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
-    const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
-    const std::shared_ptr<ds::Scanner>& scanner,
-    arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
-    uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
-    uint64_t max_rows_per_group) {
-  ds::FileSystemDatasetWriteOptions opts;
-  opts.file_write_options = file_write_options;
-  opts.existing_data_behavior = existing_data_behavior;
-  opts.filesystem = filesystem;
-  opts.base_dir = base_dir;
-  opts.partitioning = partitioning;
-  opts.basename_template = basename_template;
-  opts.max_partitions = max_partitions;
-  opts.max_open_files = max_open_files;
-  opts.max_rows_per_file = max_rows_per_file;
-  opts.min_rows_per_group = min_rows_per_group;
-  opts.max_rows_per_group = max_rows_per_group;
-  StopIfNotOk(ds::FileSystemDataset::Write(opts, scanner));
-}
-
 // [[dataset::export]]
 std::shared_ptr<arrow::Table> dataset___Scanner__TakeRows(
     const std::shared_ptr<ds::Scanner>& scanner,
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index aafb4bf292..5b657148a5 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -244,6 +244,24 @@ test_that("Dataset writing: dplyr methods", {
     new_ds %>% select(c(names(df1), "twice")) %>% collect(),
     df1 %>% filter(int == 4) %>% mutate(twice = int * 2)
   )
+
+  # head
+  dst_dir4 <- tempfile()
+  ds %>%
+    mutate(twice = int * 2) %>%
+    arrange(int) %>%
+    head(3) %>%
+    write_dataset(dst_dir4, format = "feather")
+  new_ds <- open_dataset(dst_dir4, format = "feather")
+
+  expect_equal(
+    new_ds %>%
+      select(c(names(df1), "twice")) %>%
+      collect(),
+    df1 %>%
+      mutate(twice = int * 2) %>%
+      head(3)
+  )
 })
 
 test_that("Dataset writing: non-hive", {
@@ -321,6 +339,7 @@ test_that("Dataset writing: from RecordBatch", {
   dst_dir <- tempfile()
   stacked <- record_batch(rbind(df1, df2))
   stacked %>%
+    mutate(twice = int * 2) %>%
     group_by(int) %>%
     write_dataset(dst_dir, format = "feather")
   expect_true(dir.exists(dst_dir))
@@ -438,7 +457,7 @@ test_that("Writing a dataset: CSV format options", {
 
 test_that("Dataset writing: unsupported features/input validation", {
   skip_if_not_available("parquet")
-  expect_error(write_dataset(4), 'dataset must be a "Dataset"')
+  expect_error(write_dataset(4), "'dataset' must be a Dataset, ")
 
   ds <- open_dataset(hive_dir)
   expect_error(
@@ -520,7 +539,6 @@ test_that("max_rows_per_group is adjusted if at odds with max_rows_per_file", {
   expect_silent(
     write_dataset(df, dst_dir, max_rows_per_file = 5)
   )
-
 })
 
 
@@ -571,17 +589,27 @@ test_that("Dataset write max open files", {
   partitioning <- "c2"
   num_of_unique_c2_groups <- 5
 
-  record_batch_1 <- record_batch(c1 = c(1, 2, 3, 4, 0, 10),
-                                 c2 = c("a", "b", "c", "d", "e", "a"))
-  record_batch_2 <- record_batch(c1 = c(5, 6, 7, 8, 0, 1),
-                                 c2 = c("a", "b", "c", "d", "e", "c"))
-  record_batch_3 <- record_batch(c1 = c(9, 10, 11, 12, 0, 1),
-                                 c2 = c("a", "b", "c", "d", "e", "d"))
-  record_batch_4 <- record_batch(c1 = c(13, 14, 15, 16, 0, 1),
-                                 c2 = c("a", "b", "c", "d", "e", "b"))
+  record_batch_1 <- record_batch(
+    c1 = c(1, 2, 3, 4, 0, 10),
+    c2 = c("a", "b", "c", "d", "e", "a")
+  )
+  record_batch_2 <- record_batch(
+    c1 = c(5, 6, 7, 8, 0, 1),
+    c2 = c("a", "b", "c", "d", "e", "c")
+  )
+  record_batch_3 <- record_batch(
+    c1 = c(9, 10, 11, 12, 0, 1),
+    c2 = c("a", "b", "c", "d", "e", "d")
+  )
+  record_batch_4 <- record_batch(
+    c1 = c(13, 14, 15, 16, 0, 1),
+    c2 = c("a", "b", "c", "d", "e", "b")
+  )
 
-  table <- Table$create(d1 = record_batch_1, d2 = record_batch_2,
-                        d3 = record_batch_3, d4 = record_batch_4)
+  table <- Table$create(
+    d1 = record_batch_1, d2 = record_batch_2,
+    d3 = record_batch_3, d4 = record_batch_4
+  )
 
   write_dataset(table, path = dst_dir, format = file_format, partitioning = partitioning)
 
@@ -643,12 +671,18 @@ test_that("Dataset write max rows per files", {
 
 test_that("Dataset min_rows_per_group", {
   skip_if_not_available("parquet")
-  rb1 <- record_batch(c1 = c(1, 2, 3, 4),
-                      c2 = c("a", "b", "e", "a"))
-  rb2 <- record_batch(c1 = c(5, 6, 7, 8, 9),
-                      c2 = c("a", "b", "c", "d", "h"))
-  rb3 <- record_batch(c1 = c(10, 11),
-                      c2 = c("a", "b"))
+  rb1 <- record_batch(
+    c1 = c(1, 2, 3, 4),
+    c2 = c("a", "b", "e", "a")
+  )
+  rb2 <- record_batch(
+    c1 = c(5, 6, 7, 8, 9),
+    c2 = c("a", "b", "c", "d", "h")
+  )
+  rb3 <- record_batch(
+    c1 = c(10, 11),
+    c2 = c("a", "b")
+  )
 
   dataset <- Table$create(d1 = rb1, d2 = rb2, d3 = rb3)
 
diff --git a/r/tests/testthat/test-metadata.R b/r/tests/testthat/test-metadata.R
index 3217b58d6c..4db20d04df 100644
--- a/r/tests/testthat/test-metadata.R
+++ b/r/tests/testthat/test-metadata.R
@@ -226,11 +226,13 @@ test_that("Row-level metadata (does not by default) roundtrip", {
   # But we can re-enable this / read data that has already been written with
   # row-level metadata
   withr::with_options(
-    list("arrow.preserve_row_level_metadata" = TRUE), {
+    list("arrow.preserve_row_level_metadata" = TRUE),
+    {
       tab <- Table$create(df)
       expect_identical(attr(as.data.frame(tab)$x[[1]], "foo"), "bar")
       expect_identical(attr(as.data.frame(tab)$x[[2]], "baz"), "qux")
-    })
+    }
+  )
 })
 
 
@@ -256,7 +258,8 @@ test_that("Row-level metadata (does not) roundtrip in datasets", {
   dst_dir <- make_temp_dir()
 
   withr::with_options(
-    list("arrow.preserve_row_level_metadata" = TRUE), {
+    list("arrow.preserve_row_level_metadata" = TRUE),
+    {
       expect_warning(
         write_dataset(df, dst_dir, partitioning = "part"),
         "Row-level metadata is not compatible with datasets and will be discarded"
@@ -286,7 +289,25 @@ test_that("Row-level metadata (does not) roundtrip in datasets", {
         df_from_ds <- ds %>% select(int) %>% collect(),
         NA
       )
-    })
+    }
+  )
+})
+
+test_that("Dataset writing does handle other metadata", {
+  skip_if_not_available("dataset")
+  skip_if_not_available("parquet")
+
+  dst_dir <- make_temp_dir()
+  write_dataset(example_with_metadata, dst_dir, partitioning = "b")
+
+  ds <- open_dataset(dst_dir)
+  expect_equal(
+    ds %>%
+      # partitioning on b puts it last, so move it back
+      select(a, b, c, d) %>%
+      collect(),
+    example_with_metadata
+  )
 })
 
 test_that("When we encounter SF cols, we warn", {
@@ -305,11 +326,13 @@ test_that("When we encounter SF cols, we warn", {
   # But we can re-enable this / read data that has already been written with
   # row-level metadata without a warning
   withr::with_options(
-    list("arrow.preserve_row_level_metadata" = TRUE), {
+    list("arrow.preserve_row_level_metadata" = TRUE),
+    {
       expect_warning(tab <- Table$create(df), NA)
       expect_identical(attr(as.data.frame(tab)$x[[1]], "foo"), "bar")
       expect_identical(attr(as.data.frame(tab)$x[[2]], "baz"), "qux")
-    })
+    }
+  )
 })
 
 test_that("dplyr with metadata", {
@@ -369,7 +392,6 @@ test_that("grouped_df metadata is recorded (efficiently)", {
 })
 
 test_that("grouped_df non-arrow metadata is preserved", {
-
   simple_tbl <- tibble(a = 1:2, b = 3:4)
   attr(simple_tbl, "other_metadata") <- "look I'm still here!"
   grouped <- group_by(simple_tbl, a)