Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/01 21:21:28 UTC

[GitHub] [arrow] nealrichardson opened a new pull request #12316: ARROW-15517: [R] Use WriteNode in write_dataset()

nealrichardson opened a new pull request #12316:
URL: https://github.com/apache/arrow/pull/12316


   I've done the minimal work so far to switch write_dataset() to use ExecPlans directly, which should allow streaming writes in more cases. There are a few other cases to test and some refactoring/cleanup that would make sense, and it would be good to verify that this makes a difference, e.g. for dataset writing with a join; maybe it doesn't matter today.
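   In rough outline, the new path builds one ExecPlan whose scan node feeds a write node directly. Below is a minimal C++ sketch of that shape, assuming an Arrow C++ build of this era with ARROW_DATASET=ON; the helper name WriteViaExecPlan and the error handling are illustrative, and the actual binding is ExecPlan_Write in r/src/compute-exec.cpp, quoted in the review below:

    #include <arrow/compute/exec/exec_plan.h>
    #include <arrow/dataset/file_base.h>
    #include <arrow/dataset/plan.h>
    #include <arrow/dataset/scanner.h>
    #include <arrow/result.h>
    #include <arrow/status.h>

    namespace cp = arrow::compute;
    namespace ds = arrow::dataset;

    // Sketch: stream a dataset scan straight into a dataset write within a
    // single ExecPlan, so no intermediate Table is materialized.
    arrow::Status WriteViaExecPlan(std::shared_ptr<ds::Dataset> dataset,
                                   std::shared_ptr<ds::ScanOptions> scan_options,
                                   ds::FileSystemDatasetWriteOptions write_options) {
      // Register the "scan"/"write" node factories (the R package does this
      // once at load time).
      ds::internal::Initialize();

      cp::ExecContext exec_context;
      ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make(&exec_context));

      ARROW_ASSIGN_OR_RAISE(
          cp::ExecNode* scan,
          cp::MakeExecNode("scan", plan.get(), {},
                           ds::ScanNodeOptions{dataset, scan_options}));

      // Any project/filter/join nodes could sit between scan and write here;
      // that is what makes streaming writes of queries (e.g. with a join)
      // possible.
      ARROW_RETURN_NOT_OK(
          cp::MakeExecNode("write", plan.get(), {scan},
                           ds::WriteNodeOptions{std::move(write_options),
                                                scan->output_schema()})
              .status());

      ARROW_RETURN_NOT_OK(plan->Validate());
      ARROW_RETURN_NOT_OK(plan->StartProducing());
      return plan->finished().status();
    }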


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org




[GitHub] [arrow] github-actions[bot] removed a comment on pull request #12316: ARROW-15517: [R] Use WriteNode in write_dataset()

github-actions[bot] removed a comment on pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#issuecomment-1027300016


   :warning: Ticket **has not been started in JIRA**, please click 'Start Progress'.





[GitHub] [arrow] nealrichardson commented on a change in pull request #12316: ARROW-15517: [R] Use WriteNode in write_dataset()

nealrichardson commented on a change in pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r806278437



##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
                             arrow::dataset::ScanNodeOptions{dataset, options});
 }
 
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+                    const std::shared_ptr<compute::ExecNode>& final_node,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions) {
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+
+  MakeExecNodeOrStop(
+      "write", final_node->plan(), {final_node.get()},
+      ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
+}
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(

Review comment:
       Yes, but we (currently) turn a Table into an InMemoryDataset. I guess we could go through RecordBatchReader, which wouldn't require the dataset subsystem (I can't remember whether I looked into that and found that it didn't work for some reason; you'd think it should).
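       In code, the detour being described looks roughly like this (a sketch assuming Arrow C++ built with ARROW_DATASET=ON; the helper name ScanTable is illustrative):

    #include <arrow/dataset/dataset.h>
    #include <arrow/dataset/scanner.h>
    #include <arrow/result.h>
    #include <arrow/table.h>

    // Wrap the Table in an InMemoryDataset so the dataset scan machinery can
    // consume it; this is why the dataset subsystem is required even for
    // purely in-memory input.
    arrow::Result<std::shared_ptr<arrow::dataset::Scanner>> ScanTable(
        std::shared_ptr<arrow::Table> table) {
      auto dataset =
          std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
      ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
      return builder->Finish();
    }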







[GitHub] [arrow] westonpace commented on a change in pull request #12316: ARROW-15517: [R] Use WriteNode in write_dataset()

westonpace commented on a change in pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r806279597



##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
                             arrow::dataset::ScanNodeOptions{dataset, options});
 }
 
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+                    const std::shared_ptr<compute::ExecNode>& final_node,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions) {
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+
+  MakeExecNodeOrStop(
+      "write", final_node->plan(), {final_node.get()},
+      ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
+}
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(

Review comment:
       Soon it will be even easier than that: #12267 







[GitHub] [arrow] nealrichardson commented on a change in pull request #12316: ARROW-15517: [R] Use WriteNode in write_dataset()

nealrichardson commented on a change in pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r806277566



##########
File path: r/R/dataset-write.R
##########
@@ -116,25 +116,40 @@ write_dataset <- function(dataset,
   if (inherits(dataset, "arrow_dplyr_query")) {
     # partitioning vars need to be in the `select` schema
     dataset <- ensure_group_vars(dataset)
-  } else if (inherits(dataset, "grouped_df")) {
-    force(partitioning)
-    # Drop the grouping metadata before writing; we've already consumed it
-    # now to construct `partitioning` and don't want it in the metadata$r
-    dataset <- dplyr::ungroup(dataset)
+  } else {
+    if (inherits(dataset, "grouped_df")) {
+      force(partitioning)
+      # Drop the grouping metadata before writing; we've already consumed it
+      # now to construct `partitioning` and don't want it in the metadata$r
+      dataset <- dplyr::ungroup(dataset)
+    }
+    dataset <- tryCatch(
+      as_adq(dataset),
+      error = function(e) {
+        stop("'dataset' must be a Dataset, RecordBatch, Table, arrow_dplyr_query, or data.frame, not ", deparse(class(dataset)), call. = FALSE)
+      }
+    )
   }
 
-  scanner <- Scanner$create(dataset)
+  plan <- ExecPlan$create()
+  final_node <- plan$Build(dataset)
+  # TODO: warn/error if there is sorting/top_k? or just compute? (this needs test)

Review comment:
       Cool. TopK is a separate issue; it's another feature handled only in a sink node. I'll handle it here by evaluating the query and then running a new ExecPlan to write the resulting Table.







[GitHub] [arrow] nealrichardson commented on a change in pull request #12316: ARROW-15517: [R] Use WriteNode in write_dataset()

nealrichardson commented on a change in pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r806275298



##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
                             arrow::dataset::ScanNodeOptions{dataset, options});
 }
 
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+                    const std::shared_ptr<compute::ExecNode>& final_node,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions) {
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+
+  MakeExecNodeOrStop(
+      "write", final_node->plan(), {final_node.get()},
+      ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
+}
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(

Review comment:
       The ifdef and the decorations above each function should match, so I moved the endif accordingly. Technically these ExecNode methods compile with just the compute namespace, not dataset. I could move them so that they compile even if the Arrow C++ library wasn't built with ARROW_DATASET=ON, but that didn't seem worth the effort.
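       The pattern in question, as a sketch (the guard macro name ARROW_R_WITH_DATASET is assumed here):

    // The export decoration and the preprocessor guard must agree: anything
    // marked [[dataset::export]] should sit inside the matching #if block,
    // so the package still compiles against an Arrow C++ build without
    // ARROW_DATASET=ON.
    #if defined(ARROW_R_WITH_DATASET)

    // [[dataset::export]]
    void ExecPlan_Write(/* ... as in the diff above ... */);

    #endif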








[GitHub] [arrow] westonpace commented on a change in pull request #12316: ARROW-15517: [R] Use WriteNode in write_dataset()

westonpace commented on a change in pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r806276486



##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
                             arrow::dataset::ScanNodeOptions{dataset, options});
 }
 
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+                    const std::shared_ptr<compute::ExecNode>& final_node,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions) {
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+
+  MakeExecNodeOrStop(
+      "write", final_node->plan(), {final_node.get()},
+      ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
+}
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(

Review comment:
       It should be possible in that case to use a table as the source, but I agree it's probably not worth worrying about unless there is a compelling use case.
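       For reference, later Arrow C++ releases grew a dedicated table source node for exec plans; against such a release (not available to this PR at the time; header paths as of the pre-Acero layout), a sketch of using a Table directly as the source might look like:

    #include <arrow/compute/exec/exec_plan.h>
    #include <arrow/compute/exec/options.h>
    #include <arrow/result.h>
    #include <arrow/table.h>

    namespace cp = arrow::compute;

    // Use the "table_source" exec node to scan an in-memory Table directly,
    // with no dependency on the dataset subsystem.
    arrow::Result<cp::ExecNode*> MakeTableSource(
        cp::ExecPlan* plan, std::shared_ptr<arrow::Table> table) {
      return cp::MakeExecNode("table_source", plan, {},
                              cp::TableSourceNodeOptions{std::move(table)});
    }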







[GitHub] [arrow] westonpace commented on a change in pull request #12316: ARROW-15517: [R] Use WriteNode in write_dataset()

westonpace commented on a change in pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r806235649



##########
File path: r/R/dataset-write.R
##########
@@ -116,25 +116,40 @@ write_dataset <- function(dataset,
   if (inherits(dataset, "arrow_dplyr_query")) {
     # partitioning vars need to be in the `select` schema
     dataset <- ensure_group_vars(dataset)
-  } else if (inherits(dataset, "grouped_df")) {
-    force(partitioning)
-    # Drop the grouping metadata before writing; we've already consumed it
-    # now to construct `partitioning` and don't want it in the metadata$r
-    dataset <- dplyr::ungroup(dataset)
+  } else {
+    if (inherits(dataset, "grouped_df")) {
+      force(partitioning)
+      # Drop the grouping metadata before writing; we've already consumed it
+      # now to construct `partitioning` and don't want it in the metadata$r
+      dataset <- dplyr::ungroup(dataset)
+    }
+    dataset <- tryCatch(
+      as_adq(dataset),
+      error = function(e) {
+        stop("'dataset' must be a Dataset, RecordBatch, Table, arrow_dplyr_query, or data.frame, not ", deparse(class(dataset)), call. = FALSE)
+      }
+    )
   }
 
-  scanner <- Scanner$create(dataset)
+  plan <- ExecPlan$create()
+  final_node <- plan$Build(dataset)
+  # TODO: warn/error if there is sorting/top_k? or just compute? (this needs test)

Review comment:
       I think a warning would be good. Someday we should be able to respect the sort (the dataset writer already has a short serialized path at the front, so this should be straightforward). Dataset writing writes out chunks, and those chunks have indices, so if the user asked for a sort then the data in chunk-0 should precede chunk-1. I've created ARROW-15681 to track this on the C++ side.

##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
                             arrow::dataset::ScanNodeOptions{dataset, options});
 }
 
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+                    const std::shared_ptr<compute::ExecNode>& final_node,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions) {
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+
+  MakeExecNodeOrStop(
+      "write", final_node->plan(), {final_node.get()},
+      ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
+}
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(

Review comment:
       I don't think this method was inside the `#if` block before.  Did you mean to include it?  I might also be reading the git diff incorrectly.






