You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/01/13 00:55:52 UTC
[arrow] branch master updated: GH-33212: [C++][Python] Add use_threads to pyarrow.substrait.run_query (#33623)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e1027dc70f GH-33212: [C++][Python] Add use_threads to pyarrow.substrait.run_query (#33623)
e1027dc70f is described below
commit e1027dc70f8344257baa30fe385c5a8154f366a9
Author: Weston Pace <we...@gmail.com>
AuthorDate: Thu Jan 12 16:55:45 2023 -0800
GH-33212: [C++][Python] Add use_threads to pyarrow.substrait.run_query (#33623)
Also adds memory_pool and function_registry to the various DeclarationToXyz methods. Converts `ExecuteSerializedPlan` to use `DeclarationToReader` instead of the bespoke thing it was doing before.
* Closes: #33212
Lead-authored-by: Weston Pace <we...@gmail.com>
Co-authored-by: Vibhatha Lakmal Abeykoon <vi...@users.noreply.github.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/compute/exec/exec_plan.cc | 87 +++++++++++--------
cpp/src/arrow/compute/exec/exec_plan.h | 68 ++++++++++-----
cpp/src/arrow/engine/substrait/serde.cc | 22 +++++
cpp/src/arrow/engine/substrait/serde.h | 17 ++++
cpp/src/arrow/engine/substrait/util.cc | 112 ++-----------------------
cpp/src/arrow/engine/substrait/util.h | 16 +++-
python/pyarrow/_substrait.pyx | 10 ++-
python/pyarrow/includes/libarrow_substrait.pxd | 3 +-
python/pyarrow/tests/test_substrait.py | 22 +++--
9 files changed, 185 insertions(+), 172 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc
index b8886619d7..88cd298d2c 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -563,23 +563,28 @@ Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration,
return exec_plan->finished().Then([exec_plan, output_table] { return *output_table; });
}
-Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration,
- bool use_threads) {
+Future<std::shared_ptr<Table>> DeclarationToTableAsync(
+ Declaration declaration, bool use_threads, MemoryPool* memory_pool,
+ FunctionRegistry* function_registry) {
if (use_threads) {
- return DeclarationToTableAsync(std::move(declaration), *threaded_exec_context());
+ ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
+ function_registry);
+ return DeclarationToTableAsync(std::move(declaration), ctx);
} else {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
- ExecContext ctx(default_memory_pool(), tpool.get());
+ ExecContext ctx(memory_pool, tpool.get(), function_registry);
return DeclarationToTableAsync(std::move(declaration), ctx)
.Then([tpool](const std::shared_ptr<Table>& table) { return table; });
}
}
Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration,
- bool use_threads) {
+ bool use_threads,
+ MemoryPool* memory_pool,
+ FunctionRegistry* function_registry) {
return ::arrow::internal::RunSynchronously<Future<std::shared_ptr<Table>>>(
- [declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
- ExecContext ctx(default_memory_pool(), executor);
+ [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
+ ExecContext ctx(memory_pool, executor, function_registry);
return DeclarationToTableAsync(std::move(declaration), ctx);
},
use_threads);
@@ -594,12 +599,15 @@ Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
}
Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
- Declaration declaration, bool use_threads) {
+ Declaration declaration, bool use_threads, MemoryPool* memory_pool,
+ FunctionRegistry* function_registry) {
if (use_threads) {
- return DeclarationToBatchesAsync(std::move(declaration), *threaded_exec_context());
+ ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
+ function_registry);
+ return DeclarationToBatchesAsync(std::move(declaration), ctx);
} else {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
- ExecContext ctx(default_memory_pool(), tpool.get());
+ ExecContext ctx(memory_pool, tpool.get(), function_registry);
return DeclarationToBatchesAsync(std::move(declaration), ctx)
.Then([tpool](const std::vector<std::shared_ptr<RecordBatch>>& batches) {
return batches;
@@ -608,11 +616,12 @@ Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
}
Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
- Declaration declaration, bool use_threads) {
+ Declaration declaration, bool use_threads, MemoryPool* memory_pool,
+ FunctionRegistry* function_registry) {
return ::arrow::internal::RunSynchronously<
Future<std::vector<std::shared_ptr<RecordBatch>>>>(
- [declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
- ExecContext ctx(default_memory_pool(), executor);
+ [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
+ ExecContext ctx(memory_pool, executor, function_registry);
return DeclarationToBatchesAsync(std::move(declaration), ctx);
},
use_threads);
@@ -641,24 +650,27 @@ Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declar
});
}
-Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declaration,
- bool use_threads) {
+Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
+ Declaration declaration, bool use_threads, MemoryPool* memory_pool,
+ FunctionRegistry* function_registry) {
if (use_threads) {
- return DeclarationToExecBatchesAsync(std::move(declaration),
- *threaded_exec_context());
+ ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
+ function_registry);
+ return DeclarationToExecBatchesAsync(std::move(declaration), ctx);
} else {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
- ExecContext ctx(default_memory_pool(), tpool.get());
+ ExecContext ctx(memory_pool, tpool.get(), function_registry);
return DeclarationToExecBatchesAsync(std::move(declaration), ctx)
.Then([tpool](const BatchesWithCommonSchema& batches) { return batches; });
}
}
-Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration,
- bool use_threads) {
+Result<BatchesWithCommonSchema> DeclarationToExecBatches(
+ Declaration declaration, bool use_threads, MemoryPool* memory_pool,
+ FunctionRegistry* function_registry) {
return ::arrow::internal::RunSynchronously<Future<BatchesWithCommonSchema>>(
- [declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
- ExecContext ctx(default_memory_pool(), executor);
+ [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
+ ExecContext ctx(memory_pool, executor, function_registry);
return DeclarationToExecBatchesAsync(std::move(declaration), ctx);
},
use_threads);
@@ -680,20 +692,25 @@ Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_cont
return exec_plan->finished().Then([exec_plan]() {});
}
-Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads) {
+Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads,
+ MemoryPool* memory_pool,
+ FunctionRegistry* function_registry) {
if (use_threads) {
- return DeclarationToStatusAsync(std::move(declaration), *threaded_exec_context());
+ ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
+ function_registry);
+ return DeclarationToStatusAsync(std::move(declaration), ctx);
} else {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
- ExecContext ctx(default_memory_pool(), tpool.get());
+ ExecContext ctx(memory_pool, tpool.get(), function_registry);
return DeclarationToStatusAsync(std::move(declaration), ctx).Then([tpool]() {});
}
}
-Status DeclarationToStatus(Declaration declaration, bool use_threads) {
+Status DeclarationToStatus(Declaration declaration, bool use_threads,
+ MemoryPool* memory_pool, FunctionRegistry* function_registry) {
return ::arrow::internal::RunSynchronously<Future<>>(
- [declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
- ExecContext ctx(default_memory_pool(), executor);
+ [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
+ ExecContext ctx(memory_pool, executor, function_registry);
return DeclarationToStatusAsync(std::move(declaration), ctx);
},
use_threads);
@@ -738,11 +755,9 @@ struct BatchConverter {
};
Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
- Declaration declaration, ::arrow::internal::Executor* executor,
- std::shared_ptr<Schema>* out_schema) {
+ Declaration declaration, ExecContext exec_ctx, std::shared_ptr<Schema>* out_schema) {
auto converter = std::make_shared<BatchConverter>();
- ExecContext exec_context(default_memory_pool(), executor);
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(exec_context));
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(exec_ctx));
Declaration with_sink = Declaration::Sequence(
{declaration,
{"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}});
@@ -754,14 +769,16 @@ Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGen
}
} // namespace
-Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration,
- bool use_threads) {
+Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
+ Declaration declaration, bool use_threads, MemoryPool* memory_pool,
+ FunctionRegistry* function_registry) {
std::shared_ptr<Schema> schema;
auto batch_iterator = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
[&](::arrow::internal::Executor* executor)
-> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
- return DeclarationToRecordBatchGenerator(declaration, executor, &schema);
+ ExecContext exec_ctx(memory_pool, executor, function_registry);
+ return DeclarationToRecordBatchGenerator(declaration, exec_ctx, &schema);
},
use_threads));
diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h
index 09fab00727..f7519bbd88 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.h
+++ b/cpp/src/arrow/compute/exec/exec_plan.h
@@ -426,24 +426,36 @@ struct ARROW_EXPORT Declaration {
/// \brief Utility method to run a declaration and collect the results into a table
///
+/// \param use_threads If `use_threads` is false then all CPU work will be done on the
+/// calling thread. I/O tasks will still happen on the I/O executor
+/// and may be multi-threaded (but should not use significant CPU
+/// resources).
+/// \param memory_pool The memory pool to use for allocations made while running the plan.
+/// \param function_registry The function registry to use for function execution. If null
+/// then the default function registry will be used.
+///
/// This method will add a sink node to the declaration to collect results into a
/// table. It will then create an ExecPlan from the declaration, start the exec plan,
/// block until the plan has finished, and return the created table.
-///
-/// If `use_threads` is false then all CPU work will be done on the calling thread. I/O
-/// tasks will still happen on the I/O executor and may be multi-threaded (but should
-/// not use significant CPU resources)
-ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration,
- bool use_threads = true);
+ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
+ Declaration declaration, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool(),
+ FunctionRegistry* function_registry = NULLPTR);
/// \brief Asynchronous version of \see DeclarationToTable
///
-/// The behavior of use_threads is slightly different than the synchronous version since
-/// we cannot run synchronously on the calling thread. Instead, if use_threads=false then
-/// a new thread pool will be created with a single thread and this will be used for all
-/// compute work.
+/// \param use_threads The behavior of use_threads is slightly different than the
+/// synchronous version since we cannot run synchronously on the
+/// calling thread. Instead, if use_threads=false then a new thread
+/// pool will be created with a single thread and this will be used for
+/// all compute work.
+/// \param memory_pool The memory pool to use for allocations made while running the plan.
+/// \param function_registry The function registry to use for function execution. If null
+/// then the default function registry will be used.
ARROW_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
- Declaration declaration, bool use_threads = true);
+ Declaration declaration, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool(),
+ FunctionRegistry* function_registry = NULLPTR);
/// \brief Overload of \see DeclarationToTableAsync accepting a custom exec context
///
@@ -463,13 +475,17 @@ struct BatchesWithCommonSchema {
///
/// \see DeclarationToTable for details on threading & execution
ARROW_EXPORT Result<BatchesWithCommonSchema> DeclarationToExecBatches(
- Declaration declaration, bool use_threads = true);
+ Declaration declaration, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool(),
+ FunctionRegistry* function_registry = NULLPTR);
/// \brief Asynchronous version of \see DeclarationToExecBatches
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
- Declaration declaration, bool use_threads = true);
+ Declaration declaration, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool(),
+ FunctionRegistry* function_registry = NULLPTR);
/// \brief Overload of \see DeclarationToExecBatchesAsync accepting a custom exec context
///
@@ -481,13 +497,17 @@ ARROW_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
///
/// \see DeclarationToTable for details on threading & execution
ARROW_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
- Declaration declaration, bool use_threads = true);
+ Declaration declaration, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool(),
+ FunctionRegistry* function_registry = NULLPTR);
/// \brief Asynchronous version of \see DeclarationToBatches
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
- Declaration declaration, bool use_threads = true);
+ Declaration declaration, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool(),
+ FunctionRegistry* function_registry = NULLPTR);
/// \brief Overload of \see DeclarationToBatchesAsync accepting a custom exec context
///
@@ -511,7 +531,13 @@ ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatc
///
/// If a custom exec context is provided then the value of `use_threads` will be ignored.
ARROW_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
- Declaration declaration, bool use_threads = true);
+ Declaration declaration, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool(),
+ FunctionRegistry* function_registry = NULLPTR);
+
+/// \brief Overload of \see DeclarationToReader accepting a custom exec context
+ARROW_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
+ Declaration declaration, ExecContext exec_context);
/// \brief Utility method to run a declaration and ignore results
///
@@ -519,7 +545,9 @@ ARROW_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
/// example, when the plan ends with a write node.
///
/// \see DeclarationToTable for details on threading & execution
-ARROW_EXPORT Status DeclarationToStatus(Declaration declaration, bool use_threads = true);
+ARROW_EXPORT Status DeclarationToStatus(Declaration declaration, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool(),
+ FunctionRegistry* function_registry = NULLPTR);
/// \brief Asynchronous version of \see DeclarationToStatus
///
@@ -527,8 +555,10 @@ ARROW_EXPORT Status DeclarationToStatus(Declaration declaration, bool use_thread
/// example, when the plan ends with a write node.
///
/// \see DeclarationToTableAsync for details on threading & execution
-ARROW_EXPORT Future<> DeclarationToStatusAsync(Declaration declaration,
- bool use_threads = true);
+ARROW_EXPORT Future<> DeclarationToStatusAsync(
+ Declaration declaration, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool(),
+ FunctionRegistry* function_registry = NULLPTR);
/// \brief Overload of \see DeclarationToStatusAsync accepting a custom exec context
///
diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc
index ac5de90326..f588aff444 100644
--- a/cpp/src/arrow/engine/substrait/serde.cc
+++ b/cpp/src/arrow/engine/substrait/serde.cc
@@ -127,6 +127,13 @@ DeclarationFactory MakeWriteDeclarationFactory(
};
}
+DeclarationFactory MakeNoSinkDeclarationFactory() {
+ return [](compute::Declaration input,
+ std::vector<std::string> names) -> Result<compute::Declaration> {
+ return input;
+ };
+}
+
// FIXME - Replace with actual version that includes the change
constexpr uint32_t kMinimumMajorVersion = 0;
constexpr uint32_t kMinimumMinorVersion = 19;
@@ -188,6 +195,21 @@ Result<std::vector<compute::Declaration>> DeserializePlans(
registry, ext_set_out, conversion_options);
}
+ARROW_ENGINE_EXPORT Result<compute::Declaration> DeserializePlan(
+ const Buffer& buf, const ExtensionIdRegistry* registry, ExtensionSet* ext_set_out,
+ const ConversionOptions& conversion_options) {
+ ARROW_ASSIGN_OR_RAISE(std::vector<compute::Declaration> top_level_decls,
+ DeserializePlans(buf, MakeNoSinkDeclarationFactory(), registry,
+ ext_set_out, conversion_options));
+ if (top_level_decls.empty()) {
+ return Status::Invalid("No RelRoot in plan");
+ }
+ if (top_level_decls.size() != 1) {
+ return Status::Invalid("Multiple top level declarations found in Substrait plan");
+ }
+ return top_level_decls[0];
+}
+
namespace {
Result<std::shared_ptr<compute::ExecPlan>> MakeSingleDeclarationPlan(
diff --git a/cpp/src/arrow/engine/substrait/serde.h b/cpp/src/arrow/engine/substrait/serde.h
index 0f61ba209f..a4e3b3df14 100644
--- a/cpp/src/arrow/engine/substrait/serde.h
+++ b/cpp/src/arrow/engine/substrait/serde.h
@@ -139,6 +139,23 @@ ARROW_ENGINE_EXPORT Result<std::shared_ptr<compute::ExecPlan>> DeserializePlan(
const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR,
const ConversionOptions& conversion_options = {});
+/// \brief Deserializes a Substrait Plan message to a Declaration
+///
+/// The plan will not contain any sink nodes and will be suitable for use in any
+/// of the arrow::compute::DeclarationToXyz methods.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \param[in] conversion_options options to control how the conversion is to be done.
+/// \return A declaration representing the Substrait plan
+ARROW_ENGINE_EXPORT Result<compute::Declaration> DeserializePlan(
+ const Buffer& buf, const ExtensionIdRegistry* registry = NULLPTR,
+ ExtensionSet* ext_set_out = NULLPTR,
+ const ConversionOptions& conversion_options = {});
+
/// \brief Deserializes a Substrait Type message to the corresponding Arrow type
///
/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Type
diff --git a/cpp/src/arrow/engine/substrait/util.cc b/cpp/src/arrow/engine/substrait/util.cc
index 22091d9699..e0c876d21d 100644
--- a/cpp/src/arrow/engine/substrait/util.cc
+++ b/cpp/src/arrow/engine/substrait/util.cc
@@ -40,113 +40,15 @@ namespace arrow {
namespace engine {
-namespace {
-
-/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
-class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
- public:
- explicit SubstraitSinkConsumer(
- arrow::PushGenerator<std::optional<compute::ExecBatch>>::Producer producer)
- : producer_(std::move(producer)) {}
-
- Status Consume(compute::ExecBatch batch) override {
- // Consume a batch of data
- bool did_push = producer_.Push(batch);
- if (!did_push) return Status::Invalid("Producer closed already");
- return Status::OK();
- }
-
- Status Init(const std::shared_ptr<Schema>& schema,
- compute::BackpressureControl* backpressure_control,
- compute::ExecPlan* plan) override {
- schema_ = schema;
- return Status::OK();
- }
-
- Future<> Finish() override {
- ARROW_UNUSED(producer_.Close());
- return Future<>::MakeFinished();
- }
-
- std::shared_ptr<Schema> schema() { return schema_; }
-
- private:
- arrow::PushGenerator<std::optional<compute::ExecBatch>>::Producer producer_;
- std::shared_ptr<Schema> schema_;
-};
-
-/// \brief An executor to run a Substrait Query
-/// This interface is provided as a utility when creating language
-/// bindings for consuming a Substrait plan.
-class SubstraitExecutor {
- public:
- explicit SubstraitExecutor(std::shared_ptr<compute::ExecPlan> plan,
- compute::ExecContext exec_context,
- const ConversionOptions& conversion_options = {})
- : plan_(std::move(plan)),
- plan_started_(false),
- exec_context_(exec_context),
- conversion_options_(conversion_options) {}
-
- ~SubstraitExecutor() { ARROW_UNUSED(this->Close()); }
-
- Result<std::shared_ptr<RecordBatchReader>> Execute() {
- for (const compute::Declaration& decl : declarations_) {
- RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
- }
- RETURN_NOT_OK(plan_->Validate());
- plan_started_ = true;
- RETURN_NOT_OK(plan_->StartProducing());
- auto schema = sink_consumer_->schema();
- std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
- std::move(schema), std::move(generator_), exec_context_.memory_pool());
- return sink_reader;
- }
-
- Status Close() {
- if (plan_started_) return plan_->finished().status();
- return Status::OK();
- }
-
- Status Init(const Buffer& substrait_buffer, const ExtensionIdRegistry* registry) {
- if (substrait_buffer.size() == 0) {
- return Status::Invalid("Empty substrait plan is passed.");
- }
- sink_consumer_ = std::make_shared<SubstraitSinkConsumer>(generator_.producer());
- std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] {
- return sink_consumer_;
- };
- ARROW_ASSIGN_OR_RAISE(
- declarations_, engine::DeserializePlans(substrait_buffer, consumer_factory,
- registry, nullptr, conversion_options_));
- return Status::OK();
- }
-
- private:
- arrow::PushGenerator<std::optional<compute::ExecBatch>> generator_;
- std::vector<compute::Declaration> declarations_;
- std::shared_ptr<compute::ExecPlan> plan_;
- bool plan_started_;
- compute::ExecContext exec_context_;
- std::shared_ptr<SubstraitSinkConsumer> sink_consumer_;
- const ConversionOptions& conversion_options_;
-};
-
-} // namespace
-
Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
const Buffer& substrait_buffer, const ExtensionIdRegistry* registry,
- compute::FunctionRegistry* func_registry,
- const ConversionOptions& conversion_options) {
- compute::ExecContext exec_context(arrow::default_memory_pool(),
- ::arrow::internal::GetCpuThreadPool(), func_registry);
- ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context));
- SubstraitExecutor executor(std::move(plan), exec_context, conversion_options);
- RETURN_NOT_OK(executor.Init(substrait_buffer, registry));
- ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
- // check closing here, not in destructor, to expose error to caller
- RETURN_NOT_OK(executor.Close());
- return sink_reader;
+ compute::FunctionRegistry* func_registry, const ConversionOptions& conversion_options,
+ bool use_threads, MemoryPool* memory_pool) {
+ ARROW_ASSIGN_OR_RAISE(compute::Declaration plan,
+ DeserializePlan(substrait_buffer, registry,
+ /*ext_set_out=*/nullptr, conversion_options));
+ return compute::DeclarationToReader(std::move(plan), use_threads, memory_pool,
+ func_registry);
}
Result<std::shared_ptr<Buffer>> SerializeJsonPlan(const std::string& substrait_json) {
diff --git a/cpp/src/arrow/engine/substrait/util.h b/cpp/src/arrow/engine/substrait/util.h
index 38b6447b00..9f8bd80488 100644
--- a/cpp/src/arrow/engine/substrait/util.h
+++ b/cpp/src/arrow/engine/substrait/util.h
@@ -38,10 +38,24 @@ namespace engine {
using PythonTableProvider =
std::function<Result<std::shared_ptr<Table>>(const std::vector<std::string>&)>;
+/// \brief Utility method to run a Substrait plan
+/// \param substrait_buffer The plan to run, must be in binary protobuf format
+/// \param registry A registry of extension functions to make available to the plan
+/// If null then the default registry will be used.
+/// \param memory_pool The memory pool the plan should use to make allocations.
+/// \param func_registry A registry of functions used for execution expressions.
+/// `registry` maps from Substrait function IDs to "names". These
+/// names will be provided to `func_registry` to get the actual
+/// kernel.
+/// \param conversion_options Options to control plan deserialization
+/// \param use_threads If True then the CPU thread pool will be used for CPU work. If
+/// False then all work will be done on the calling thread.
+/// \return A record batch reader that will read out the results
ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
compute::FunctionRegistry* func_registry = NULLPTR,
- const ConversionOptions& conversion_options = {});
+ const ConversionOptions& conversion_options = {}, bool use_threads = true,
+ MemoryPool* memory_pool = default_memory_pool());
/// \brief Get a Serialized Plan from a Substrait JSON plan.
/// This is a helper method for Python tests.
diff --git a/python/pyarrow/_substrait.pyx b/python/pyarrow/_substrait.pyx
index 7974aea152..da061d8cd3 100644
--- a/python/pyarrow/_substrait.pyx
+++ b/python/pyarrow/_substrait.pyx
@@ -48,7 +48,7 @@ cdef CDeclaration _create_named_table_provider(dict named_args, const std_vector
no_c_inputs, c_input_node_opts)
-def run_query(plan, table_provider=None):
+def run_query(plan, *, table_provider=None, use_threads=True):
"""
Execute a Substrait plan and read the results as a RecordBatchReader.
@@ -60,6 +60,9 @@ def run_query(plan, table_provider=None):
A function to resolve any NamedTable relation to a table.
The function will receive a single argument which will be a list
of strings representing the table name and should return a pyarrow.Table.
+ use_threads : bool, default True
+ If True then multiple threads will be used to run the query. If False then
+ all CPU intensive work will be done on the calling thread.
Returns
-------
@@ -123,7 +126,9 @@ def run_query(plan, table_provider=None):
shared_ptr[CBuffer] c_buf_plan
function[CNamedTableProvider] c_named_table_provider
CConversionOptions c_conversion_options
+ c_bool c_use_threads
+ c_use_threads = use_threads
if isinstance(plan, bytes):
c_buf_plan = pyarrow_unwrap_buffer(py_buffer(plan))
elif isinstance(plan, Buffer):
@@ -141,7 +146,8 @@ def run_query(plan, table_provider=None):
with nogil:
c_res_reader = ExecuteSerializedPlan(
- deref(c_buf_plan), default_extension_id_registry(), GetFunctionRegistry(), c_conversion_options)
+ deref(c_buf_plan), default_extension_id_registry(),
+ GetFunctionRegistry(), c_conversion_options, c_use_threads)
c_reader = GetResultValue(c_res_reader)
diff --git a/python/pyarrow/includes/libarrow_substrait.pxd b/python/pyarrow/includes/libarrow_substrait.pxd
index 04990380d9..b3ad00516d 100644
--- a/python/pyarrow/includes/libarrow_substrait.pxd
+++ b/python/pyarrow/includes/libarrow_substrait.pxd
@@ -51,6 +51,7 @@ cdef extern from "arrow/engine/substrait/extension_set.h" \
cdef extern from "arrow/engine/substrait/util.h" namespace "arrow::engine" nogil:
CResult[shared_ptr[CRecordBatchReader]] ExecuteSerializedPlan(
const CBuffer& substrait_buffer, const ExtensionIdRegistry* registry,
- CFunctionRegistry* func_registry, const CConversionOptions& conversion_options)
+ CFunctionRegistry* func_registry, const CConversionOptions& conversion_options,
+ c_bool use_threads)
CResult[shared_ptr[CBuffer]] SerializeJsonPlan(const c_string& substrait_json)
diff --git a/python/pyarrow/tests/test_substrait.py b/python/pyarrow/tests/test_substrait.py
index 50b16cd6b6..bd32178fea 100644
--- a/python/pyarrow/tests/test_substrait.py
+++ b/python/pyarrow/tests/test_substrait.py
@@ -41,7 +41,8 @@ def _write_dummy_data_to_disk(tmpdir, file_name, table):
return path
-def test_run_serialized_query(tmpdir):
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_run_serialized_query(tmpdir, use_threads):
substrait_query = """
{
"version": { "major": 9999 },
@@ -80,7 +81,7 @@ def test_run_serialized_query(tmpdir):
buf = pa._substrait._parse_json_plan(query)
- reader = substrait.run_query(buf)
+ reader = substrait.run_query(buf, use_threads=use_threads)
res_tb = reader.read_all()
assert table.select(["foo"]) == res_tb.select(["foo"])
@@ -110,12 +111,13 @@ def test_invalid_plan():
}
"""
buf = pa._substrait._parse_json_plan(tobytes(query))
- exec_message = "Empty substrait plan is passed."
+ exec_message = "No RelRoot in plan"
with pytest.raises(ArrowInvalid, match=exec_message):
substrait.run_query(buf)
-def test_binary_conversion_with_json_options(tmpdir):
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_binary_conversion_with_json_options(tmpdir, use_threads):
substrait_query = """
{
"version": { "major": 9999 },
@@ -156,7 +158,7 @@ def test_binary_conversion_with_json_options(tmpdir):
"FILENAME_PLACEHOLDER", pathlib.Path(path).as_uri()))
buf = pa._substrait._parse_json_plan(tobytes(query))
- reader = substrait.run_query(buf)
+ reader = substrait.run_query(buf, use_threads=use_threads)
res_tb = reader.read_all()
assert table.select(["bar"]) == res_tb.select(["bar"])
@@ -182,7 +184,8 @@ def test_get_supported_functions():
'functions_arithmetic.yaml', 'sum')
-def test_named_table():
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_named_table(use_threads):
test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
test_table_2 = pa.Table.from_pydict({"x": [4, 5, 6]})
@@ -222,7 +225,8 @@ def test_named_table():
"""
buf = pa._substrait._parse_json_plan(tobytes(substrait_query))
- reader = pa.substrait.run_query(buf, table_provider)
+ reader = pa.substrait.run_query(
+ buf, table_provider=table_provider, use_threads=use_threads)
res_tb = reader.read_all()
assert res_tb == test_table_1
@@ -266,7 +270,7 @@ def test_named_table_invalid_table_name():
buf = pa._substrait._parse_json_plan(tobytes(substrait_query))
exec_message = "Invalid NamedTable Source"
with pytest.raises(ArrowInvalid, match=exec_message):
- substrait.run_query(buf, table_provider)
+ substrait.run_query(buf, table_provider=table_provider)
def test_named_table_empty_names():
@@ -308,4 +312,4 @@ def test_named_table_empty_names():
buf = pa._substrait._parse_json_plan(tobytes(query))
exec_message = "names for NamedTable not provided"
with pytest.raises(ArrowInvalid, match=exec_message):
- substrait.run_query(buf, table_provider)
+ substrait.run_query(buf, table_provider=table_provider)