You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/06/01 21:45:54 UTC

[arrow] branch main updated: GH-35730: [C++] Add the ability to specify custom schema on a dataset write (#35860)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 018e7d3f9c GH-35730: [C++] Add the ability to specify custom schema on a dataset write (#35860)
018e7d3f9c is described below

commit 018e7d3f9c4bcacce716dd607994486a31ee71bb
Author: Weston Pace <we...@gmail.com>
AuthorDate: Thu Jun 1 14:45:40 2023 -0700

    GH-35730: [C++] Add the ability to specify custom schema on a dataset write (#35860)
    
    ### Rationale for this change
    
    The dataset write node previously allowed you to specify custom key/value metadata on a write node.  This was added to support saving schema metadata.  However, it doesn't capture field metadata or field nullability.  This PR adds the ability to specify a custom schema instead, which covers all three.  The custom schema must have the same number of fields as the input to the write node and each field must have the same type.
    
    ### What changes are included in this PR?
    
    Added `custom_schema` to `WriteNodeOptions`.  `custom_metadata` is kept, but providing both `custom_schema` and `custom_metadata` is an error.
    
    ### Are these changes tested?
    
    Yes, I added a new C++ unit test to verify that the custom info is applied to written files.
    
    ### Are there any user-facing changes?
    
    No breaking changes.  Only new functionality (which is user facing).
    
    * Closes: #35730
    
    Lead-authored-by: Weston Pace <we...@gmail.com>
    Co-authored-by: Nic Crane <th...@gmail.com>
    Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
    Co-authored-by: anjakefala <an...@voltrondata.com>
    Co-authored-by: Antoine Pitrou <pi...@free.fr>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/dataset/CMakeLists.txt     |   1 +
 cpp/src/arrow/dataset/file_base.cc       |  53 ++++++++--
 cpp/src/arrow/dataset/file_base.h        |  10 ++
 cpp/src/arrow/dataset/write_node_test.cc | 174 +++++++++++++++++++++++++++++++
 python/pyarrow/tests/test_dataset.py     |  53 ++++++++++
 r/R/arrowExports.R                       |   4 +-
 r/R/query-engine.R                       |   4 +-
 r/R/schema.R                             |   2 +-
 r/src/arrowExports.cpp                   |  10 +-
 r/src/compute-exec.cpp                   |  28 ++---
 10 files changed, 310 insertions(+), 29 deletions(-)

diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt
index 7bffdbf08c..e3221d8283 100644
--- a/cpp/src/arrow/dataset/CMakeLists.txt
+++ b/cpp/src/arrow/dataset/CMakeLists.txt
@@ -151,6 +151,7 @@ add_arrow_dataset_test(file_test)
 add_arrow_dataset_test(partition_test)
 add_arrow_dataset_test(scanner_test)
 add_arrow_dataset_test(subtree_test)
+add_arrow_dataset_test(write_node_test)
 
 if(ARROW_CSV)
   add_arrow_dataset_test(file_csv_test)
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index b300bd67ce..2fcd57d2f3 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -387,16 +387,16 @@ Status WriteBatch(
 
 class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
  public:
-  DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
+  DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> custom_schema,
                                  FileSystemDatasetWriteOptions write_options)
-      : custom_metadata_(std::move(custom_metadata)),
+      : custom_schema_(std::move(custom_schema)),
         write_options_(std::move(write_options)) {}
 
   Status Init(const std::shared_ptr<Schema>& schema,
               acero::BackpressureControl* backpressure_control,
               acero::ExecPlan* plan) override {
-    if (custom_metadata_) {
-      schema_ = schema->WithMetadata(custom_metadata_);
+    if (custom_schema_) {
+      schema_ = custom_schema_;
     } else {
       schema_ = schema;
     }
@@ -434,7 +434,7 @@ class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
                       });
   }
 
-  std::shared_ptr<const KeyValueMetadata> custom_metadata_;
+  std::shared_ptr<Schema> custom_schema_;
   std::unique_ptr<internal::DatasetWriter> dataset_writer_;
   FileSystemDatasetWriteOptions write_options_;
   Future<> finished_ = Future<>::Make();
@@ -453,13 +453,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
 
   // The projected_schema is currently used by pyarrow to preserve the custom metadata
   // when reading from a single input file.
-  const auto& custom_metadata = scanner->options()->projected_schema->metadata();
+  const auto& custom_schema = scanner->options()->projected_schema;
+
+  WriteNodeOptions write_node_options(write_options);
+  write_node_options.custom_schema = custom_schema;
 
   acero::Declaration plan = acero::Declaration::Sequence({
       {"scan", ScanNodeOptions{dataset, scanner->options()}},
       {"filter", acero::FilterNodeOptions{scanner->options()->filter}},
       {"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
-      {"write", WriteNodeOptions{write_options, custom_metadata}},
+      {"write", std::move(write_node_options)},
   });
 
   return acero::DeclarationToStatus(std::move(plan), scanner->options()->use_threads);
@@ -475,16 +478,50 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,
 
   const WriteNodeOptions write_node_options =
       checked_cast<const WriteNodeOptions&>(options);
+  std::shared_ptr<Schema> custom_schema = write_node_options.custom_schema;
   const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
       write_node_options.custom_metadata;
   const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
 
+  const std::shared_ptr<Schema>& input_schema = inputs[0]->output_schema();
+
+  if (custom_schema != nullptr) {
+    if (custom_metadata) {
+      return Status::TypeError(
+          "Do not provide both custom_metadata and custom_schema.  If custom_schema is "
+          "used then custom_schema->metadata should be used instead of custom_metadata");
+    }
+
+    if (custom_schema->num_fields() != input_schema->num_fields()) {
+      return Status::TypeError(
+          "The provided custom_schema did not have the same number of fields as the "
+          "data.  The custom schema can only be used to add metadata / nullability to "
+          "fields and cannot change the type or number of fields.");
+    }
+    for (int field_idx = 0; field_idx < input_schema->num_fields(); field_idx++) {
+      if (!input_schema->field(field_idx)->type()->Equals(
+              custom_schema->field(field_idx)->type())) {
+        return Status::TypeError("The provided custom_schema specified type ",
+                                 custom_schema->field(field_idx)->type()->ToString(),
+                                 " for field ", field_idx, "and the input data has type ",
+                                 input_schema->field(field_idx),
+                                 "The custom schema can only be used to add metadata / "
+                                 "nullability to fields and "
+                                 "cannot change the type or number of fields.");
+      }
+    }
+  }
+
+  if (custom_metadata) {
+    custom_schema = input_schema->WithMetadata(custom_metadata);
+  }
+
   if (!write_options.partitioning) {
     return Status::Invalid("Must provide partitioning");
   }
 
   std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
-      std::make_shared<DatasetWritingSinkNodeConsumer>(custom_metadata, write_options);
+      std::make_shared<DatasetWritingSinkNodeConsumer>(custom_schema, write_options);
 
   ARROW_ASSIGN_OR_RAISE(
       auto node,
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 788a1bb432..d33d88e996 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -33,6 +33,7 @@
 #include "arrow/dataset/visibility.h"
 #include "arrow/filesystem/filesystem.h"
 #include "arrow/io/file.h"
+#include "arrow/type_fwd.h"
 #include "arrow/util/compression.h"
 
 namespace arrow {
@@ -470,6 +471,15 @@ class ARROW_DS_EXPORT WriteNodeOptions : public acero::ExecNodeOptions {
 
   /// \brief Options to control how to write the dataset
   FileSystemDatasetWriteOptions write_options;
+  /// \brief Optional schema to attach to all written batches
+  ///
+  /// By default, we will use the output schema of the input.
+  ///
+  /// This can be used to alter schema metadata, field nullability, or field metadata.
+  /// However, this cannot be used to change the type of data.  If the custom schema does
+  /// not have the same number of fields and the same data types as the input then the
+  /// plan will fail.
+  std::shared_ptr<Schema> custom_schema;
   /// \brief Optional metadata to attach to written batches
   std::shared_ptr<const KeyValueMetadata> custom_metadata;
 };
diff --git a/cpp/src/arrow/dataset/write_node_test.cc b/cpp/src/arrow/dataset/write_node_test.cc
new file mode 100644
index 0000000000..f420bd2e6c
--- /dev/null
+++ b/cpp/src/arrow/dataset/write_node_test.cc
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/dataset/plan.h"
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+
+#include "arrow/table.h"
+#include "arrow/util/key_value_metadata.h"
+
+namespace arrow {
+
+namespace dataset {
+
+class SimpleWriteNodeTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    internal::Initialize();
+    mock_fs_ = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+    auto ipc_format = std::make_shared<dataset::IpcFileFormat>();
+
+    fs_write_options_.filesystem = mock_fs_;
+    fs_write_options_.base_dir = "/my_dataset";
+    fs_write_options_.basename_template = "{i}.arrow";
+    fs_write_options_.file_write_options = ipc_format->DefaultWriteOptions();
+    fs_write_options_.partitioning = dataset::Partitioning::Default();
+  }
+
+  std::shared_ptr<fs::internal::MockFileSystem> mock_fs_;
+  dataset::FileSystemDatasetWriteOptions fs_write_options_;
+};
+
+TEST_F(SimpleWriteNodeTest, CustomNullability) {
+  // Create an input table with a nullable and a non-nullable type
+  ExecBatch batch = gen::Gen({gen::Step()})->FailOnError()->ExecBatch(/*num_rows=*/1);
+  std::shared_ptr<Schema> test_schema =
+      schema({field("nullable_i32", uint32(), /*nullable=*/true),
+              field("non_nullable_i32", uint32(), /*nullable=*/false)});
+  std::shared_ptr<RecordBatch> record_batch =
+      RecordBatch::Make(test_schema, /*num_rows=*/1,
+                        {batch.values[0].make_array(), batch.values[0].make_array()});
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table,
+                       Table::FromRecordBatches({std::move(record_batch)}));
+
+  ASSERT_TRUE(table->field(0)->nullable());
+  ASSERT_FALSE(table->field(1)->nullable());
+
+  dataset::WriteNodeOptions write_options(fs_write_options_);
+  write_options.custom_schema = test_schema;
+
+  // Write the data to disk (these plans use a project because it destroys whatever
+  // metadata happened to be in the table source node's output schema).  This more
+  // accurately simulates reading from a dataset.
+  acero::Declaration plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(table)},
+       {"project",
+        acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+       {"write", write_options}});
+
+  ASSERT_OK(DeclarationToStatus(plan));
+
+  // Read the file back out and verify the nullability
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
+                       mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
+                       ipc::RecordBatchFileReader::Open(file));
+  std::shared_ptr<Schema> file_schema = file_reader->schema();
+
+  ASSERT_TRUE(file_schema->field(0)->nullable());
+  ASSERT_FALSE(file_schema->field(1)->nullable());
+
+  // Invalid custom schema
+
+  // Incorrect # of fields
+  write_options.custom_schema = schema({});
+  plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(table)},
+       {"project",
+        acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+       {"write", write_options}});
+
+  ASSERT_THAT(
+      DeclarationToStatus(plan),
+      Raises(StatusCode::TypeError,
+             ::testing::HasSubstr("did not have the same number of fields as the data")));
+
+  // Incorrect types
+  write_options.custom_schema =
+      schema({field("nullable_i32", int32()), field("non_nullable_i32", int32())});
+  plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(table)},
+       {"project",
+        acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+       {"write", write_options}});
+  ASSERT_THAT(
+      DeclarationToStatus(plan),
+      Raises(StatusCode::TypeError, ::testing::HasSubstr("and the input data has type")));
+
+  // Cannot have both custom_schema and custom_metadata
+  write_options.custom_schema = test_schema;
+  write_options.custom_metadata = key_value_metadata({{"foo", "bar"}});
+  plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(std::move(table))},
+       {"project",
+        acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+       {"write", write_options}});
+  ASSERT_THAT(DeclarationToStatus(plan),
+              Raises(StatusCode::TypeError,
+                     ::testing::HasSubstr(
+                         "Do not provide both custom_metadata and custom_schema")));
+}
+
+TEST_F(SimpleWriteNodeTest, CustomMetadata) {
+  constexpr int64_t kRowsPerChunk = 1;
+  constexpr int64_t kNumChunks = 1;
+  // Create an input table with no schema metadata
+  std::shared_ptr<Table> table =
+      gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerChunk, kNumChunks);
+
+  std::shared_ptr<KeyValueMetadata> custom_metadata =
+      key_value_metadata({{"foo", "bar"}});
+
+  dataset::WriteNodeOptions write_options(fs_write_options_);
+  write_options.custom_metadata = custom_metadata;
+
+  // Write the data to disk
+  acero::Declaration plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(table)},
+       {"project", acero::ProjectNodeOptions({compute::field_ref(0)})},
+       {"write", write_options}});
+
+  ASSERT_OK(DeclarationToStatus(plan));
+
+  // Read the file back out and verify the schema metadata
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
+                       mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
+                       ipc::RecordBatchFileReader::Open(file));
+  std::shared_ptr<Schema> file_schema = file_reader->schema();
+
+  ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
+}
+
+}  // namespace dataset
+}  // namespace arrow
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 66562b76c9..144da21cf5 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -5172,6 +5172,59 @@ def test_dataset_partition_with_slash(tmpdir):
     assert encoded_paths == file_paths
 
 
