You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/04 20:57:50 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

westonpace commented on code in PR #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r865311231


##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include "arrow/engine/substrait/api.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+
+namespace arrow {
+
+namespace engine {
+
+/// \brief Retrieve a RecordBatchReader from a Substrait plan.
+ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
+    std::shared_ptr<Buffer> substrait_buffer);

Review Comment:
   This should be `const Buffer&`. That will require some changes to `SubstraitExecutor`, but as it is written right now we keep the serialized Substrait plan (`substrait_buffer_`) alive for the entire time we execute the plan, and that is incorrect. We only need `substrait_buffer` to create the `ExecPlan` (which happens in `SubstraitExecutor::Init`). Once that is done, we should allow this buffer to be freed (it's not likely to be a huge chunk of RAM, but it seems like good hygiene, and the API will be clearer to the user).



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());
+    ARROW_UNUSED(producer_.Close());
+    return Future<>::MakeFinished();
+  }
+
+ private:
+  PushGenerator<util::optional<compute::ExecBatch>>::Producer producer_;
+};
+
+/// \brief An executor to run a Substrait Query
+/// This interface is provided as a utility when creating language
+/// bindings for consuming a Substrait plan.
+class SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::shared_ptr<Buffer> substrait_buffer,
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator,
+      std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context)
+      : substrait_buffer_(std::move(substrait_buffer)),
+        generator_(generator),
+        plan_(std::move(plan)),
+        exec_context_(exec_context) {}
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute() {
+    for (const compute::Declaration& decl : declarations_) {
+      RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+    }
+    RETURN_NOT_OK(plan_->Validate());
+    RETURN_NOT_OK(plan_->StartProducing());
+    // schema of the output can be obtained by the output_schema
+    // of the input to the sink node.
+    auto schema = plan_->sinks()[0]->inputs()[0]->output_schema();
+    std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+        schema, std::move(*generator_), exec_context_.memory_pool());
+    return sink_reader;
+  }
+
+  Status Close() { return plan_->finished().status(); }
+
+  Status Init() {

Review Comment:
   To solve the `substrait_buffer_` shared_ptr dilemma I mentioned earlier, you could pass `const Buffer&` into `Init` and get rid of the `substrait_buffer_` member.



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());
+    ARROW_UNUSED(producer_.Close());
+    return Future<>::MakeFinished();
+  }
+
+ private:
+  PushGenerator<util::optional<compute::ExecBatch>>::Producer producer_;
+};
+
+/// \brief An executor to run a Substrait Query
+/// This interface is provided as a utility when creating language
+/// bindings for consuming a Substrait plan.
+class SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::shared_ptr<Buffer> substrait_buffer,
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator,
+      std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context)
+      : substrait_buffer_(std::move(substrait_buffer)),
+        generator_(generator),
+        plan_(std::move(plan)),
+        exec_context_(exec_context) {}
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute() {
+    for (const compute::Declaration& decl : declarations_) {
+      RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+    }
+    RETURN_NOT_OK(plan_->Validate());
+    RETURN_NOT_OK(plan_->StartProducing());
+    // schema of the output can be obtained by the output_schema
+    // of the input to the sink node.
+    auto schema = plan_->sinks()[0]->inputs()[0]->output_schema();
+    std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+        schema, std::move(*generator_), exec_context_.memory_pool());
+    return sink_reader;
+  }
+
+  Status Close() { return plan_->finished().status(); }
+
+  Status Init() {
+    if (substrait_buffer_ == NULLPTR) {
+      return Status::Invalid("Buffer containing Substrait plan is null.");
+    }
+    std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] {
+      return std::make_shared<SubstraitSinkConsumer>(generator_);
+    };
+    ARROW_ASSIGN_OR_RAISE(declarations_,
+                          engine::DeserializePlan(*substrait_buffer_, consumer_factory));
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Buffer> substrait_buffer_;
+  AsyncGenerator<util::optional<compute::ExecBatch>>* generator_;
+  std::vector<compute::Declaration> declarations_;
+  std::shared_ptr<compute::ExecPlan> plan_;
+  compute::ExecContext exec_context_;
+};
+
+arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer
+SubstraitSinkConsumer::MakeProducer(
+    AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen) {
+  arrow::PushGenerator<util::optional<compute::ExecBatch>> push_gen;
+  auto out = push_gen.producer();
+  *out_gen = std::move(push_gen);
+  return out;
+}
+}  // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
+    std::shared_ptr<Buffer> substrait_buffer) {
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make());
+  compute::ExecContext exec_context(arrow::default_memory_pool(),
+                                    ::arrow::internal::GetCpuThreadPool());

