You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ks...@apache.org on 2021/11/03 12:19:47 UTC

[arrow] 04/10: ARROW-14480: [R] Expose arrow::dataset::ExistingDataBehavior to R

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-6.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 58a7c3b2ff9981227f462462357178c23a66b639
Author: Weston Pace <we...@gmail.com>
AuthorDate: Wed Oct 27 14:18:57 2021 -0400

    ARROW-14480: [R] Expose arrow::dataset::ExistingDataBehavior to R
    
    This only adds the one option so we can restore backwards compatibility for CRAN compliance.
    
    Closes #11552 from westonpace/feature/ARROW-14480--expose-new-dataset-writer-opts-to-r
    
    Lead-authored-by: Weston Pace <we...@gmail.com>
    Co-authored-by: Jonathan Keane <jk...@gmail.com>
    Co-authored-by: Neal Richardson <ne...@gmail.com>
    Signed-off-by: Neal Richardson <ne...@gmail.com>
---
 cpp/src/arrow/dataset/dataset_writer.cc      |  5 ++--
 cpp/src/arrow/dataset/dataset_writer_test.cc |  6 ++---
 cpp/src/arrow/dataset/file_base.h            | 14 +---------
 cpp/src/arrow/dataset/type_fwd.h             | 12 +++++++++
 r/R/arrowExports.R                           |  4 +--
 r/R/dataset-write.R                          | 17 +++++++++++-
 r/man/arrow-package.Rd                       |  6 ++++-
 r/man/write_dataset.Rd                       | 10 +++++++
 r/src/arrowExports.cpp                       | 11 ++++----
 r/src/dataset.cpp                            |  4 ++-
 r/tests/testthat/test-dataset-write.R        | 40 ++++++++++++++++++++++++++++
 11 files changed, 101 insertions(+), 28 deletions(-)

diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index 12b7858..a61f32c 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -300,7 +300,8 @@ class DatasetWriterDirectoryQueue : public util::AsyncDestroyable {
     init_future_ =
         DeferNotOk(write_options_.filesystem->io_context().executor()->Submit([this] {
           RETURN_NOT_OK(write_options_.filesystem->CreateDir(directory_));
-          if (write_options_.existing_data_behavior == kDeleteMatchingPartitions) {
+          if (write_options_.existing_data_behavior ==
+              ExistingDataBehavior::kDeleteMatchingPartitions) {
             return write_options_.filesystem->DeleteDirContents(directory_);
           }
           return Status::OK();
@@ -358,7 +359,7 @@ Status ValidateBasenameTemplate(util::string_view basename_template) {
 }
 
 Status EnsureDestinationValid(const FileSystemDatasetWriteOptions& options) {
-  if (options.existing_data_behavior == kError) {
+  if (options.existing_data_behavior == ExistingDataBehavior::kError) {
     fs::FileSelector selector;
     selector.base_dir = options.base_dir;
     selector.recursive = true;
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc
index e3fac05..bf38c2f 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -284,7 +284,7 @@ TEST_F(DatasetWriterTestFixture, DeleteExistingData) {
                      fs::File("testdir/chunk-5.arrow"), fs::File("testdir/blah.txt")}));
   filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
   write_options_.filesystem = filesystem_;
-  write_options_.existing_data_behavior = kDeleteMatchingPartitions;
+  write_options_.existing_data_behavior = ExistingDataBehavior::kDeleteMatchingPartitions;
   EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
   Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "");
   AssertFinished(queue_fut);
@@ -302,7 +302,7 @@ TEST_F(DatasetWriterTestFixture, PartitionedDeleteExistingData) {
                      fs::File("testdir/part1/bar.arrow")}));
   filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
   write_options_.filesystem = filesystem_;
-  write_options_.existing_data_behavior = kDeleteMatchingPartitions;
+  write_options_.existing_data_behavior = ExistingDataBehavior::kDeleteMatchingPartitions;
   EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
   Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "part0");
   AssertFinished(queue_fut);
@@ -321,7 +321,7 @@ TEST_F(DatasetWriterTestFixture, LeaveExistingData) {
                      fs::File("testdir/chunk-5.arrow"), fs::File("testdir/blah.txt")}));
   filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
   write_options_.filesystem = filesystem_;
-  write_options_.existing_data_behavior = kOverwriteOrIgnore;
+  write_options_.existing_data_behavior = ExistingDataBehavior::kOverwriteOrIgnore;
   EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
   Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "");
   AssertFinished(queue_fut);
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 3c7b825..9113691 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -343,18 +343,6 @@ class ARROW_DS_EXPORT FileWriter {
   fs::FileLocator destination_locator_;
 };
 
