Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/18 21:37:23 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #12316: ARROW-15517: [R] Use WriteNode in write_dataset()

westonpace commented on code in PR #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r852421227


##########
r/R/dataset-write.R:
##########
@@ -136,41 +136,84 @@ write_dataset <- function(dataset,
   if (inherits(dataset, "arrow_dplyr_query")) {
     # partitioning vars need to be in the `select` schema
     dataset <- ensure_group_vars(dataset)
-  } else if (inherits(dataset, "grouped_df")) {
-    force(partitioning)
-    # Drop the grouping metadata before writing; we've already consumed it
-    # now to construct `partitioning` and don't want it in the metadata$r
-    dataset <- dplyr::ungroup(dataset)
+  } else {
+    if (inherits(dataset, "grouped_df")) {
+      force(partitioning)
+      # Drop the grouping metadata before writing; we've already consumed it
+      # now to construct `partitioning` and don't want it in the metadata$r
+      dataset <- dplyr::ungroup(dataset)
+    }
+    dataset <- tryCatch(
+      as_adq(dataset),
+      error = function(e) {
+        supported <- c(
+          "Dataset", "RecordBatch", "Table", "arrow_dplyr_query", "data.frame"
+        )
+        stop(
+          "'dataset' must be a ",
+          oxford_paste(supported, "or", quote = FALSE),
+          ", not ",
+          deparse(class(dataset)),
+          call. = FALSE
+        )
+      }
+    )
+  }
+
+  plan <- ExecPlan$create()
+  final_node <- plan$Build(dataset)
+  if (!is.null(final_node$sort %||% final_node$head %||% final_node$tail)) {
+    # Because sorting and topK are only handled in the SinkNode (or in R!),
+    # they wouldn't get picked up in the WriteNode. So let's Run this ExecPlan
+    # to capture those, and then create a new plan for writing
+    # TODO(ARROW-15681): do sorting in WriteNode in C++
+    dataset <- as_adq(plan$Run(final_node))
+    plan <- ExecPlan$create()
+    final_node <- plan$Build(dataset)
   }
 
-  scanner <- Scanner$create(dataset)
   if (!inherits(partitioning, "Partitioning")) {
-    partition_schema <- scanner$schema[partitioning]
+    partition_schema <- final_node$schema[partitioning]
     if (isTRUE(hive_style)) {
-      partitioning <- HivePartitioning$create(partition_schema, null_fallback = list(...)$null_fallback)
+      partitioning <- HivePartitioning$create(
+        partition_schema,
+        null_fallback = list(...)$null_fallback
+      )
     } else {
       partitioning <- DirectoryPartitioning$create(partition_schema)
     }
   }
 
-  if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
-    max_rows_per_group <- max_rows_per_file
-  }
-
   path_and_fs <- get_path_and_filesystem(path)
-  options <- FileWriteOptions$create(format, table = scanner, ...)
+  output_schema <- final_node$schema
+  options <- FileWriteOptions$create(format, table = output_schema, ...)

Review Comment:
   Why is this arg named `table`?



##########
r/R/metadata.R:
##########
@@ -208,3 +207,24 @@ arrow_attributes <- function(x, only_top_level = FALSE) {
     NULL
   }
 }
+
+get_r_metadata_from_old_schema <- function(new_schema,
+                                           old_schema,
+                                           drop_attributes = FALSE) {
+  # TODO: do we care about other (non-R) metadata preservation?
+  # How would we know if it were meaningful?

Review Comment:
   I think it depends on the source of `old_schema`.
   
   In the general case, the input is a collection of files and the output is a different set of files (sometimes we explode files and sometimes we merge files).  The idea of writing metadata to the output files is somewhat meaningless.  So, in general, I would say no, you don't care about preservation.
   
   In Python, users can create a dataset from a single file, and we do a little bit of work to preserve the metadata on write because we want it to feel like it "round trips".
   
   When creating or appending to a dataset, users might want to specify general information about how the files were created, like "Origin": "Nightly update", but that is unrelated to the original metadata.
   
   In the future, the dataset write may append its own metadata (e.g., dataset statistics, or information about the dataset schema such as which columns are already sorted).
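   
   For illustration, here is a minimal sketch (not part of this PR) of the kind of round trip being discussed on the R side; it assumes that column attributes captured in the `metadata$r` key are written out by `write_dataset()` and restored when the data is read back:
   
   ```r
   library(arrow)
   library(dplyr)
   
   df <- data.frame(x = 1:3, y = c("a", "b", "c"))
   # A custom attribute on a column would be carried in the schema's metadata$r
   attr(df$x, "units") <- "seconds"
   
   tmp <- tempfile()
   write_dataset(df, tmp, format = "parquet")
   
   # If the r metadata round trips, the attribute comes back after a scan
   roundtrip <- open_dataset(tmp) %>% collect()
   attr(roundtrip$x, "units")  # expected: "seconds"
   ```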



##########
r/R/dataset-write.R:
##########
@@ -136,41 +136,84 @@ write_dataset <- function(dataset,
   if (inherits(dataset, "arrow_dplyr_query")) {
     # partitioning vars need to be in the `select` schema
     dataset <- ensure_group_vars(dataset)
-  } else if (inherits(dataset, "grouped_df")) {
-    force(partitioning)
-    # Drop the grouping metadata before writing; we've already consumed it
-    # now to construct `partitioning` and don't want it in the metadata$r
-    dataset <- dplyr::ungroup(dataset)
+  } else {
+    if (inherits(dataset, "grouped_df")) {
+      force(partitioning)
+      # Drop the grouping metadata before writing; we've already consumed it
+      # now to construct `partitioning` and don't want it in the metadata$r
+      dataset <- dplyr::ungroup(dataset)
+    }
+    dataset <- tryCatch(
+      as_adq(dataset),
+      error = function(e) {
+        supported <- c(
+          "Dataset", "RecordBatch", "Table", "arrow_dplyr_query", "data.frame"
+        )
+        stop(
+          "'dataset' must be a ",
+          oxford_paste(supported, "or", quote = FALSE),
+          ", not ",
+          deparse(class(dataset)),
+          call. = FALSE
+        )
+      }
+    )
+  }
+
+  plan <- ExecPlan$create()
+  final_node <- plan$Build(dataset)
+  if (!is.null(final_node$sort %||% final_node$head %||% final_node$tail)) {
+    # Because sorting and topK are only handled in the SinkNode (or in R!),
+    # they wouldn't get picked up in the WriteNode. So let's Run this ExecPlan
+    # to capture those, and then create a new plan for writing
+    # TODO(ARROW-15681): do sorting in WriteNode in C++
+    dataset <- as_adq(plan$Run(final_node))
+    plan <- ExecPlan$create()
+    final_node <- plan$Build(dataset)
   }
 
-  scanner <- Scanner$create(dataset)
   if (!inherits(partitioning, "Partitioning")) {
-    partition_schema <- scanner$schema[partitioning]
+    partition_schema <- final_node$schema[partitioning]
     if (isTRUE(hive_style)) {
-      partitioning <- HivePartitioning$create(partition_schema, null_fallback = list(...)$null_fallback)
+      partitioning <- HivePartitioning$create(
+        partition_schema,
+        null_fallback = list(...)$null_fallback
+      )
     } else {
       partitioning <- DirectoryPartitioning$create(partition_schema)
     }
   }
 
-  if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
-    max_rows_per_group <- max_rows_per_file
-  }
-
   path_and_fs <- get_path_and_filesystem(path)
-  options <- FileWriteOptions$create(format, table = scanner, ...)
+  output_schema <- final_node$schema
+  options <- FileWriteOptions$create(format, table = output_schema, ...)
 
+  # TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
+  # and encapsulate this logic better
   existing_data_behavior_opts <- c("delete_matching", "overwrite", "error")
   existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L
 
-  validate_positive_int_value(max_partitions, "max_partitions must be a positive, non-missing integer")
-  validate_positive_int_value(max_open_files, "max_open_files must be a positive, non-missing integer")
-  validate_positive_int_value(min_rows_per_group, "min_rows_per_group must be a positive, non-missing integer")
-  validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer")
+  if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
+    max_rows_per_group <- max_rows_per_file
+  }
 
-  dataset___Dataset__Write(
+  validate_positive_int_value(max_partitions)
+  validate_positive_int_value(max_open_files)
+  validate_positive_int_value(min_rows_per_group)
+  validate_positive_int_value(max_rows_per_group)

Review Comment:
   The error message says `non-missing` and yet we have defaults for all of these properties (and line 196 seems to tolerate a missing `max_rows_per_group`).  Are they truly required to be non-missing?
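   
   As a small illustration (a toy function, unrelated to the actual defaults in `write_dataset()`), `missing()` is TRUE whenever the caller does not supply the argument, even though the default still provides a usable value, which is what makes the "non-missing" wording confusing:
   
   ```r
   f <- function(x = 10L) {
     list(was_missing = missing(x), value = x)
   }
   str(f())     # was_missing: TRUE, yet x still has its default value 10
   str(f(5L))   # was_missing: FALSE, value 5
   ```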