Review Comment:
   I don't think we need to fix this today, but someday we should allow the user to supply their own `ExecContext`. It should be an optional parameter that defaults to `default_exec_context`. However, `default_exec_context` is currently serial, which is a little silly, so I don't want to make the change now. Can you add a `TODO(ARROW-15732)` comment here? I'll pick it up with that change (which updates `default_exec_context` to be parallel).



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }

Review Comment:
   Store the schema as a member variable.



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {

Review Comment:
   Eventually we will want to implement backpressure here. It doesn't make sense to do it as part of this PR, because I think it's related to the question of whether the Substrait consumer should just allow us to use the `SinkNode` directly (instead of a `ConsumingSinkNode`). Please open a follow-up PR.



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+import pathlib
+import pytest
+
+import pyarrow as pa
+from pyarrow.lib import tobytes
+from pyarrow.lib import ArrowInvalid
+
+try:
+    import pyarrow.parquet as pq
+except ImportError:
+    pq = None
+
+try:
+    import pyarrow.substrait as substrait
+except ImportError:
+    substrait = None
+
+# Marks all of the tests in this module
+# Ignore these with pytest ... -m 'not substrait'
+pytestmark = [pytest.mark.parquet, pytest.mark.substrait]

Review Comment:
   At the moment `ARROW_SUBSTRAIT` implies `ARROW_DATASET`. However, that doesn't always have to be the case: one could build Substrait without datasets, in which case it would be limited to plans that operate on in-memory (or in-plan) tables. I can't imagine we will really run into this, but it might still be good to add `pytest.mark.dataset` just in case.



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());
+    ARROW_UNUSED(producer_.Close());
+    return Future<>::MakeFinished();
+  }
+
+ private:
+  PushGenerator<util::optional<compute::ExecBatch>>::Producer producer_;
+};
+
+/// \brief An executor to run a Substrait Query
+/// This interface is provided as a utility when creating language
+/// bindings for consuming a Substrait plan.
+class SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::shared_ptr<Buffer> substrait_buffer,
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator,
+      std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context)
+      : substrait_buffer_(std::move(substrait_buffer)),
+        generator_(generator),
+        plan_(std::move(plan)),
+        exec_context_(exec_context) {}
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute() {
+    for (const compute::Declaration& decl : declarations_) {
+      RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+    }
+    RETURN_NOT_OK(plan_->Validate());
+    RETURN_NOT_OK(plan_->StartProducing());
+    // schema of the output can be obtained by the output_schema
+    // of the input to the sink node.
+    auto schema = plan_->sinks()[0]->inputs()[0]->output_schema();
+    std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+        schema, std::move(*generator_), exec_context_.memory_pool());
+    return sink_reader;
+  }
+
+  Status Close() { return plan_->finished().status(); }
+
+  Status Init() {
+    if (substrait_buffer_ == NULLPTR) {
+      return Status::Invalid("Buffer containing Substrait plan is null.");
+    }
+    std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] {
+      return std::make_shared<SubstraitSinkConsumer>(generator_);
+    };
+    ARROW_ASSIGN_OR_RAISE(declarations_,
+                          engine::DeserializePlan(*substrait_buffer_, consumer_factory));
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Buffer> substrait_buffer_;
+  AsyncGenerator<util::optional<compute::ExecBatch>>* generator_;
+  std::vector<compute::Declaration> declarations_;
+  std::shared_ptr<compute::ExecPlan> plan_;
+  compute::ExecContext exec_context_;
+};
+
+arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer
+SubstraitSinkConsumer::MakeProducer(
+    AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen) {
+  arrow::PushGenerator<util::optional<compute::ExecBatch>> push_gen;
+  auto out = push_gen.producer();
+  *out_gen = std::move(push_gen);
+  return out;
+}
+}  // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
+    std::shared_ptr<Buffer> substrait_buffer) {
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;

Review Comment:
   There is no reason to declare this here.  Just initialize `SubstraitExecutor::generator_` to `nullptr`.



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());
+    ARROW_UNUSED(producer_.Close());
+    return Future<>::MakeFinished();
+  }
+
+ private:
+  PushGenerator<util::optional<compute::ExecBatch>>::Producer producer_;
+};
+
+/// \brief An executor to run a Substrait Query
+/// This interface is provided as a utility when creating language
+/// bindings for consuming a Substrait plan.
+class SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::shared_ptr<Buffer> substrait_buffer,
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator,
+      std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context)
+      : substrait_buffer_(std::move(substrait_buffer)),
+        generator_(generator),
+        plan_(std::move(plan)),
+        exec_context_(exec_context) {}
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute() {
+    for (const compute::Declaration& decl : declarations_) {
+      RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+    }
+    RETURN_NOT_OK(plan_->Validate());
+    RETURN_NOT_OK(plan_->StartProducing());
+    // schema of the output can be obtained by the output_schema
+    // of the input to the sink node.
+    auto schema = plan_->sinks()[0]->inputs()[0]->output_schema();
+    std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+        schema, std::move(*generator_), exec_context_.memory_pool());
+    return sink_reader;
+  }
+
+  Status Close() { return plan_->finished().status(); }
+
+  Status Init() {
+    if (substrait_buffer_ == NULLPTR) {
+      return Status::Invalid("Buffer containing Substrait plan is null.");
+    }
+    std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] {
+      return std::make_shared<SubstraitSinkConsumer>(generator_);

Review Comment:
   Make the `std::shared_ptr<SubstraitSinkConsumer>` a member variable for `SubstraitExecutor`.



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());
+    ARROW_UNUSED(producer_.Close());
+    return Future<>::MakeFinished();
+  }
+
+ private:
+  PushGenerator<util::optional<compute::ExecBatch>>::Producer producer_;
+};
+
+/// \brief An executor to run a Substrait Query
+/// This interface is provided as a utility when creating language
+/// bindings for consuming a Substrait plan.
+class SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::shared_ptr<Buffer> substrait_buffer,
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator,
+      std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context)
+      : substrait_buffer_(std::move(substrait_buffer)),
+        generator_(generator),
+        plan_(std::move(plan)),
+        exec_context_(exec_context) {}
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute() {
+    for (const compute::Declaration& decl : declarations_) {
+      RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+    }
+    RETURN_NOT_OK(plan_->Validate());
+    RETURN_NOT_OK(plan_->StartProducing());
+    // schema of the output can be obtained by the output_schema
+    // of the input to the sink node.
+    auto schema = plan_->sinks()[0]->inputs()[0]->output_schema();
+    std::shared_ptr<RecordBatchReader> sink_reader = compute::MakeGeneratorReader(
+        schema, std::move(*generator_), exec_context_.memory_pool());
+    return sink_reader;
+  }
+
+  Status Close() { return plan_->finished().status(); }
+
+  Status Init() {
+    if (substrait_buffer_ == NULLPTR) {
+      return Status::Invalid("Buffer containing Substrait plan is null.");
+    }
+    std::function<std::shared_ptr<compute::SinkNodeConsumer>()> consumer_factory = [&] {
+      return std::make_shared<SubstraitSinkConsumer>(generator_);
+    };
+    ARROW_ASSIGN_OR_RAISE(declarations_,
+                          engine::DeserializePlan(*substrait_buffer_, consumer_factory));
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<Buffer> substrait_buffer_;
+  AsyncGenerator<util::optional<compute::ExecBatch>>* generator_;
+  std::vector<compute::Declaration> declarations_;
+  std::shared_ptr<compute::ExecPlan> plan_;
+  compute::ExecContext exec_context_;
+};
+
+arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer
+SubstraitSinkConsumer::MakeProducer(
+    AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen) {
+  arrow::PushGenerator<util::optional<compute::ExecBatch>> push_gen;
+  auto out = push_gen.producer();
+  *out_gen = std::move(push_gen);
+  return out;
+}
+}  // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
+    std::shared_ptr<Buffer> substrait_buffer) {
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make());
+  compute::ExecContext exec_context(arrow::default_memory_pool(),
+                                    ::arrow::internal::GetCpuThreadPool());
+  arrow::engine::SubstraitExecutor executor(std::move(substrait_buffer), &sink_gen,
+                                            std::move(plan), exec_context);
+  RETURN_NOT_OK(executor.Init());
+  ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
+  RETURN_NOT_OK(executor.Close());

