You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ks...@apache.org on 2021/11/03 12:19:47 UTC
[arrow] 04/10: ARROW-14480: [R] Expose
arrow::dataset::ExistingDataBehavior to R
This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch maint-6.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 58a7c3b2ff9981227f462462357178c23a66b639
Author: Weston Pace <we...@gmail.com>
AuthorDate: Wed Oct 27 14:18:57 2021 -0400
ARROW-14480: [R] Expose arrow::dataset::ExistingDataBehavior to R
This only adds the one option (existing_data_behavior) so we can restore backwards compatibility for CRAN compliance.
Closes #11552 from westonpace/feature/ARROW-14480--expose-new-dataset-writer-opts-to-r
Lead-authored-by: Weston Pace <we...@gmail.com>
Co-authored-by: Jonathan Keane <jk...@gmail.com>
Co-authored-by: Neal Richardson <ne...@gmail.com>
Signed-off-by: Neal Richardson <ne...@gmail.com>
---
cpp/src/arrow/dataset/dataset_writer.cc | 5 ++--
cpp/src/arrow/dataset/dataset_writer_test.cc | 6 ++---
cpp/src/arrow/dataset/file_base.h | 14 +---------
cpp/src/arrow/dataset/type_fwd.h | 12 +++++++++
r/R/arrowExports.R | 4 +--
r/R/dataset-write.R | 17 +++++++++++-
r/man/arrow-package.Rd | 6 ++++-
r/man/write_dataset.Rd | 10 +++++++
r/src/arrowExports.cpp | 11 ++++----
r/src/dataset.cpp | 4 ++-
r/tests/testthat/test-dataset-write.R | 40 ++++++++++++++++++++++++++++
11 files changed, 101 insertions(+), 28 deletions(-)
diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index 12b7858..a61f32c 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -300,7 +300,8 @@ class DatasetWriterDirectoryQueue : public util::AsyncDestroyable {
init_future_ =
DeferNotOk(write_options_.filesystem->io_context().executor()->Submit([this] {
RETURN_NOT_OK(write_options_.filesystem->CreateDir(directory_));
- if (write_options_.existing_data_behavior == kDeleteMatchingPartitions) {
+ if (write_options_.existing_data_behavior ==
+ ExistingDataBehavior::kDeleteMatchingPartitions) {
return write_options_.filesystem->DeleteDirContents(directory_);
}
return Status::OK();
@@ -358,7 +359,7 @@ Status ValidateBasenameTemplate(util::string_view basename_template) {
}
Status EnsureDestinationValid(const FileSystemDatasetWriteOptions& options) {
- if (options.existing_data_behavior == kError) {
+ if (options.existing_data_behavior == ExistingDataBehavior::kError) {
fs::FileSelector selector;
selector.base_dir = options.base_dir;
selector.recursive = true;
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc
index e3fac05..bf38c2f 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -284,7 +284,7 @@ TEST_F(DatasetWriterTestFixture, DeleteExistingData) {
fs::File("testdir/chunk-5.arrow"), fs::File("testdir/blah.txt")}));
filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
write_options_.filesystem = filesystem_;
- write_options_.existing_data_behavior = kDeleteMatchingPartitions;
+ write_options_.existing_data_behavior = ExistingDataBehavior::kDeleteMatchingPartitions;
EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "");
AssertFinished(queue_fut);
@@ -302,7 +302,7 @@ TEST_F(DatasetWriterTestFixture, PartitionedDeleteExistingData) {
fs::File("testdir/part1/bar.arrow")}));
filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
write_options_.filesystem = filesystem_;
- write_options_.existing_data_behavior = kDeleteMatchingPartitions;
+ write_options_.existing_data_behavior = ExistingDataBehavior::kDeleteMatchingPartitions;
EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "part0");
AssertFinished(queue_fut);
@@ -321,7 +321,7 @@ TEST_F(DatasetWriterTestFixture, LeaveExistingData) {
fs::File("testdir/chunk-5.arrow"), fs::File("testdir/blah.txt")}));
filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
write_options_.filesystem = filesystem_;
- write_options_.existing_data_behavior = kOverwriteOrIgnore;
+ write_options_.existing_data_behavior = ExistingDataBehavior::kOverwriteOrIgnore;
EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "");
AssertFinished(queue_fut);
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 3c7b825..9113691 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -343,18 +343,6 @@ class ARROW_DS_EXPORT FileWriter {
fs::FileLocator destination_locator_;
};
-/// \brief Controls what happens if files exist in an output directory during a dataset
-/// write
-enum ExistingDataBehavior : int8_t {
- /// Deletes all files in a directory the first time that directory is encountered
- kDeleteMatchingPartitions,
- /// Ignores existing files, overwriting any that happen to have the same name as an
- /// output file
- kOverwriteOrIgnore,
- /// Returns an error if there are any files or subdirectories in the output directory
- kError,
-};
-
/// \brief Options for writing a dataset.
struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
/// Options for individual fragment writing.
@@ -388,7 +376,7 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
uint64_t max_rows_per_file = 0;
/// Controls what happens if an output directory already exists.
- ExistingDataBehavior existing_data_behavior = kError;
+ ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError;
/// Callback to be invoked against all FileWriters before
/// they are finalized with FileWriter::Finish().
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index ad1a299..78748a3 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -52,6 +52,18 @@ class FileSystemDataset;
class FileSystemDatasetFactory;
struct FileSystemDatasetWriteOptions;
+/// \brief Controls what happens if files exist in an output directory during a dataset
+/// write
+enum class ExistingDataBehavior : int8_t {
+ /// Deletes all files in a directory the first time that directory is encountered
+ kDeleteMatchingPartitions,
+ /// Ignores existing files, overwriting any that happen to have the same name as an
+ /// output file
+ kOverwriteOrIgnore,
+ /// Returns an error if there are any files or subdirectories in the output directory
+ kError,
+};
+
class InMemoryDataset;
class CsvFileFormat;
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index f5f2dd7..014b164 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -604,8 +604,8 @@ dataset___ScanTask__get_batches <- function(scan_task) {
.Call(`_arrow_dataset___ScanTask__get_batches`, scan_task)
}
-dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner) {
- invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner))
+dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior) {
+ invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior))
}
dataset___Scanner__TakeRows <- function(scanner, indices) {
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index 95c7f7b..3a98357 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -38,6 +38,16 @@
#' will yield `"part-0.feather", ...`.
#' @param hive_style logical: write partition segments as Hive-style
#' (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`.
+#' @param existing_data_behavior The behavior to use when there is already data
+#' in the destination directory. Must be one of "overwrite", "error", or
+#' "delete_matching".
+#' - "overwrite" (the default): any new files created will overwrite
+#' existing files
+#' - "error": the operation will fail if the destination directory is not
+#' empty
+#' - "delete_matching": the writer will delete any existing partitions
+#' if data is going to be written to those partitions and will leave alone
+#' partitions to which no data is written.
#' @param ... additional format-specific arguments. For available Parquet
#' options, see [write_parquet()]. The available Feather options are
#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
@@ -97,6 +107,7 @@ write_dataset <- function(dataset,
partitioning = dplyr::group_vars(dataset),
basename_template = paste0("part-{i}.", as.character(format)),
hive_style = TRUE,
+ existing_data_behavior = c("overwrite", "error", "delete_matching"),
...) {
format <- match.arg(format)
if (inherits(dataset, "arrow_dplyr_query")) {
@@ -122,8 +133,12 @@ write_dataset <- function(dataset,
path_and_fs <- get_path_and_filesystem(path)
options <- FileWriteOptions$create(format, table = scanner, ...)
+ existing_data_behavior_opts <- c("delete_matching", "overwrite", "error")
+ existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L
+
dataset___Dataset__Write(
options, path_and_fs$fs, path_and_fs$path,
- partitioning, basename_template, scanner
+ partitioning, basename_template, scanner,
+ existing_data_behavior
)
}
diff --git a/r/man/arrow-package.Rd b/r/man/arrow-package.Rd
index 122f768..0217621 100644
--- a/r/man/arrow-package.Rd
+++ b/r/man/arrow-package.Rd
@@ -6,7 +6,11 @@
\alias{arrow-package}
\title{arrow: Integration to 'Apache' 'Arrow'}
\description{
-'Apache' 'Arrow' <https://arrow.apache.org/> is a cross-language development platform for in-memory data. It specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware. This package provides an interface to the 'Arrow C++' library.
+'Apache' 'Arrow' <https://arrow.apache.org/> is a cross-language
+ development platform for in-memory data. It specifies a standardized
+ language-independent columnar memory format for flat and hierarchical data,
+ organized for efficient analytic operations on modern hardware. This
+ package provides an interface to the 'Arrow C++' library.
}
\seealso{
Useful links:
diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd
index 219cc83..76bbaf7 100644
--- a/r/man/write_dataset.Rd
+++ b/r/man/write_dataset.Rd
@@ -11,6 +11,7 @@ write_dataset(
partitioning = dplyr::group_vars(dataset),
basename_template = paste0("part-{i}.", as.character(format)),
hive_style = TRUE,
+ existing_data_behavior = c("overwrite", "error", "delete_matching"),
...
)
}
@@ -38,6 +39,15 @@ will yield \verb{"part-0.feather", ...}.}
\item{hive_style}{logical: write partition segments as Hive-style
(\code{key1=value1/key2=value2/file.ext}) or as just bare values. Default is \code{TRUE}.}
+\item{existing_data_behavior}{The behavior to use when there is already data
+in the destination directory. Must be one of "overwrite", "error", or
+"delete_matching". When this is set to "overwrite" (the default) then any
+new files created will overwrite existing files. When this is set to
+"error" then the operation will fail if the destination directory is not
+empty. When this is set to "delete_matching" then the writer will delete
+any existing partitions if data is going to be written to those partitions
+and will leave alone partitions to which no data is written.}
+
\item{...}{additional format-specific arguments. For available Parquet
options, see \code{\link[=write_parquet]{write_parquet()}}. The available Feather options are
\itemize{
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index c446b77..5872aa4 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -2396,8 +2396,8 @@ extern "C" SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){
// dataset.cpp
#if defined(ARROW_R_WITH_DATASET)
-void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner);
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp){
+void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior);
+extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp){
BEGIN_CPP11
arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
@@ -2405,12 +2405,13 @@ BEGIN_CPP11
arrow::r::Input<const std::shared_ptr<ds::Partitioning>&>::type partitioning(partitioning_sexp);
arrow::r::Input<std::string>::type basename_template(basename_template_sexp);
arrow::r::Input<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp);
- dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner);
+ arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp);
+ dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior);
return R_NilValue;
END_CPP11
}
#else
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp){
+extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp){
Rf_error("Cannot call dataset___Dataset__Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
}
#endif
@@ -7319,7 +7320,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2},
{ "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1},
{ "_arrow_dataset___ScanTask__get_batches", (DL_FUNC) &_arrow_dataset___ScanTask__get_batches, 1},
- { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 6},
+ { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 7},
{ "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2},
{ "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1},
{ "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0},
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 544c9d8..7e384aa 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -516,9 +516,11 @@ void dataset___Dataset__Write(
const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
- const std::shared_ptr<ds::Scanner>& scanner) {
+ const std::shared_ptr<ds::Scanner>& scanner,
+ arrow::dataset::ExistingDataBehavior existing_data_behavior) {
ds::FileSystemDatasetWriteOptions opts;
opts.file_write_options = file_write_options;
+ opts.existing_data_behavior = existing_data_behavior;
opts.filesystem = filesystem;
opts.base_dir = base_dir;
opts.partitioning = partitioning;
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index 705103f..8e7c077 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -139,6 +139,46 @@ test_that("Writing a dataset: Parquet->Parquet (default)", {
)
})
+test_that("Writing a dataset: existing data behavior", {
+ # This test does not work on Windows because unlink does not immediately
+ # delete the data.
+ skip_on_os("windows")
+ ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
+ dst_dir <- make_temp_dir()
+ write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
+ expect_true(dir.exists(dst_dir))
+
+ check_dataset <- function() {
+ new_ds <- open_dataset(dst_dir, format = "feather")
+
+ expect_equal(
+ new_ds %>%
+ select(string = chr, integer = int) %>%
+ filter(integer > 6 & integer < 11) %>%
+ collect() %>%
+ summarize(mean = mean(integer)),
+ df1 %>%
+ select(string = chr, integer = int) %>%
+ filter(integer > 6) %>%
+ summarize(mean = mean(integer))
+ )
+ }
+
+ check_dataset()
+ # By default we should overwrite
+ write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
+ check_dataset()
+ write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "overwrite")
+ check_dataset()
+ expect_error(
+ write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "error"),
+ "directory is not empty"
+ )
+ unlink(dst_dir, recursive = TRUE)
+ write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "error")
+ check_dataset()
+})
+
test_that("Writing a dataset: no format specified", {
dst_dir <- make_temp_dir()
write_dataset(example_data, dst_dir)