You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/06/21 15:20:20 UTC

[arrow] branch main updated: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false` (#35953)

This is an automated email from the ASF dual-hosted git repository.

icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new b451b34a26 GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false` (#35953)
b451b34a26 is described below

commit b451b34a260caae52182608b76f3e450e522b59e
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Wed Jun 21 18:20:10 2023 +0300

    GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false` (#35953)
    
    ### What changes are included in this PR?
    
    The execution plan set-up code is refactored so that the reader's `Close` calls `StopProducing` on the plan. Previously, `StopProducing` was called from the destructor of `BatchConverter`, but with `use_threads=false` a hang occurs before that destructor is ever reached.
    
    ### Are these changes tested?
    
    Yes - a test that closes a plan reader early, before all of its batches have been read.
    
    ### Are there any user-facing changes?
    
    Yes; this will fix a hang visible to users.
    
    * Closes: #35935
    
    Lead-authored-by: Yaron Gvili <rt...@hotmail.com>
    Co-authored-by: rtpsw <rt...@hotmail.com>
    Signed-off-by: Li Jin <ic...@gmail.com>
---
 cpp/src/arrow/acero/exec_plan.cc | 38 +++++++++++++++++++++++++++++---------
 cpp/src/arrow/acero/exec_plan.h  |  4 ++++
 cpp/src/arrow/acero/plan_test.cc | 17 +++++++++++++++++
 3 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc
index 2f03159de5..541e5fed62 100644
--- a/cpp/src/arrow/acero/exec_plan.cc
+++ b/cpp/src/arrow/acero/exec_plan.cc
@@ -999,20 +999,30 @@ struct BatchConverter {
   std::shared_ptr<ExecPlan> exec_plan;
 };
 
+// Convert a `Declaration` using `QueryOptions` to a `RecordBatch` generator.
+// Additional outputs:
+// * `out_schema` is the schema for the generated record batches
+// * `out_plan` is the backing `ExecPlan`, which may be stopped to cancel the generation
 Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
     Declaration declaration, QueryOptions options,
-    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema) {
+    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema,
+    std::shared_ptr<ExecPlan>* out_plan) {
+  if (out_plan == nullptr) {
+    std::shared_ptr<ExecPlan> tmp_plan;
+    return DeclarationToRecordBatchGenerator(std::move(declaration), std::move(options),
+                                             cpu_executor, out_schema, &tmp_plan);
+  }
   auto converter = std::make_shared<BatchConverter>();
   ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry);
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
-                        ExecPlan::Make(options, exec_ctx));
+  std::shared_ptr<ExecPlan>& plan = *out_plan;
+  ARROW_ASSIGN_OR_RAISE(plan, ExecPlan::Make(options, exec_ctx));
   Declaration with_sink = Declaration::Sequence(
       {declaration,
        {"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}});
   ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get()));
   ARROW_RETURN_NOT_OK(plan->Validate());
   plan->StartProducing();
-  converter->exec_plan = std::move(plan);
+  converter->exec_plan = plan;
   ARROW_ASSIGN_OR_RAISE(*out_schema, converter->InitializeSchema(options.field_names));
   return [conv = std::move(converter)] { return (*conv)(); };
 }
@@ -1025,6 +1035,7 @@ Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration decla
     return Status::Invalid("Cannot use synchronous methods with a custom CPU executor");
   }
   std::shared_ptr<Schema> schema;
+  std::shared_ptr<ExecPlan> plan;
   auto batch_iterator = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
       ::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
           [&](::arrow::internal::Executor* executor)
@@ -1032,14 +1043,16 @@ Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration decla
             ExecContext exec_ctx(options.memory_pool, executor,
                                  options.function_registry);
             return DeclarationToRecordBatchGenerator(declaration, std::move(options),
-                                                     executor, &schema);
+                                                     executor, &schema, &plan);
           },
           options.use_threads));
 
   struct PlanReader : RecordBatchReader {
-    PlanReader(std::shared_ptr<Schema> schema,
+    PlanReader(std::shared_ptr<ExecPlan> plan, std::shared_ptr<Schema> schema,
                std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator)
-        : schema_(std::move(schema)), iterator_(std::move(iterator)) {}
+        : plan_(std::move(plan)),
+          schema_(std::move(schema)),
+          iterator_(std::move(iterator)) {}
 
     std::shared_ptr<Schema> schema() const override { return schema_; }
 
@@ -1056,19 +1069,26 @@ Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration decla
         return Status::OK();
       }
       // End plan and read from generator until finished
+      plan_->StopProducing();
       std::shared_ptr<RecordBatch> batch;
       do {
-        ARROW_RETURN_NOT_OK(ReadNext(&batch));
+        Status st = ReadNext(&batch);
+        if (!st.ok()) {
+          if (st.IsCancelled()) break;  // plan cancelled, so closing is done
+          return st;
+        }
       } while (batch != nullptr);
       iterator_.reset();
       return Status::OK();
     }
 
+    std::shared_ptr<ExecPlan> plan_;
     std::shared_ptr<Schema> schema_;
     std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator_;
   };
 
-  return std::make_unique<PlanReader>(std::move(schema), std::move(batch_iterator));
+  return std::make_unique<PlanReader>(std::move(plan), std::move(schema),
+                                      std::move(batch_iterator));
 }
 
 Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h
index 72cccf6b7c..04303aa951 100644
--- a/cpp/src/arrow/acero/exec_plan.h
+++ b/cpp/src/arrow/acero/exec_plan.h
@@ -751,6 +751,10 @@ DeclarationToBatchesAsync(Declaration declaration, ExecContext exec_context);
 /// fills up.
 ///
 /// If a custom exec context is provided then the value of `use_threads` will be ignored.
+///
+/// The returned RecordBatchReader can be closed early to cancel the computation of record
+/// batches. In this case, only errors encountered by the computation may be reported. In
+/// particular, no cancellation error may be reported.
 ARROW_ACERO_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
     Declaration declaration, bool use_threads = true,
     MemoryPool* memory_pool = default_memory_pool(),
diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc
index 5f62550177..ff7d2d7eca 100644
--- a/cpp/src/arrow/acero/plan_test.cc
+++ b/cpp/src/arrow/acero/plan_test.cc
@@ -767,6 +767,23 @@ TEST(ExecPlanExecution, DeclarationToReader) {
                                   reader->Next());
 }
 
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+  auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+  auto plan = Declaration::Sequence(
+      {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+                       DeclarationToReader(plan, /*use_threads=*/false));
+
+  // read only a few batches
+  for (size_t i = 0; i < 10; i++) {
+    ASSERT_OK(reader->Next());
+  }
+  // then close
+  ASSERT_OK(reader->Close());
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("already closed reader"),
+                                  reader->Next());
+}
+
 TEST(ExecPlanExecution, ConsumingSinkNames) {
   struct SchemaKeepingConsumer : public SinkNodeConsumer {
     std::shared_ptr<Schema> schema_;