Review Comment:
   This doesn't seem right to me. We shouldn't be waiting for the execution plan to finish. If we are going to do that, we may as well return an `arrow::Table` or a `RecordBatchVector`. However, if we don't wait for it to close, we end up with a new dilemma: something needs to keep the `ExecPlan` alive.
   
   Instead, can we have `SubstraitExecutor` extend `RecordBatchReader`? `Execute` can store the `std::shared_ptr<RecordBatchReader>` as a member variable, and you can then just proxy the two virtual methods:
   
   ```
   std::shared_ptr<Schema> SubstraitExecutor::schema() const override {
     return sink_reader->schema();
   }
   Status SubstraitExecutor::ReadNext(std::shared_ptr<RecordBatch>* batch) override {
     return sink_reader->ReadNext(batch);
   }
   ```
   
   This should allow us to return the `SubstraitExecutor` itself instead of returning `sink_reader`. We should also, arguably, add a context manager for it in Python so that `SubstraitExecutor::Close` is called as soon as Python is done with it. Since `RecordBatchReader` does not have a close method, I suppose that will be a new wrinkle in how we expose this to Python.
   
   We could return `std::shared_ptr<RecordBatchReader>` and have "close" simply reset the shared pointer. This would still require some Cython logic and probably a new Python/Cython class (CloseableRecordBatchReader?).
   
   Or we could expose `SubstraitExecutor` in Python.
   
   Or we could create a new concept of a `CloseableRecordBatchReader` in both C++ and Python.
   
   Or we could add a `Close` method to `RecordBatchReader` (with a default implementation that does nothing).
   
   I think my preference would be for the last option. CC @lidavidm for a second opinion.



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());

