You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/06/21 15:20:20 UTC
[arrow] branch main updated: GH-35935: [C++] Clean interruption of an Acero plan with `use_threads=false` (#35953)
This is an automated email from the ASF dual-hosted git repository.
icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b451b34a26 GH-35935: [C++] Clean interruption of an Acero plan with `use_threads=false` (#35953)
b451b34a26 is described below
commit b451b34a260caae52182608b76f3e450e522b59e
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Wed Jun 21 18:20:10 2023 +0300
GH-35935: [C++] Clean interruption of an Acero plan with `use_threads=false` (#35953)
### What changes are included in this PR?
The execution plan set-up code is refactored so that the reader's `Close` calls `StopProducing` on the plan. Originally, `StopProducing` was called only from the destructor of `BatchConverter`, but with `use_threads=false` a hang occurs before that destructor is reached.
### Are these changes tested?
Yes - a test that early-closes a plan reader.
### Are there any user-facing changes?
Yes; this will fix a hang visible to users.
* Closes: #35935
Lead-authored-by: Yaron Gvili <rt...@hotmail.com>
Co-authored-by: rtpsw <rt...@hotmail.com>
Signed-off-by: Li Jin <ic...@gmail.com>
---
cpp/src/arrow/acero/exec_plan.cc | 38 +++++++++++++++++++++++++++++---------
cpp/src/arrow/acero/exec_plan.h | 4 ++++
cpp/src/arrow/acero/plan_test.cc | 17 +++++++++++++++++
3 files changed, 50 insertions(+), 9 deletions(-)
diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc
index 2f03159de5..541e5fed62 100644
--- a/cpp/src/arrow/acero/exec_plan.cc
+++ b/cpp/src/arrow/acero/exec_plan.cc
@@ -999,20 +999,30 @@ struct BatchConverter {
std::shared_ptr<ExecPlan> exec_plan;
};
+// Convert a `Declaration` using `QueryOptions` to a `RecordBatch` generator.
+// Additional outputs:
+// * `out_schema` is the schema for the generated record batches
+// * `out_plan` is the backing `ExecPlan`, which may be stopped to cancel the generation
Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
Declaration declaration, QueryOptions options,
- ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema) {
+ ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema,
+ std::shared_ptr<ExecPlan>* out_plan) {
+ if (out_plan == nullptr) {
+ std::shared_ptr<ExecPlan> tmp_plan;
+ return DeclarationToRecordBatchGenerator(std::move(declaration), std::move(options),
+ cpu_executor, out_schema, &tmp_plan);
+ }
auto converter = std::make_shared<BatchConverter>();
ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry);
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
- ExecPlan::Make(options, exec_ctx));
+ std::shared_ptr<ExecPlan>& plan = *out_plan;
+ ARROW_ASSIGN_OR_RAISE(plan, ExecPlan::Make(options, exec_ctx));
Declaration with_sink = Declaration::Sequence(
{declaration,
{"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}});
ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get()));
ARROW_RETURN_NOT_OK(plan->Validate());
plan->StartProducing();
- converter->exec_plan = std::move(plan);
+ converter->exec_plan = plan;
ARROW_ASSIGN_OR_RAISE(*out_schema, converter->InitializeSchema(options.field_names));
return [conv = std::move(converter)] { return (*conv)(); };
}
@@ -1025,6 +1035,7 @@ Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration decla
return Status::Invalid("Cannot use synchronous methods with a custom CPU executor");
}
std::shared_ptr<Schema> schema;
+ std::shared_ptr<ExecPlan> plan;
auto batch_iterator = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
[&](::arrow::internal::Executor* executor)
@@ -1032,14 +1043,16 @@ Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration decla
ExecContext exec_ctx(options.memory_pool, executor,
options.function_registry);
return DeclarationToRecordBatchGenerator(declaration, std::move(options),
- executor, &schema);
+ executor, &schema, &plan);
},
options.use_threads));
struct PlanReader : RecordBatchReader {
- PlanReader(std::shared_ptr<Schema> schema,
+ PlanReader(std::shared_ptr<ExecPlan> plan, std::shared_ptr<Schema> schema,
std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator)
- : schema_(std::move(schema)), iterator_(std::move(iterator)) {}
+ : plan_(std::move(plan)),
+ schema_(std::move(schema)),
+ iterator_(std::move(iterator)) {}
std::shared_ptr<Schema> schema() const override { return schema_; }
@@ -1056,19 +1069,26 @@ Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration decla
return Status::OK();
}
// End plan and read from generator until finished
+ plan_->StopProducing();
std::shared_ptr<RecordBatch> batch;
do {
- ARROW_RETURN_NOT_OK(ReadNext(&batch));
+ Status st = ReadNext(&batch);
+ if (!st.ok()) {
+ if (st.IsCancelled()) break; // plan cancelled, so closing is done
+ return st;
+ }
} while (batch != nullptr);
iterator_.reset();
return Status::OK();
}
+ std::shared_ptr<ExecPlan> plan_;
std::shared_ptr<Schema> schema_;
std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator_;
};
- return std::make_unique<PlanReader>(std::move(schema), std::move(batch_iterator));
+ return std::make_unique<PlanReader>(std::move(plan), std::move(schema),
+ std::move(batch_iterator));
}
Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h
index 72cccf6b7c..04303aa951 100644
--- a/cpp/src/arrow/acero/exec_plan.h
+++ b/cpp/src/arrow/acero/exec_plan.h
@@ -751,6 +751,10 @@ DeclarationToBatchesAsync(Declaration declaration, ExecContext exec_context);
/// fills up.
///
/// If a custom exec context is provided then the value of `use_threads` will be ignored.
+///
+/// The returned RecordBatchReader can be closed early to cancel the computation of record
+/// batches. In this case, only errors encountered by the computation may be reported. In
+/// particular, no cancellation error may be reported.
ARROW_ACERO_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc
index 5f62550177..ff7d2d7eca 100644
--- a/cpp/src/arrow/acero/plan_test.cc
+++ b/cpp/src/arrow/acero/plan_test.cc
@@ -767,6 +767,23 @@ TEST(ExecPlanExecution, DeclarationToReader) {
reader->Next());
}
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+ auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+ auto plan = Declaration::Sequence(
+ {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+ DeclarationToReader(plan, /*use_threads=*/false));
+
+ // read only a few batches
+ for (size_t i = 0; i < 10; i++) {
+ ASSERT_OK(reader->Next());
+ }
+ // then close
+ ASSERT_OK(reader->Close());
+ EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("already closed reader"),
+ reader->Next());
+}
+
TEST(ExecPlanExecution, ConsumingSinkNames) {
struct SchemaKeepingConsumer : public SinkNodeConsumer {
std::shared_ptr<Schema> schema_;