You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/11 20:52:23 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #13117: ARROW-16525: [C++] Tee node not properly marking node finished

westonpace commented on code in PR #13117:
URL: https://github.com/apache/arrow/pull/13117#discussion_r870745521


##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -460,6 +460,23 @@ class TeeNode : public compute::MapNode {
 
   const char* kind_name() const override { return "TeeNode"; }
 
+  void Finish(Status finish_st) override {
+    dataset_writer_->Finish().AddCallback([this, finish_st](const Status& dw_status) {
+      // Need to wait for the task group to complete regardless of dw_status
+      task_group_.End().AddCallback(
+          [this, dw_status, finish_st](const Status& tg_status) {
+            // Prefer dw_status then finish_st and then tg_status
+            if (!dw_status.ok()) {
+              finished_.MarkFinished(dw_status);
+            }
+            if (!finish_st.ok()) {
+              finished_.MarkFinished(finish_st);
+            }
+            finished_.MarkFinished(tg_status);

Review Comment:
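   Good idea. We do support `dw_status & finish_st & tg_status`, so I changed to that. `Status::operator&` returns the left operand when it is not OK and the right operand otherwise. Combining the statuses this way therefore keeps the preference order (dw_status, then finish_st, then tg_status). It does this with a single value, instead of potentially calling `finished_.MarkFinished` more than once.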



##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -342,5 +347,118 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
     }
   }
 }
+
+class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, bool>> {
+ protected:
+  bool IsParallel() { return std::get<0>(GetParam()); }
+  bool IsSlow() { return std::get<1>(GetParam()); }
+
+  FileSystemWriteTest() { dataset::internal::Initialize(); }
+
+  void TestDatasetWriteRoundTrip(
+      std::function<Result<std::shared_ptr<cp::ExecPlan>>(
+          const cp::BatchesWithSchema& source_data, const FileSystemDatasetWriteOptions&,
+          std::function<Future<util::optional<cp::ExecBatch>>()>*)>
+          plan_factory,
+      bool has_output) {
+    // Runs in-memory data through the plan and then scans out the written
+    // data to ensure it matches the source data
+    auto format = std::make_shared<IpcFileFormat>();
+    auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+    FileSystemDatasetWriteOptions write_options;
+    write_options.file_write_options = format->DefaultWriteOptions();
+    write_options.filesystem = fs;
+    write_options.base_dir = "root";
+    write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+    write_options.basename_template = "{i}.feather";
+

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org