Posted to commits@arrow.apache.org by jo...@apache.org on 2022/04/19 21:41:08 UTC
[arrow] branch master updated: ARROW-15517: [R] Use WriteNode in write_dataset()
This is an automated email from the ASF dual-hosted git repository.
jonkeane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 4b3f4677b9 ARROW-15517: [R] Use WriteNode in write_dataset()
4b3f4677b9 is described below
commit 4b3f4677b995cb7263e4a4e65daf00189f638617
Author: Neal Richardson <ne...@gmail.com>
AuthorDate: Tue Apr 19 16:40:57 2022 -0500
ARROW-15517: [R] Use WriteNode in write_dataset()
This should allow streaming writes in more cases, e.g. with a join.
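For illustration, a minimal sketch of the kind of query this enables to stream to disk (the dataset paths and column names below are hypothetical, not from this commit):

    library(arrow)
    library(dplyr)

    # Hypothetical inputs: two datasets sharing a key column
    orders <- open_dataset("orders/")        # e.g. order_id, customer_id, total
    customers <- open_dataset("customers/")  # e.g. customer_id, region

    # With a WriteNode in the exec plan, the joined query should be able to
    # stream to disk rather than first collecting the result into a Table
    orders %>%
      left_join(customers, by = "customer_id") %>%
      write_dataset("orders_by_region/", format = "parquet", partitioning = "region")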
Closes #12316 from nealrichardson/write-node
Authored-by: Neal Richardson <ne...@gmail.com>
Signed-off-by: Jonathan Keane <jk...@gmail.com>
---
r/R/arrowExports.R                    |  8 ++--
r/R/dataset-format.R                  |  4 +-
r/R/dataset-write.R                   | 87 +++++++++++++++++++++++++++--------
r/R/dplyr.R                           | 11 ++++-
r/R/metadata.R                        | 22 ++++++++-
r/R/parquet.R                         | 38 ++++++++-------
r/R/query-engine.R                    | 29 +++++-------
r/src/arrowExports.cpp                | 62 +++++++++++++------------
r/src/compute-exec.cpp                | 49 ++++++++++++++++++--
r/src/dataset.cpp                     | 24 ----------
r/tests/testthat/test-dataset-write.R | 70 ++++++++++++++++++++--------
r/tests/testthat/test-metadata.R      | 36 ++++++++++++---
12 files changed, 291 insertions(+), 149 deletions(-)
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 7bf77f1e66..6b969336c9 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -420,6 +420,10 @@ ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) {
.Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, materialized_field_names)
}
+ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
+ invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
+}
+
ExecNode_Filter <- function(input, filter) {
.Call(`_arrow_ExecNode_Filter`, input, filter)
}
@@ -748,10 +752,6 @@ dataset___Scanner__schema <- function(sc) {
.Call(`_arrow_dataset___Scanner__schema`, sc)
}
-dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
- invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
-}
-
dataset___Scanner__TakeRows <- function(scanner, indices) {
.Call(`_arrow_dataset___Scanner__TakeRows`, scanner, indices)
}
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index f00efd0350..acc1a41b02 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -390,7 +390,7 @@ ParquetFragmentScanOptions$create <- function(use_buffered_stream = FALSE,
FileWriteOptions <- R6Class("FileWriteOptions",
inherit = ArrowObject,
public = list(
- update = function(table, ...) {
+ update = function(column_names, ...) {
check_additional_args <- function(format, passed_args) {
if (format == "parquet") {
supported_args <- names(formals(write_parquet))
@@ -437,7 +437,7 @@ FileWriteOptions <- R6Class("FileWriteOptions",
if (self$type == "parquet") {
dataset___ParquetFileWriteOptions__update(
self,
- ParquetWriterProperties$create(table, ...),
+ ParquetWriterProperties$create(column_names, ...),
ParquetArrowWriterProperties$create(...)
)
} else if (self$type == "ipc") {
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index d7c73908e7..09b3ebdbe6 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -136,41 +136,88 @@ write_dataset <- function(dataset,
if (inherits(dataset, "arrow_dplyr_query")) {
# partitioning vars need to be in the `select` schema
dataset <- ensure_group_vars(dataset)
- } else if (inherits(dataset, "grouped_df")) {
- force(partitioning)
- # Drop the grouping metadata before writing; we've already consumed it
- # now to construct `partitioning` and don't want it in the metadata$r
- dataset <- dplyr::ungroup(dataset)
+ } else {
+ if (inherits(dataset, "grouped_df")) {
+ force(partitioning)
+ # Drop the grouping metadata before writing; we've already consumed it
+ # now to construct `partitioning` and don't want it in the metadata$r
+ dataset <- dplyr::ungroup(dataset)
+ }
+ dataset <- tryCatch(
+ as_adq(dataset),
+ error = function(e) {
+ supported <- c(
+ "Dataset", "RecordBatch", "Table", "arrow_dplyr_query", "data.frame"
+ )
+ stop(
+ "'dataset' must be a ",
+ oxford_paste(supported, "or", quote = FALSE),
+ ", not ",
+ deparse(class(dataset)),
+ call. = FALSE
+ )
+ }
+ )
+ }
+
+ plan <- ExecPlan$create()
+ final_node <- plan$Build(dataset)
+ if (!is.null(final_node$sort %||% final_node$head %||% final_node$tail)) {
+ # Because sorting and topK are only handled in the SinkNode (or in R!),
+ # they wouldn't get picked up in the WriteNode. So let's Run this ExecPlan
+ # to capture those, and then create a new plan for writing
+ # TODO(ARROW-15681): do sorting in WriteNode in C++
+ dataset <- as_adq(plan$Run(final_node))
+ plan <- ExecPlan$create()
+ final_node <- plan$Build(dataset)
}
- scanner <- Scanner$create(dataset)
if (!inherits(partitioning, "Partitioning")) {
- partition_schema <- scanner$schema[partitioning]
+ partition_schema <- final_node$schema[partitioning]
if (isTRUE(hive_style)) {
- partitioning <- HivePartitioning$create(partition_schema, null_fallback = list(...)$null_fallback)
+ partitioning <- HivePartitioning$create(
+ partition_schema,
+ null_fallback = list(...)$null_fallback
+ )
} else {
partitioning <- DirectoryPartitioning$create(partition_schema)
}
}
- if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
- max_rows_per_group <- max_rows_per_file
- }
-
path_and_fs <- get_path_and_filesystem(path)
- options <- FileWriteOptions$create(format, table = scanner, ...)
+ output_schema <- final_node$schema
+ options <- FileWriteOptions$create(
+ format,
+ column_names = names(output_schema),
+ ...
+ )
+ # TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
+ # and encapsulate this logic better
existing_data_behavior_opts <- c("delete_matching", "overwrite", "error")
existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L
- validate_positive_int_value(max_partitions, "max_partitions must be a positive, non-missing integer")
- validate_positive_int_value(max_open_files, "max_open_files must be a positive, non-missing integer")
- validate_positive_int_value(min_rows_per_group, "min_rows_per_group must be a positive, non-missing integer")
- validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer")
+ if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
+ max_rows_per_group <- max_rows_per_file
+ }
- dataset___Dataset__Write(
+ validate_positive_int_value(max_partitions)
+ validate_positive_int_value(max_open_files)
+ validate_positive_int_value(min_rows_per_group)
+ validate_positive_int_value(max_rows_per_group)
+
+ new_r_meta <- get_r_metadata_from_old_schema(
+ output_schema,
+ source_data(dataset)$schema,
+ drop_attributes = has_aggregation(dataset)
+ )
+ if (!is.null(new_r_meta)) {
+ output_schema$r_metadata <- new_r_meta
+ }
+ plan$Write(
+ final_node, prepare_key_value_metadata(output_schema$metadata),
options, path_and_fs$fs, path_and_fs$path,
- partitioning, basename_template, scanner,
+ partitioning, basename_template,
existing_data_behavior, max_partitions,
max_open_files, max_rows_per_file,
min_rows_per_group, max_rows_per_group
@@ -179,6 +226,6 @@ write_dataset <- function(dataset,
validate_positive_int_value <- function(value, msg) {
if (!is_integerish(value, n = 1) || is.na(value) || value < 0) {
- abort(msg)
+ abort(paste(substitute(value), "must be a positive, non-missing integer"))
}
}
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index e6d7889078..c9650fb065 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -24,7 +24,12 @@ arrow_dplyr_query <- function(.data) {
# RecordBatch, or Dataset) and the state of the user's dplyr query--things
# like selected columns, filters, and group vars.
# An arrow_dplyr_query can contain another arrow_dplyr_query in .data
- gv <- dplyr::group_vars(.data) %||% character()
+ gv <- tryCatch(
+ # If dplyr is not available, or if the input doesn't have a group_vars
+ # method, assume no group vars
+ dplyr::group_vars(.data) %||% character(),
+ error = function(e) character()
+ )
if (inherits(.data, "data.frame")) {
.data <- Table$create(.data)
@@ -247,7 +252,9 @@ abandon_ship <- function(call, .data, msg) {
query_on_dataset <- function(x) inherits(source_data(x), c("Dataset", "RecordBatchReader"))
source_data <- function(x) {
- if (is_collapsed(x)) {
+ if (!inherits(x, "arrow_dplyr_query")) {
+ x
+ } else if (is_collapsed(x)) {
source_data(x$.data)
} else {
x$.data
diff --git a/r/R/metadata.R b/r/R/metadata.R
index d88297dd92..f0411eb54a 100644
--- a/r/R/metadata.R
+++ b/r/R/metadata.R
@@ -133,7 +133,6 @@ remove_attributes <- function(x) {
}
arrow_attributes <- function(x, only_top_level = FALSE) {
-
att <- attributes(x)
removed_attributes <- remove_attributes(x)
@@ -208,3 +207,24 @@ arrow_attributes <- function(x, only_top_level = FALSE) {
NULL
}
}
+
+get_r_metadata_from_old_schema <- function(new_schema,
+ old_schema,
+ drop_attributes = FALSE) {
+ # TODO: do we care about other (non-R) metadata preservation?
+ # How would we know if it were meaningful?
+ r_meta <- old_schema$r_metadata
+ if (!is.null(r_meta)) {
+ # Filter r_metadata$columns on columns with name _and_ type match
+ common_names <- intersect(names(r_meta$columns), names(new_schema))
+ keep <- common_names[
+ map_lgl(common_names, ~ old_schema[[.]] == new_schema[[.]])
+ ]
+ r_meta$columns <- r_meta$columns[keep]
+ if (drop_attributes) {
+ # dplyr drops top-level attributes if you do summarize
+ r_meta$attributes <- NULL
+ }
+ }
+ r_meta
+}
diff --git a/r/R/parquet.R b/r/R/parquet.R
index 3a07c224ed..c6c00ed3a4 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -186,7 +186,7 @@ write_parquet <- function(x,
x$schema,
sink,
properties = properties %||% ParquetWriterProperties$create(
- x,
+ names(x),
version = version,
compression = compression,
compression_level = compression_level,
@@ -307,33 +307,33 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
set_version = function(version) {
parquet___WriterProperties___Builder__version(self, make_valid_version(version))
},
- set_compression = function(table, compression) {
+ set_compression = function(column_names, compression) {
compression <- compression_from_name(compression)
assert_that(is.integer(compression))
private$.set(
- table, compression,
+ column_names, compression,
parquet___ArrowWriterProperties___Builder__set_compressions
)
},
- set_compression_level = function(table, compression_level) {
+ set_compression_level = function(column_names, compression_level) {
# cast to integer but keep names
compression_level <- set_names(as.integer(compression_level), names(compression_level))
private$.set(
- table, compression_level,
+ column_names, compression_level,
parquet___ArrowWriterProperties___Builder__set_compression_levels
)
},
- set_dictionary = function(table, use_dictionary) {
+ set_dictionary = function(column_names, use_dictionary) {
assert_that(is.logical(use_dictionary))
private$.set(
- table, use_dictionary,
+ column_names, use_dictionary,
parquet___ArrowWriterProperties___Builder__set_use_dictionary
)
},
- set_write_statistics = function(table, write_statistics) {
+ set_write_statistics = function(column_names, write_statistics) {
assert_that(is.logical(write_statistics))
private$.set(
- table, write_statistics,
+ column_names, write_statistics,
parquet___ArrowWriterProperties___Builder__set_write_statistics
)
},
@@ -342,9 +342,8 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
}
),
private = list(
- .set = function(table, value, FUN) {
+ .set = function(column_names, value, FUN) {
msg <- paste0("unsupported ", substitute(value), "= specification")
- column_names <- names(table)
given_names <- names(value)
if (is.null(given_names)) {
if (length(value) %in% c(1L, length(column_names))) {
@@ -364,7 +363,7 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder",
)
)
-ParquetWriterProperties$create <- function(table,
+ParquetWriterProperties$create <- function(column_names,
version = NULL,
compression = default_parquet_compression(),
compression_level = NULL,
@@ -377,16 +376,16 @@ ParquetWriterProperties$create <- function(table,
builder$set_version(version)
}
if (!is.null(compression)) {
- builder$set_compression(table, compression = compression)
+ builder$set_compression(column_names, compression = compression)
}
if (!is.null(compression_level)) {
- builder$set_compression_level(table, compression_level = compression_level)
+ builder$set_compression_level(column_names, compression_level = compression_level)
}
if (!is.null(use_dictionary)) {
- builder$set_dictionary(table, use_dictionary)
+ builder$set_dictionary(column_names, use_dictionary)
}
if (!is.null(write_statistics)) {
- builder$set_write_statistics(table, write_statistics)
+ builder$set_write_statistics(column_names, write_statistics)
}
if (!is.null(data_page_size)) {
builder$set_data_page_size(data_page_size)
@@ -600,10 +599,9 @@ ParquetArrowReaderProperties$create <- function(use_threads = option_use_threads
parquet___arrow___ArrowReaderProperties__Make(isTRUE(use_threads))
}
-calculate_chunk_size <- function(rows, columns,
- target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8),
- max_chunks = getOption("arrow.parquet_max_chunks", 200)
- ) {
+calculate_chunk_size <- function(rows, columns,
+ target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8),
+ max_chunks = getOption("arrow.parquet_max_chunks", 200)) {
# Ensure is a float to prevent integer overflow issues
num_cells <- as.numeric(rows) * as.numeric(columns)
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 6c1b14036f..c794bc9de6 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -33,26 +33,15 @@ do_exec_plan <- function(.data) {
if (ncol(tab)) {
# Apply any column metadata from the original schema, where appropriate
- original_schema <- source_data(.data)$schema
- # TODO: do we care about other (non-R) metadata preservation?
- # How would we know if it were meaningful?
- r_meta <- original_schema$r_metadata
- if (!is.null(r_meta)) {
- # Filter r_metadata$columns on columns with name _and_ type match
- new_schema <- tab$schema
- common_names <- intersect(names(r_meta$columns), names(tab))
- keep <- common_names[
- map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]])
- ]
- r_meta$columns <- r_meta$columns[keep]
- if (has_aggregation(.data)) {
- # dplyr drops top-level attributes if you do summarize
- r_meta$attributes <- NULL
- }
- tab$r_metadata <- r_meta
+ new_r_metadata <- get_r_metadata_from_old_schema(
+ tab$schema,
+ source_data(.data)$schema,
+ drop_attributes = has_aggregation(.data)
+ )
+ if (!is.null(new_r_metadata)) {
+ tab$r_metadata <- new_r_metadata
}
}
-
tab
}
@@ -244,6 +233,10 @@ ExecPlan <- R6Class("ExecPlan",
}
out
},
+ Write = function(node, ...) {
+ # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
+ ExecPlan_Write(self, node, ...)
+ },
Stop = function() ExecPlan_StopProducing(self)
)
)
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 81dcc0dddc..fb9f3b94d1 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -898,11 +898,11 @@ END_CPP11
}
// compute-exec.cpp
#if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<compute::ExecNode> ExecNode_Scan(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<arrow::dataset::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, std::vector<std::string> materialized_field_names);
+std::shared_ptr<compute::ExecNode> ExecNode_Scan(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, std::vector<std::string> materialized_field_names);
extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){
BEGIN_CPP11
arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
- arrow::r::Input<const std::shared_ptr<arrow::dataset::Dataset>&>::type dataset(dataset_sexp);
+ arrow::r::Input<const std::shared_ptr<ds::Dataset>&>::type dataset(dataset_sexp);
arrow::r::Input<const std::shared_ptr<compute::Expression>&>::type filter(filter_sexp);
arrow::r::Input<std::vector<std::string>>::type materialized_field_names(materialized_field_names_sexp);
return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, materialized_field_names));
@@ -914,6 +914,35 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP fil
}
#endif
+// compute-exec.cpp
+#if defined(ARROW_R_WITH_DATASET)
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<compute::ExecNode>& final_node, cpp11::strings metadata, const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group);
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
+ arrow::r::Input<const std::shared_ptr<compute::ExecNode>&>::type final_node(final_node_sexp);
+ arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp);
+ arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
+ arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
+ arrow::r::Input<std::string>::type base_dir(base_dir_sexp);
+ arrow::r::Input<const std::shared_ptr<ds::Partitioning>&>::type partitioning(partitioning_sexp);
+ arrow::r::Input<std::string>::type basename_template(basename_template_sexp);
+ arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp);
+ arrow::r::Input<int>::type max_partitions(max_partitions_sexp);
+ arrow::r::Input<uint32_t>::type max_open_files(max_open_files_sexp);
+ arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp);
+ arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp);
+ arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp);
+ ExecPlan_Write(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
+ return R_NilValue;
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+ Rf_error("Cannot call ExecPlan_Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
+}
+#endif
+
// compute-exec.cpp
std::shared_ptr<compute::ExecNode> ExecNode_Filter(const std::shared_ptr<compute::ExecNode>& input, const std::shared_ptr<compute::Expression>& filter);
extern "C" SEXP _arrow_ExecNode_Filter(SEXP input_sexp, SEXP filter_sexp){
@@ -2041,33 +2070,6 @@ extern "C" SEXP _arrow_dataset___Scanner__schema(SEXP sc_sexp){
}
#endif
-// dataset.cpp
-#if defined(ARROW_R_WITH_DATASET)
-void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group);
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
-BEGIN_CPP11
- arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
- arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
- arrow::r::Input<std::string>::type base_dir(base_dir_sexp);
- arrow::r::Input<const std::shared_ptr<ds::Partitioning>&>::type partitioning(partitioning_sexp);
- arrow::r::Input<std::string>::type basename_template(basename_template_sexp);
- arrow::r::Input<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp);
- arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp);
- arrow::r::Input<int>::type max_partitions(max_partitions_sexp);
- arrow::r::Input<uint32_t>::type max_open_files(max_open_files_sexp);
- arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp);
- arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp);
- arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp);
- dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
- return R_NilValue;
-END_CPP11
-}
-#else
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
- Rf_error("Cannot call dataset___Dataset__Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
-}
-#endif
-
// dataset.cpp
#if defined(ARROW_R_WITH_DATASET)
std::shared_ptr<arrow::Table> dataset___Scanner__TakeRows(const std::shared_ptr<ds::Scanner>& scanner, const std::shared_ptr<arrow::Array>& indices);
@@ -5197,6 +5199,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1},
{ "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1},
{ "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4},
+ { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14},
{ "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2},
{ "_arrow_ExecNode_Project", (DL_FUNC) &_arrow_ExecNode_Project, 3},
{ "_arrow_ExecNode_Aggregate", (DL_FUNC) &_arrow_ExecNode_Aggregate, 5},
@@ -5279,7 +5282,6 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_dataset___Scanner__ToRecordBatchReader", (DL_FUNC) &_arrow_dataset___Scanner__ToRecordBatchReader, 1},
{ "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2},
{ "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1},
- { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 12},
{ "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2},
{ "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1},
{ "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0},
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index e7d8df55bb..4c3cc92257 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -121,19 +121,20 @@ std::shared_ptr<arrow::Schema> ExecNode_output_schema(
#if defined(ARROW_R_WITH_DATASET)
+#include <arrow/dataset/file_base.h>
#include <arrow/dataset/plan.h>
#include <arrow/dataset/scanner.h>
// [[dataset::export]]
std::shared_ptr<compute::ExecNode> ExecNode_Scan(
const std::shared_ptr<compute::ExecPlan>& plan,
- const std::shared_ptr<arrow::dataset::Dataset>& dataset,
+ const std::shared_ptr<ds::Dataset>& dataset,
const std::shared_ptr<compute::Expression>& filter,
std::vector<std::string> materialized_field_names) {
arrow::dataset::internal::Initialize();
// TODO: pass in FragmentScanOptions
- auto options = std::make_shared<arrow::dataset::ScanOptions>();
+ auto options = std::make_shared<ds::ScanOptions>();
options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true);
@@ -154,7 +155,49 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
.Bind(*dataset->schema()));
return MakeExecNodeOrStop("scan", plan.get(), {},
- arrow::dataset::ScanNodeOptions{dataset, options});
+ ds::ScanNodeOptions{dataset, options});
+}
+
+// [[dataset::export]]
+void ExecPlan_Write(
+ const std::shared_ptr<compute::ExecPlan>& plan,
+ const std::shared_ptr<compute::ExecNode>& final_node, cpp11::strings metadata,
+ const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+ const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
+ const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
+ arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
+ uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
+ uint64_t max_rows_per_group) {
+ arrow::dataset::internal::Initialize();
+
+ // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
+ // and encapsulate this logic better
+ ds::FileSystemDatasetWriteOptions opts;
+ opts.file_write_options = file_write_options;
+ opts.existing_data_behavior = existing_data_behavior;
+ opts.filesystem = filesystem;
+ opts.base_dir = base_dir;
+ opts.partitioning = partitioning;
+ opts.basename_template = basename_template;
+ opts.max_partitions = max_partitions;
+ opts.max_open_files = max_open_files;
+ opts.max_rows_per_file = max_rows_per_file;
+ opts.min_rows_per_group = min_rows_per_group;
+ opts.max_rows_per_group = max_rows_per_group;
+
+ // TODO: factor this out to a strings_to_KVM() helper
+ auto values = cpp11::as_cpp<std::vector<std::string>>(metadata);
+ auto names = cpp11::as_cpp<std::vector<std::string>>(metadata.attr("names"));
+
+ auto kv =
+ std::make_shared<arrow::KeyValueMetadata>(std::move(names), std::move(values));
+
+ MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()},
+ ds::WriteNodeOptions{std::move(opts), std::move(kv)});
+
+ StopIfNotOk(plan->Validate());
+ StopIfNotOk(plan->StartProducing());
+ StopIfNotOk(plan->finished().status());
}
#endif
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 4881830560..4ff30d9d94 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -511,30 +511,6 @@ std::shared_ptr<arrow::Schema> dataset___Scanner__schema(
return sc->options()->projected_schema;
}
-// [[dataset::export]]
-void dataset___Dataset__Write(
- const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
- const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
- const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
- const std::shared_ptr<ds::Scanner>& scanner,
- arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
- uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
- uint64_t max_rows_per_group) {
- ds::FileSystemDatasetWriteOptions opts;
- opts.file_write_options = file_write_options;
- opts.existing_data_behavior = existing_data_behavior;
- opts.filesystem = filesystem;
- opts.base_dir = base_dir;
- opts.partitioning = partitioning;
- opts.basename_template = basename_template;
- opts.max_partitions = max_partitions;
- opts.max_open_files = max_open_files;
- opts.max_rows_per_file = max_rows_per_file;
- opts.min_rows_per_group = min_rows_per_group;
- opts.max_rows_per_group = max_rows_per_group;
- StopIfNotOk(ds::FileSystemDataset::Write(opts, scanner));
-}
-
// [[dataset::export]]
std::shared_ptr<arrow::Table> dataset___Scanner__TakeRows(
const std::shared_ptr<ds::Scanner>& scanner,
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index aafb4bf292..5b657148a5 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -244,6 +244,24 @@ test_that("Dataset writing: dplyr methods", {
new_ds %>% select(c(names(df1), "twice")) %>% collect(),
df1 %>% filter(int == 4) %>% mutate(twice = int * 2)
)
+
+ # head
+ dst_dir4 <- tempfile()
+ ds %>%
+ mutate(twice = int * 2) %>%
+ arrange(int) %>%
+ head(3) %>%
+ write_dataset(dst_dir4, format = "feather")
+ new_ds <- open_dataset(dst_dir4, format = "feather")
+
+ expect_equal(
+ new_ds %>%
+ select(c(names(df1), "twice")) %>%
+ collect(),
+ df1 %>%
+ mutate(twice = int * 2) %>%
+ head(3)
+ )
})
test_that("Dataset writing: non-hive", {
@@ -321,6 +339,7 @@ test_that("Dataset writing: from RecordBatch", {
dst_dir <- tempfile()
stacked <- record_batch(rbind(df1, df2))
stacked %>%
+ mutate(twice = int * 2) %>%
group_by(int) %>%
write_dataset(dst_dir, format = "feather")
expect_true(dir.exists(dst_dir))
@@ -438,7 +457,7 @@ test_that("Writing a dataset: CSV format options", {
test_that("Dataset writing: unsupported features/input validation", {
skip_if_not_available("parquet")
- expect_error(write_dataset(4), 'dataset must be a "Dataset"')
+ expect_error(write_dataset(4), "'dataset' must be a Dataset, ")
ds <- open_dataset(hive_dir)
expect_error(
@@ -520,7 +539,6 @@ test_that("max_rows_per_group is adjusted if at odds with max_rows_per_file", {
expect_silent(
write_dataset(df, dst_dir, max_rows_per_file = 5)
)
-
})
@@ -571,17 +589,27 @@ test_that("Dataset write max open files", {
partitioning <- "c2"
num_of_unique_c2_groups <- 5
- record_batch_1 <- record_batch(c1 = c(1, 2, 3, 4, 0, 10),
- c2 = c("a", "b", "c", "d", "e", "a"))
- record_batch_2 <- record_batch(c1 = c(5, 6, 7, 8, 0, 1),
- c2 = c("a", "b", "c", "d", "e", "c"))
- record_batch_3 <- record_batch(c1 = c(9, 10, 11, 12, 0, 1),
- c2 = c("a", "b", "c", "d", "e", "d"))
- record_batch_4 <- record_batch(c1 = c(13, 14, 15, 16, 0, 1),
- c2 = c("a", "b", "c", "d", "e", "b"))
+ record_batch_1 <- record_batch(
+ c1 = c(1, 2, 3, 4, 0, 10),
+ c2 = c("a", "b", "c", "d", "e", "a")
+ )
+ record_batch_2 <- record_batch(
+ c1 = c(5, 6, 7, 8, 0, 1),
+ c2 = c("a", "b", "c", "d", "e", "c")
+ )
+ record_batch_3 <- record_batch(
+ c1 = c(9, 10, 11, 12, 0, 1),
+ c2 = c("a", "b", "c", "d", "e", "d")
+ )
+ record_batch_4 <- record_batch(
+ c1 = c(13, 14, 15, 16, 0, 1),
+ c2 = c("a", "b", "c", "d", "e", "b")
+ )
- table <- Table$create(d1 = record_batch_1, d2 = record_batch_2,
- d3 = record_batch_3, d4 = record_batch_4)
+ table <- Table$create(
+ d1 = record_batch_1, d2 = record_batch_2,
+ d3 = record_batch_3, d4 = record_batch_4
+ )
write_dataset(table, path = dst_dir, format = file_format, partitioning = partitioning)
@@ -643,12 +671,18 @@ test_that("Dataset write max rows per files", {
test_that("Dataset min_rows_per_group", {
skip_if_not_available("parquet")
- rb1 <- record_batch(c1 = c(1, 2, 3, 4),
- c2 = c("a", "b", "e", "a"))
- rb2 <- record_batch(c1 = c(5, 6, 7, 8, 9),
- c2 = c("a", "b", "c", "d", "h"))
- rb3 <- record_batch(c1 = c(10, 11),
- c2 = c("a", "b"))
+ rb1 <- record_batch(
+ c1 = c(1, 2, 3, 4),
+ c2 = c("a", "b", "e", "a")
+ )
+ rb2 <- record_batch(
+ c1 = c(5, 6, 7, 8, 9),
+ c2 = c("a", "b", "c", "d", "h")
+ )
+ rb3 <- record_batch(
+ c1 = c(10, 11),
+ c2 = c("a", "b")
+ )
dataset <- Table$create(d1 = rb1, d2 = rb2, d3 = rb3)
diff --git a/r/tests/testthat/test-metadata.R b/r/tests/testthat/test-metadata.R
index 3217b58d6c..4db20d04df 100644
--- a/r/tests/testthat/test-metadata.R
+++ b/r/tests/testthat/test-metadata.R
@@ -226,11 +226,13 @@ test_that("Row-level metadata (does not by default) roundtrip", {
# But we can re-enable this / read data that has already been written with
# row-level metadata
withr::with_options(
- list("arrow.preserve_row_level_metadata" = TRUE), {
+ list("arrow.preserve_row_level_metadata" = TRUE),
+ {
tab <- Table$create(df)
expect_identical(attr(as.data.frame(tab)$x[[1]], "foo"), "bar")
expect_identical(attr(as.data.frame(tab)$x[[2]], "baz"), "qux")
- })
+ }
+ )
})
@@ -256,7 +258,8 @@ test_that("Row-level metadata (does not) roundtrip in datasets", {
dst_dir <- make_temp_dir()
withr::with_options(
- list("arrow.preserve_row_level_metadata" = TRUE), {
+ list("arrow.preserve_row_level_metadata" = TRUE),
+ {
expect_warning(
write_dataset(df, dst_dir, partitioning = "part"),
"Row-level metadata is not compatible with datasets and will be discarded"
@@ -286,7 +289,25 @@ test_that("Row-level metadata (does not) roundtrip in datasets", {
df_from_ds <- ds %>% select(int) %>% collect(),
NA
)
- })
+ }
+ )
+})
+
+test_that("Dataset writing does handle other metadata", {
+ skip_if_not_available("dataset")
+ skip_if_not_available("parquet")
+
+ dst_dir <- make_temp_dir()
+ write_dataset(example_with_metadata, dst_dir, partitioning = "b")
+
+ ds <- open_dataset(dst_dir)
+ expect_equal(
+ ds %>%
+ # partitioning on b puts it last, so move it back
+ select(a, b, c, d) %>%
+ collect(),
+ example_with_metadata
+ )
})
test_that("When we encounter SF cols, we warn", {
@@ -305,11 +326,13 @@ test_that("When we encounter SF cols, we warn", {
# But we can re-enable this / read data that has already been written with
# row-level metadata without a warning
withr::with_options(
- list("arrow.preserve_row_level_metadata" = TRUE), {
+ list("arrow.preserve_row_level_metadata" = TRUE),
+ {
expect_warning(tab <- Table$create(df), NA)
expect_identical(attr(as.data.frame(tab)$x[[1]], "foo"), "bar")
expect_identical(attr(as.data.frame(tab)$x[[2]], "baz"), "qux")
- })
+ }
+ )
})
test_that("dplyr with metadata", {
@@ -369,7 +392,6 @@ test_that("grouped_df metadata is recorded (efficiently)", {
})
test_that("grouped_df non-arrow metadata is preserved", {
-
simple_tbl <- tibble(a = 1:2, b = 3:4)
attr(simple_tbl, "other_metadata") <- "look I'm still here!"
grouped <- group_by(simple_tbl, a)