+@pytest.mark.parquet
+def test_write_dataset_preserve_nullability(tempdir):
+    # GH-35730
+    schema_nullable = pa.schema([
+        pa.field("x", pa.int64(), nullable=False),
+        pa.field("y", pa.int64(), nullable=True)])
+
+    arrays = [[1, 2, 3], [None, 5, None]]
+    table = pa.Table.from_arrays(arrays, schema=schema_nullable)
+
+    pq.write_to_dataset(table, tempdir / "nulltest1")
+    dataset = ds.dataset(tempdir / "nulltest1", format="parquet")
+    # nullability of field is preserved
+    assert dataset.to_table().schema.equals(schema_nullable)
+
+    ds.write_dataset(table, tempdir / "nulltest2", format="parquet")
+    dataset = ds.dataset(tempdir / "nulltest2", format="parquet")
+    assert dataset.to_table().schema.equals(schema_nullable)
+
+    ds.write_dataset([table, table], tempdir / "nulltest3", format="parquet")
+    dataset = ds.dataset(tempdir / "nulltest3", format="parquet")
+    assert dataset.to_table().schema.equals(schema_nullable)
+
+
+def test_write_dataset_preserve_field_metadata(tempdir):
+    schema_metadata = pa.schema([
+        pa.field("x", pa.int64(), metadata={b'foo': b'bar'}),
+        pa.field("y", pa.int64())])
+
+    schema_no_meta = pa.schema([
+        pa.field("x", pa.int64()),
+        pa.field("y", pa.int64())])
+
+    arrays = [[1, 2, 3], [None, 5, None]]
+    table = pa.Table.from_arrays(arrays, schema=schema_metadata)
+    table_no_meta = pa.Table.from_arrays(arrays, schema=schema_no_meta)
+
+    # If no schema is provided the schema of the first table will be used
+    ds.write_dataset([table, table_no_meta], tempdir / "test1", format="parquet")
+    dataset = ds.dataset(tempdir / "test1", format="parquet")
+    assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)
+
+    ds.write_dataset([table_no_meta, table], tempdir / "test2", format="parquet")
+    dataset = ds.dataset(tempdir / "test2", format="parquet")
+    assert dataset.to_table().schema.equals(schema_no_meta, check_metadata=True)
+
+    # If a schema is provided it will override the schema of the input
+    ds.write_dataset([table_no_meta, table], tempdir / "test3", format="parquet",
+                     schema=schema_metadata)
+    dataset = ds.dataset(tempdir / "test3", format="parquet")
+    assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)
+
+
 @pytest.mark.parametrize('dstype', [
     "fs", "mem"
 ])
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index b494623eed..8a76e678ba 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -468,8 +468,8 @@ ExecNode_Scan <- function(plan, dataset, filter, projection) {
   .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection)
 }
 
-ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
-  invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
+ExecPlan_Write <- function(plan, final_node, schema, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
+  invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, schema, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
 }
 
 ExecNode_Filter <- function(input, filter) {
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 79227546dd..0f8a84f9b8 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -236,10 +236,12 @@ ExecPlan <- R6Class("ExecPlan",
     },
     Write = function(node, ...) {
       # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
+      final_metadata <- prepare_key_value_metadata(node$final_metadata())
+
       ExecPlan_Write(
         self,
         node,
-        prepare_key_value_metadata(node$final_metadata()),
+        node$schema$WithMetadata(final_metadata),
         ...
       )
     },
diff --git a/r/R/schema.R b/r/R/schema.R
index c8316854c2..9ff38487c4 100644
--- a/r/R/schema.R
+++ b/r/R/schema.R
@@ -229,7 +229,7 @@ prepare_key_value_metadata <- function(metadata) {
       call. = FALSE
     )
   }
-  if (is.list(metadata[["r"]])) {
+  if (!is_empty(metadata) && is.list(metadata[["r"]])) {
     metadata[["r"]] <- .serialize_arrow_r_metadata(metadata[["r"]])
   }
   map_chr(metadata, as.character)
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 80877e827d..ca4a4be38d 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1084,12 +1084,12 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP fil
 
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_DATASET)
-void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>& final_node, cpp11::strings metadata, const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_p [...]
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>& final_node, const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files [...]
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<acero::ExecPlan>&>::type plan(plan_sexp);
 	arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type final_node(final_node_sexp);
