You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/06/01 21:45:54 UTC
[arrow] branch main updated: GH-35730: [C++] Add the ability to specify custom schema on a dataset write (#35860)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 018e7d3f9c GH-35730: [C++] Add the ability to specify custom schema on a dataset write (#35860)
018e7d3f9c is described below
commit 018e7d3f9c4bcacce716dd607994486a31ee71bb
Author: Weston Pace <we...@gmail.com>
AuthorDate: Thu Jun 1 14:45:40 2023 -0700
GH-35730: [C++] Add the ability to specify custom schema on a dataset write (#35860)
### Rationale for this change
The dataset write node previously allowed you to specify custom key/value metadata. This was added to support saving schema metadata. However, it doesn't capture field metadata or field nullability. This PR adds the ability to specify a custom schema instead, which covers those cases as well. The custom schema must have the same number of fields as the input to the write node and each field must have the same type.
### What changes are included in this PR?
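Added `custom_schema` to `WriteNodeOptions`. The existing `custom_metadata` option is kept, but the two are mutually exclusive: providing both is an error, and `custom_schema->metadata` should be used instead when a custom schema is given.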
### Are these changes tested?
Yes, I added a new C++ unit test to verify that the custom info is applied to written files.
### Are there any user-facing changes?
No breaking changes. This only adds new functionality (which is user-facing).
* Closes: #35730
Lead-authored-by: Weston Pace <we...@gmail.com>
Co-authored-by: Nic Crane <th...@gmail.com>
Co-authored-by: Joris Van den Bossche <jo...@gmail.com>
Co-authored-by: anjakefala <an...@voltrondata.com>
Co-authored-by: Antoine Pitrou <pi...@free.fr>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/dataset/CMakeLists.txt | 1 +
cpp/src/arrow/dataset/file_base.cc | 53 ++++++++--
cpp/src/arrow/dataset/file_base.h | 10 ++
cpp/src/arrow/dataset/write_node_test.cc | 174 +++++++++++++++++++++++++++++++
python/pyarrow/tests/test_dataset.py | 53 ++++++++++
r/R/arrowExports.R | 4 +-
r/R/query-engine.R | 4 +-
r/R/schema.R | 2 +-
r/src/arrowExports.cpp | 10 +-
r/src/compute-exec.cpp | 28 ++---
10 files changed, 310 insertions(+), 29 deletions(-)
diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt
index 7bffdbf08c..e3221d8283 100644
--- a/cpp/src/arrow/dataset/CMakeLists.txt
+++ b/cpp/src/arrow/dataset/CMakeLists.txt
@@ -151,6 +151,7 @@ add_arrow_dataset_test(file_test)
add_arrow_dataset_test(partition_test)
add_arrow_dataset_test(scanner_test)
add_arrow_dataset_test(subtree_test)
+add_arrow_dataset_test(write_node_test)
if(ARROW_CSV)
add_arrow_dataset_test(file_csv_test)
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index b300bd67ce..2fcd57d2f3 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -387,16 +387,16 @@ Status WriteBatch(
class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
public:
- DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
+ DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> custom_schema,
FileSystemDatasetWriteOptions write_options)
- : custom_metadata_(std::move(custom_metadata)),
+ : custom_schema_(std::move(custom_schema)),
write_options_(std::move(write_options)) {}
Status Init(const std::shared_ptr<Schema>& schema,
acero::BackpressureControl* backpressure_control,
acero::ExecPlan* plan) override {
- if (custom_metadata_) {
- schema_ = schema->WithMetadata(custom_metadata_);
+ if (custom_schema_) {
+ schema_ = custom_schema_;
} else {
schema_ = schema;
}
@@ -434,7 +434,7 @@ class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
});
}
- std::shared_ptr<const KeyValueMetadata> custom_metadata_;
+ std::shared_ptr<Schema> custom_schema_;
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
FileSystemDatasetWriteOptions write_options_;
Future<> finished_ = Future<>::Make();
@@ -453,13 +453,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
// The projected_schema is currently used by pyarrow to preserve the custom metadata
// when reading from a single input file.
- const auto& custom_metadata = scanner->options()->projected_schema->metadata();
+ const auto& custom_schema = scanner->options()->projected_schema;
+
+ WriteNodeOptions write_node_options(write_options);
+ write_node_options.custom_schema = custom_schema;
acero::Declaration plan = acero::Declaration::Sequence({
{"scan", ScanNodeOptions{dataset, scanner->options()}},
{"filter", acero::FilterNodeOptions{scanner->options()->filter}},
{"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
- {"write", WriteNodeOptions{write_options, custom_metadata}},
+ {"write", std::move(write_node_options)},
});
return acero::DeclarationToStatus(std::move(plan), scanner->options()->use_threads);
@@ -475,16 +478,50 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,
const WriteNodeOptions write_node_options =
checked_cast<const WriteNodeOptions&>(options);
+ std::shared_ptr<Schema> custom_schema = write_node_options.custom_schema;
const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
write_node_options.custom_metadata;
const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
+ const std::shared_ptr<Schema>& input_schema = inputs[0]->output_schema();
+
+ if (custom_schema != nullptr) {
+ if (custom_metadata) {
+ return Status::TypeError(
+ "Do not provide both custom_metadata and custom_schema. If custom_schema is "
+ "used then custom_schema->metadata should be used instead of custom_metadata");
+ }
+
+ if (custom_schema->num_fields() != input_schema->num_fields()) {
+ return Status::TypeError(
+ "The provided custom_schema did not have the same number of fields as the "
+ "data. The custom schema can only be used to add metadata / nullability to "
+ "fields and cannot change the type or number of fields.");
+ }
+ for (int field_idx = 0; field_idx < input_schema->num_fields(); field_idx++) {
+ if (!input_schema->field(field_idx)->type()->Equals(
+ custom_schema->field(field_idx)->type())) {
+ return Status::TypeError("The provided custom_schema specified type ",
+ custom_schema->field(field_idx)->type()->ToString(),
+ " for field ", field_idx, "and the input data has type ",
+ input_schema->field(field_idx),
+ "The custom schema can only be used to add metadata / "
+ "nullability to fields and "
+ "cannot change the type or number of fields.");
+ }
+ }
+ }
+
+ if (custom_metadata) {
+ custom_schema = input_schema->WithMetadata(custom_metadata);
+ }
+
if (!write_options.partitioning) {
return Status::Invalid("Must provide partitioning");
}
std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
- std::make_shared<DatasetWritingSinkNodeConsumer>(custom_metadata, write_options);
+ std::make_shared<DatasetWritingSinkNodeConsumer>(custom_schema, write_options);
ARROW_ASSIGN_OR_RAISE(
auto node,
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 788a1bb432..d33d88e996 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -33,6 +33,7 @@
#include "arrow/dataset/visibility.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/file.h"
+#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
namespace arrow {
@@ -470,6 +471,15 @@ class ARROW_DS_EXPORT WriteNodeOptions : public acero::ExecNodeOptions {
/// \brief Options to control how to write the dataset
FileSystemDatasetWriteOptions write_options;
+ /// \brief Optional schema to attach to all written batches
+ ///
+ /// By default, we will use the output schema of the input.
+ ///
+ /// This can be used to alter schema metadata, field nullability, or field metadata.
+ /// However, this cannot be used to change the type of data. If the custom schema does
+ /// not have the same number of fields and the same data types as the input then the
+ /// plan will fail.
+ std::shared_ptr<Schema> custom_schema;
/// \brief Optional metadata to attach to written batches
std::shared_ptr<const KeyValueMetadata> custom_metadata;
};
diff --git a/cpp/src/arrow/dataset/write_node_test.cc b/cpp/src/arrow/dataset/write_node_test.cc
new file mode 100644
index 0000000000..f420bd2e6c
--- /dev/null
+++ b/cpp/src/arrow/dataset/write_node_test.cc
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/dataset/plan.h"
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+
+#include "arrow/table.h"
+#include "arrow/util/key_value_metadata.h"
+
+namespace arrow {
+
+namespace dataset {
+
+class SimpleWriteNodeTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ internal::Initialize();
+ mock_fs_ = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ auto ipc_format = std::make_shared<dataset::IpcFileFormat>();
+
+ fs_write_options_.filesystem = mock_fs_;
+ fs_write_options_.base_dir = "/my_dataset";
+ fs_write_options_.basename_template = "{i}.arrow";
+ fs_write_options_.file_write_options = ipc_format->DefaultWriteOptions();
+ fs_write_options_.partitioning = dataset::Partitioning::Default();
+ }
+
+ std::shared_ptr<fs::internal::MockFileSystem> mock_fs_;
+ dataset::FileSystemDatasetWriteOptions fs_write_options_;
+};
+
+TEST_F(SimpleWriteNodeTest, CustomNullability) {
+ // Create an input table with a nullable and a non-nullable type
+ ExecBatch batch = gen::Gen({gen::Step()})->FailOnError()->ExecBatch(/*num_rows=*/1);
+ std::shared_ptr<Schema> test_schema =
+ schema({field("nullable_i32", uint32(), /*nullable=*/true),
+ field("non_nullable_i32", uint32(), /*nullable=*/false)});
+ std::shared_ptr<RecordBatch> record_batch =
+ RecordBatch::Make(test_schema, /*num_rows=*/1,
+ {batch.values[0].make_array(), batch.values[0].make_array()});
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table,
+ Table::FromRecordBatches({std::move(record_batch)}));
+
+ ASSERT_TRUE(table->field(0)->nullable());
+ ASSERT_FALSE(table->field(1)->nullable());
+
+ dataset::WriteNodeOptions write_options(fs_write_options_);
+ write_options.custom_schema = test_schema;
+
+ // Write the data to disk (these plans use a project because it destroys whatever
+ // metadata happened to be in the table source node's output schema). This more
+ // accurately simulates reading from a dataset.
+ acero::Declaration plan = acero::Declaration::Sequence(
+ {{"table_source", acero::TableSourceNodeOptions(table)},
+ {"project",
+ acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+ {"write", write_options}});
+
+ ASSERT_OK(DeclarationToStatus(plan));
+
+ // Read the file back out and verify the nullability
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
+ mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
+ ipc::RecordBatchFileReader::Open(file));
+ std::shared_ptr<Schema> file_schema = file_reader->schema();
+
+ ASSERT_TRUE(file_schema->field(0)->nullable());
+ ASSERT_FALSE(file_schema->field(1)->nullable());
+
+ // Invalid custom schema
+
+ // Incorrect # of fields
+ write_options.custom_schema = schema({});
+ plan = acero::Declaration::Sequence(
+ {{"table_source", acero::TableSourceNodeOptions(table)},
+ {"project",
+ acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+ {"write", write_options}});
+
+ ASSERT_THAT(
+ DeclarationToStatus(plan),
+ Raises(StatusCode::TypeError,
+ ::testing::HasSubstr("did not have the same number of fields as the data")));
+
+ // Incorrect types
+ write_options.custom_schema =
+ schema({field("nullable_i32", int32()), field("non_nullable_i32", int32())});
+ plan = acero::Declaration::Sequence(
+ {{"table_source", acero::TableSourceNodeOptions(table)},
+ {"project",
+ acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+ {"write", write_options}});
+ ASSERT_THAT(
+ DeclarationToStatus(plan),
+ Raises(StatusCode::TypeError, ::testing::HasSubstr("and the input data has type")));
+
+ // Cannot have both custom_schema and custom_metadata
+ write_options.custom_schema = test_schema;
+ write_options.custom_metadata = key_value_metadata({{"foo", "bar"}});
+ plan = acero::Declaration::Sequence(
+ {{"table_source", acero::TableSourceNodeOptions(std::move(table))},
+ {"project",
+ acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+ {"write", write_options}});
+ ASSERT_THAT(DeclarationToStatus(plan),
+ Raises(StatusCode::TypeError,
+ ::testing::HasSubstr(
+ "Do not provide both custom_metadata and custom_schema")));
+}
+
+TEST_F(SimpleWriteNodeTest, CustomMetadata) {
+ constexpr int64_t kRowsPerChunk = 1;
+ constexpr int64_t kNumChunks = 1;
+ // Create an input table with no schema metadata
+ std::shared_ptr<Table> table =
+ gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerChunk, kNumChunks);
+
+ std::shared_ptr<KeyValueMetadata> custom_metadata =
+ key_value_metadata({{"foo", "bar"}});
+
+ dataset::WriteNodeOptions write_options(fs_write_options_);
+ write_options.custom_metadata = custom_metadata;
+
+ // Write the data to disk
+ acero::Declaration plan = acero::Declaration::Sequence(
+ {{"table_source", acero::TableSourceNodeOptions(table)},
+ {"project", acero::ProjectNodeOptions({compute::field_ref(0)})},
+ {"write", write_options}});
+
+ ASSERT_OK(DeclarationToStatus(plan));
+
+ // Read the file back out and verify the schema metadata
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
+ mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
+ ipc::RecordBatchFileReader::Open(file));
+ std::shared_ptr<Schema> file_schema = file_reader->schema();
+
+ ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
+}
+
+} // namespace dataset
+} // namespace arrow
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 66562b76c9..144da21cf5 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -5172,6 +5172,59 @@ def test_dataset_partition_with_slash(tmpdir):
assert encoded_paths == file_paths
+@pytest.mark.parquet
+def test_write_dataset_preserve_nullability(tempdir):
+ # GH-35730
+ schema_nullable = pa.schema([
+ pa.field("x", pa.int64(), nullable=False),
+ pa.field("y", pa.int64(), nullable=True)])
+
+ arrays = [[1, 2, 3], [None, 5, None]]
+ table = pa.Table.from_arrays(arrays, schema=schema_nullable)
+
+ pq.write_to_dataset(table, tempdir / "nulltest1")
+ dataset = ds.dataset(tempdir / "nulltest1", format="parquet")
+ # nullability of field is preserved
+ assert dataset.to_table().schema.equals(schema_nullable)
+
+ ds.write_dataset(table, tempdir / "nulltest2", format="parquet")
+ dataset = ds.dataset(tempdir / "nulltest2", format="parquet")
+ assert dataset.to_table().schema.equals(schema_nullable)
+
+ ds.write_dataset([table, table], tempdir / "nulltest3", format="parquet")
+ dataset = ds.dataset(tempdir / "nulltest3", format="parquet")
+ assert dataset.to_table().schema.equals(schema_nullable)
+
+
+def test_write_dataset_preserve_field_metadata(tempdir):
+ schema_metadata = pa.schema([
+ pa.field("x", pa.int64(), metadata={b'foo': b'bar'}),
+ pa.field("y", pa.int64())])
+
+ schema_no_meta = pa.schema([
+ pa.field("x", pa.int64()),
+ pa.field("y", pa.int64())])
+
+ arrays = [[1, 2, 3], [None, 5, None]]
+ table = pa.Table.from_arrays(arrays, schema=schema_metadata)
+ table_no_meta = pa.Table.from_arrays(arrays, schema=schema_no_meta)
+
+ # If no schema is provided the schema of the first table will be used
+ ds.write_dataset([table, table_no_meta], tempdir / "test1", format="parquet")
+ dataset = ds.dataset(tempdir / "test1", format="parquet")
+ assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)
+
+ ds.write_dataset([table_no_meta, table], tempdir / "test2", format="parquet")
+ dataset = ds.dataset(tempdir / "test2", format="parquet")
+ assert dataset.to_table().schema.equals(schema_no_meta, check_metadata=True)
+
+ # If a schema is provided it will override the schema of the input
+ ds.write_dataset([table_no_meta, table], tempdir / "test3", format="parquet",
+ schema=schema_metadata)
+ dataset = ds.dataset(tempdir / "test3", format="parquet")
+ assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)
+
+
@pytest.mark.parametrize('dstype', [
"fs", "mem"
])
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index b494623eed..8a76e678ba 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -468,8 +468,8 @@ ExecNode_Scan <- function(plan, dataset, filter, projection) {
.Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection)
}
-ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
- invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
+ExecPlan_Write <- function(plan, final_node, schema, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
+ invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, schema, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
}
ExecNode_Filter <- function(input, filter) {
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 79227546dd..0f8a84f9b8 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -236,10 +236,12 @@ ExecPlan <- R6Class("ExecPlan",
},
Write = function(node, ...) {
# TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
+ final_metadata <- prepare_key_value_metadata(node$final_metadata())
+
ExecPlan_Write(
self,
node,
- prepare_key_value_metadata(node$final_metadata()),
+ node$schema$WithMetadata(final_metadata),
...
)
},
diff --git a/r/R/schema.R b/r/R/schema.R
index c8316854c2..9ff38487c4 100644
--- a/r/R/schema.R
+++ b/r/R/schema.R
@@ -229,7 +229,7 @@ prepare_key_value_metadata <- function(metadata) {
call. = FALSE
)
}
- if (is.list(metadata[["r"]])) {
+ if (!is_empty(metadata) && is.list(metadata[["r"]])) {
metadata[["r"]] <- .serialize_arrow_r_metadata(metadata[["r"]])
}
map_chr(metadata, as.character)
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 80877e827d..ca4a4be38d 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1084,12 +1084,12 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP fil
// compute-exec.cpp
#if defined(ARROW_R_WITH_DATASET)
-void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>& final_node, cpp11::strings metadata, const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_p [...]
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>& final_node, const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files [...]
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
BEGIN_CPP11
arrow::r::Input<const std::shared_ptr<acero::ExecPlan>&>::type plan(plan_sexp);
arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type final_node(final_node_sexp);
- arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp);
+ arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
arrow::r::Input<std::string>::type base_dir(base_dir_sexp);
@@ -1101,12 +1101,12 @@ BEGIN_CPP11
arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp);
arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp);
arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp);
- ExecPlan_Write(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
+ ExecPlan_Write(plan, final_node, schema, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
return R_NilValue;
END_CPP11
}
#else
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
Rf_error("Cannot call ExecPlan_Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
}
#endif
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 0f269bed11..e0b3c62c47 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -307,15 +307,18 @@ std::shared_ptr<acero::ExecNode> ExecNode_Scan(
}
// [[dataset::export]]
-void ExecPlan_Write(
- const std::shared_ptr<acero::ExecPlan>& plan,
- const std::shared_ptr<acero::ExecNode>& final_node, cpp11::strings metadata,
- const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
- const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
- const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
- arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
- uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
- uint64_t max_rows_per_group) {
+void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan,
+ const std::shared_ptr<acero::ExecNode>& final_node,
+ const std::shared_ptr<arrow::Schema>& schema,
+ const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+ const std::shared_ptr<fs::FileSystem>& filesystem,
+ std::string base_dir,
+ const std::shared_ptr<ds::Partitioning>& partitioning,
+ std::string basename_template,
+ arrow::dataset::ExistingDataBehavior existing_data_behavior,
+ int max_partitions, uint32_t max_open_files,
+ uint64_t max_rows_per_file, uint64_t min_rows_per_group,
+ uint64_t max_rows_per_group) {
arrow::dataset::internal::Initialize();
// TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
@@ -333,9 +336,10 @@ void ExecPlan_Write(
opts.min_rows_per_group = min_rows_per_group;
opts.max_rows_per_group = max_rows_per_group;
- auto kv = strings_to_kvm(metadata);
- MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()},
- ds::WriteNodeOptions{std::move(opts), std::move(kv)});
+ ds::WriteNodeOptions options(std::move(opts));
+ options.custom_schema = std::move(schema);
+
+ MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()}, std::move(options));
StopIfNotOk(plan->Validate());