Review Comment:
   ```suggestion
   ```
   
   There is no need for this.  When you call `Close`, the push generator will take care of sending a terminal item.
   



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/async_util.h"
+
+namespace arrow {
+
+namespace engine {
+
+namespace {
+/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
+class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator)
+      : producer_(MakeProducer(generator)) {}
+
+  Status Consume(compute::ExecBatch batch) override {
+    // Consume a batch of data
+    bool did_push = producer_.Push(batch);
+    if (!did_push) return Status::ExecutionError("Producer closed already");
+    return Status::OK();
+  }
+
+  Status Init(const std::shared_ptr<Schema>& schema,
+              compute::BackpressureControl* backpressure_control) override {
+    return Status::OK();
+  }
+
+  static arrow::PushGenerator<util::optional<compute::ExecBatch>>::Producer MakeProducer(
+      AsyncGenerator<util::optional<compute::ExecBatch>>* out_gen);
+
+  Future<> Finish() override {
+    producer_.Push(IterationEnd<util::optional<compute::ExecBatch>>());
+    ARROW_UNUSED(producer_.Close());
+    return Future<>::MakeFinished();
+  }
+
+ private:
+  PushGenerator<util::optional<compute::ExecBatch>>::Producer producer_;
+};
+
+/// \brief An executor to run a Substrait Query
+/// This interface is provided as a utility when creating language
+/// bindings for consuming a Substrait plan.
+class SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::shared_ptr<Buffer> substrait_buffer,
+      AsyncGenerator<util::optional<compute::ExecBatch>>* generator,
+      std::shared_ptr<compute::ExecPlan> plan, compute::ExecContext exec_context)
+      : substrait_buffer_(std::move(substrait_buffer)),
+        generator_(generator),
+        plan_(std::move(plan)),
+        exec_context_(exec_context) {}
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute() {
+    for (const compute::Declaration& decl : declarations_) {
+      RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+    }
+    RETURN_NOT_OK(plan_->Validate());
+    RETURN_NOT_OK(plan_->StartProducing());
+    // schema of the output can be obtained by the output_schema
+    // of the input to the sink node.
+    auto schema = plan_->sinks()[0]->inputs()[0]->output_schema();

Review Comment:
   Rather than take this approach: if you store the sink consumer as a member variable (I added a comment to this effect) and, in the sink consumer's init step, store the schema as a member variable, then you can use the sink consumer's schema here.  I.e. this line should become:
   
   ```
   auto schema = sink_consumer_->schema();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org