-	arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp);
+	arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
 	arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
 	arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
 	arrow::r::Input<std::string>::type base_dir(base_dir_sexp);
@@ -1101,12 +1101,12 @@ BEGIN_CPP11
 	arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp);
 	arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp);
 	arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp);
-	ExecPlan_Write(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
+	ExecPlan_Write(plan, final_node, schema, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
 	return R_NilValue;
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
 	Rf_error("Cannot call ExecPlan_Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 0f269bed11..e0b3c62c47 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -307,15 +307,18 @@ std::shared_ptr<acero::ExecNode> ExecNode_Scan(
 }
 
 // [[dataset::export]]
-void ExecPlan_Write(
-    const std::shared_ptr<acero::ExecPlan>& plan,
-    const std::shared_ptr<acero::ExecNode>& final_node, cpp11::strings metadata,
-    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
-    const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
-    const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
-    arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
-    uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
-    uint64_t max_rows_per_group) {
+void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan,
+                    const std::shared_ptr<acero::ExecNode>& final_node,
+                    const std::shared_ptr<arrow::Schema>& schema,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions, uint32_t max_open_files,
+                    uint64_t max_rows_per_file, uint64_t min_rows_per_group,
+                    uint64_t max_rows_per_group) {
   arrow::dataset::internal::Initialize();
 
   // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
@@ -333,9 +336,10 @@ void ExecPlan_Write(
   opts.min_rows_per_group = min_rows_per_group;
   opts.max_rows_per_group = max_rows_per_group;
 
-  auto kv = strings_to_kvm(metadata);
-  MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()},
-                     ds::WriteNodeOptions{std::move(opts), std::move(kv)});
+  ds::WriteNodeOptions options(std::move(opts));
+  options.custom_schema = std::move(schema);
+
+  MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()}, std::move(options));
 
   StopIfNotOk(plan->Validate());