-/// \brief Controls what happens if files exist in an output directory during a dataset
-/// write
-enum ExistingDataBehavior : int8_t {
-  /// Deletes all files in a directory the first time that directory is encountered
-  kDeleteMatchingPartitions,
-  /// Ignores existing files, overwriting any that happen to have the same name as an
-  /// output file
-  kOverwriteOrIgnore,
-  /// Returns an error if there are any files or subdirectories in the output directory
-  kError,
-};
-
 /// \brief Options for writing a dataset.
 struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
   /// Options for individual fragment writing.
@@ -388,7 +376,7 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
   uint64_t max_rows_per_file = 0;
 
   /// Controls what happens if an output directory already exists.
-  ExistingDataBehavior existing_data_behavior = kError;
+  ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError;
 
   /// Callback to be invoked against all FileWriters before
   /// they are finalized with FileWriter::Finish().
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index ad1a299..78748a3 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -52,6 +52,18 @@ class FileSystemDataset;
 class FileSystemDatasetFactory;
 struct FileSystemDatasetWriteOptions;
 
+/// \brief Controls what happens if files exist in an output directory during a dataset
+/// write
+enum class ExistingDataBehavior : int8_t {
+  /// Deletes all files in a directory the first time that directory is encountered
+  kDeleteMatchingPartitions,
+  /// Ignores existing files, overwriting any that happen to have the same name as an
+  /// output file
+  kOverwriteOrIgnore,
+  /// Returns an error if there are any files or subdirectories in the output directory
+  kError,
+};
+
 class InMemoryDataset;
 
 class CsvFileFormat;
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index f5f2dd7..014b164 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -604,8 +604,8 @@ dataset___ScanTask__get_batches <- function(scan_task) {
   .Call(`_arrow_dataset___ScanTask__get_batches`, scan_task)
 }
 
-dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner) {
-  invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner))
+dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior) {
+  invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior))
 }
 
 dataset___Scanner__TakeRows <- function(scanner, indices) {
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index 95c7f7b..3a98357 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -38,6 +38,16 @@
 #' will yield `"part-0.feather", ...`.
 #' @param hive_style logical: write partition segments as Hive-style
 #' (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`.
+#' @param existing_data_behavior The behavior to use when there is already data
+#' in the destination directory.  Must be one of "overwrite", "error", or
+#' "delete_matching".
+#' - "overwrite" (the default): any new files created will overwrite
+#'   existing files with the same name.
+#' - "error": the operation will fail if the destination directory is not
+#'   empty.
+#' - "delete_matching": the writer will delete any existing partitions
+#'   that data is going to be written to and will leave alone partitions
+#'   that no data is written to.
 #' @param ... additional format-specific arguments. For available Parquet
 #' options, see [write_parquet()]. The available Feather options are
 #' - `use_legacy_format` logical: write data formatted so that Arrow libraries
@@ -97,6 +107,7 @@ write_dataset <- function(dataset,
                           partitioning = dplyr::group_vars(dataset),
                           basename_template = paste0("part-{i}.", as.character(format)),
                           hive_style = TRUE,
+                          existing_data_behavior = c("overwrite", "error", "delete_matching"),
                           ...) {
   format <- match.arg(format)
   if (inherits(dataset, "arrow_dplyr_query")) {
@@ -122,8 +133,12 @@ write_dataset <- function(dataset,
   path_and_fs <- get_path_and_filesystem(path)
   options <- FileWriteOptions$create(format, table = scanner, ...)
 
+  existing_data_behavior_opts <- c("delete_matching", "overwrite", "error")
+  existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L
+
   dataset___Dataset__Write(
     options, path_and_fs$fs, path_and_fs$path,
-    partitioning, basename_template, scanner
+    partitioning, basename_template, scanner,
+    existing_data_behavior
   )
 }
diff --git a/r/man/arrow-package.Rd b/r/man/arrow-package.Rd
index 122f768..0217621 100644
--- a/r/man/arrow-package.Rd
+++ b/r/man/arrow-package.Rd
@@ -6,7 +6,11 @@
 \alias{arrow-package}
 \title{arrow: Integration to 'Apache' 'Arrow'}
 \description{
-'Apache' 'Arrow' <https://arrow.apache.org/> is a cross-language development platform for in-memory data. It specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware. This package provides an interface to the 'Arrow C++' library.
+'Apache' 'Arrow' <https://arrow.apache.org/> is a cross-language
+    development platform for in-memory data. It specifies a standardized
+    language-independent columnar memory format for flat and hierarchical data,
+    organized for efficient analytic operations on modern hardware. This
+    package provides an interface to the 'Arrow C++' library.
 }
 \seealso{
 Useful links:
diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd
index 219cc83..76bbaf7 100644
--- a/r/man/write_dataset.Rd
+++ b/r/man/write_dataset.Rd
@@ -11,6 +11,7 @@ write_dataset(
   partitioning = dplyr::group_vars(dataset),
   basename_template = paste0("part-{i}.", as.character(format)),
   hive_style = TRUE,
+  existing_data_behavior = c("overwrite", "error", "delete_matching"),
   ...
 )
 }
@@ -38,6 +39,15 @@ will yield \verb{"part-0.feather", ...}.}
 \item{hive_style}{logical: write partition segments as Hive-style
 (\code{key1=value1/key2=value2/file.ext}) or as just bare values. Default is \code{TRUE}.}
 
+\item{existing_data_behavior}{The behavior to use when there is already data
+in the destination directory.  Must be one of "overwrite", "error", or
+"delete_matching".  When this is set to "overwrite" (the default) then any
+new files created will overwrite existing files.  When this is set to
+"error" then the operation will fail if the destination directory is not
+empty.  When this is set to "delete_matching" then the writer will delete
+any existing partitions that data is going to be written to and will
+leave alone partitions that no data is written to.}
+
 \item{...}{additional format-specific arguments. For available Parquet
 options, see \code{\link[=write_parquet]{write_parquet()}}. The available Feather options are
 \itemize{
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index c446b77..5872aa4 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -2396,8 +2396,8 @@ extern "C" SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner);
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp){
+void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior);
+extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
 	arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
@@ -2405,12 +2405,13 @@ BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<ds::Partitioning>&>::type partitioning(partitioning_sexp);
 	arrow::r::Input<std::string>::type basename_template(basename_template_sexp);
 	arrow::r::Input<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp);
-	dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner);
+	arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp);
+	dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior);
 	return R_NilValue;
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp){
+extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp){
 	Rf_error("Cannot call dataset___Dataset__Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
@@ -7319,7 +7320,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2}, 
 		{ "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1}, 
 		{ "_arrow_dataset___ScanTask__get_batches", (DL_FUNC) &_arrow_dataset___ScanTask__get_batches, 1}, 
-		{ "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 6}, 
+		{ "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 7}, 
 		{ "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2}, 
 		{ "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1}, 
 		{ "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0}, 
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 544c9d8..7e384aa 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -516,9 +516,11 @@ void dataset___Dataset__Write(
     const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
     const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
     const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
-    const std::shared_ptr<ds::Scanner>& scanner) {
+    const std::shared_ptr<ds::Scanner>& scanner,
+    arrow::dataset::ExistingDataBehavior existing_data_behavior) {
   ds::FileSystemDatasetWriteOptions opts;
   opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
   opts.filesystem = filesystem;
   opts.base_dir = base_dir;
   opts.partitioning = partitioning;
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index 705103f..8e7c077 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -139,6 +139,46 @@ test_that("Writing a dataset: Parquet->Parquet (default)", {
   )
 })
 
+test_that("Writing a dataset: existing data behavior", {
+  # This test does not work on Windows because unlink does not immediately
+  # delete the data.
+  skip_on_os("windows")
+  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
+  dst_dir <- make_temp_dir()
+  write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
+  expect_true(dir.exists(dst_dir))
+
+  check_dataset <- function() {
+    new_ds <- open_dataset(dst_dir, format = "feather")
+
+    expect_equal(
+      new_ds %>%
+        select(string = chr, integer = int) %>%
+        filter(integer > 6 & integer < 11) %>%
+        collect() %>%
+        summarize(mean = mean(integer)),
+      df1 %>%
+        select(string = chr, integer = int) %>%
+        filter(integer > 6) %>%
+        summarize(mean = mean(integer))
+    )
+  }
+
+  check_dataset()
+  # By default we should overwrite
+  write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
+  check_dataset()
+  write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "overwrite")
+  check_dataset()
+  expect_error(
+    write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "error"),
+    "directory is not empty"
+  )
+  unlink(dst_dir, recursive = TRUE)
+  write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "error")
+  check_dataset()
+})
+
 test_that("Writing a dataset: no format specified", {
   dst_dir <- make_temp_dir()
   write_dataset(example_data, dst_dir)