Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/24 12:42:39 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

lidavidm commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834250238



##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+import pyarrow.lib as lib
+
+import numpy as np
+
+
+def run_query(plan, output_schema):

Review comment:
       Add a docstring?
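
   As a rough sketch of what such a docstring could look like, in the numpydoc layout: the parameter and return descriptions below are assumptions made for illustration, not wording from the pull request, and the body is only a placeholder.

```python
# Illustrative stub only: the descriptions are assumptions, and the real
# binding would call into the C++ Substrait consumer instead of raising.
def run_query(plan, output_schema):
    """
    Execute a Substrait plan and return the results as a stream.

    Parameters
    ----------
    plan : object
        The serialized Substrait plan to execute.
    output_schema : Schema
        The schema of the record batches produced by the plan.

    Returns
    -------
    RecordBatchReader
        A reader over the record batches produced by the plan.
    """
    raise NotImplementedError("illustrative stub only")
```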

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+import pyarrow.lib as lib
+
+import numpy as np
+
+
+def run_query(plan, output_schema):
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        shared_ptr[CSchema] c_schema
+        c_string c_plan
+        RecordBatchReader reader
+
+    c_plan = plan.encode()

Review comment:
       I think we generally use `tobytes`. Though: if `plan` is supposed to be a serialized Protobuf, shouldn't it be `bytes` in the first place?
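
   To illustrate the second point, here is a minimal sketch of a binding that only accepts bytes-like plans, so that no `encode()` call is needed. The helper name `as_plan_bytes` is hypothetical and is not part of pyarrow; it does not use the `tobytes` helper mentioned above.

```python
def as_plan_bytes(plan):
    # Hypothetical helper, not a pyarrow function: accept only bytes-like
    # input, since a serialized Protobuf message is binary data, not text.
    if isinstance(plan, (bytes, bytearray, memoryview)):
        return bytes(plan)
    raise TypeError(
        f"plan must be a bytes-like object, got {type(plan).__name__}"
    )
```

   With this shape, passing a `str` fails early with a `TypeError` instead of being silently encoded.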

##########
File path: python/examples/substrait/query_execution_example.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

Review comment:
       IIRC nothing runs the Python examples (unlike C++). Maybe make this a cookbook example instead?
   
   Also, add a unit test.

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"

Review comment:
       nit, but try not to use the `api.h` headers; they're expensive to include

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;

Review comment:
       We should `#include` the optional header

##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/engine/api.h>
+#include <arrow/engine/substrait/util.h>
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <vector>
+
+namespace eng = arrow::engine;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetSubstraitPlanFromServer(const std::string& filename) {
+  // Emulate server interaction by parsing hard coded JSON
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"i64": {}},
+                         {"bool": {}}
+                       ]
+            },
+            "names": [
+                      "i",
+                       "b"
+                     ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), filename);
+  return substrait_json;
+}
+
+int main(int argc, char** argv) {
+  if (argc < 2) {
+    std::cout << "Please specify a parquet file to scan" << std::endl;
+    // Fake pass for CI
+    return EXIT_SUCCESS;
+  }
+  auto substrait_json = GetSubstraitPlanFromServer(argv[1]);
+
+  auto schema = arrow::schema(
+      {arrow::field("i", arrow::int64()), arrow::field("b", arrow::boolean())});
+
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  auto maybe_plan = cp::ExecPlan::Make();
+  if (!maybe_plan.status().ok()) {
+    return EXIT_FAILURE;
+  }

Review comment:
       Create something like `Status Main();` instead so we can use the usual Arrow macros, and then just `ABORT_NOT_OK(Main())` inside `main`

##########
File path: cpp/src/arrow/engine/substrait/serde_test.cc
##########
@@ -724,5 +728,103 @@ TEST(Substrait, ExtensionSetFromPlan) {
   EXPECT_EQ(decoded_add_func.name, "add");
 }
 
+TEST(Substrait, GetRecordBatchIterator) {

Review comment:
       nit, but `GetRecordBatchReader`?

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,
+                        engine::DeserializePlan(*serialized_plan, consumer_factory));
+
+  // It's safe to drop the serialized plan; we don't leave references to its memory
+  serialized_plan.reset();
+
+  // Construct an empty plan (note: configure Function registry and ThreadPool here)
+  return Status::OK();
+}
+
+Result<std::shared_ptr<RecordBatchReader>> SubstraitExecutor::Execute() {
+  for (const cp::Declaration& decl : declerations_) {
+    RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+  }
+
+  ARROW_RETURN_NOT_OK(plan_->Validate());
+
+  ARROW_RETURN_NOT_OK(plan_->StartProducing());
+
+  std::shared_ptr<RecordBatchReader> sink_reader = cp::MakeGeneratorReader(
+      schema_, std::move(*generator_), exec_context_.memory_pool());
+  return sink_reader;
+}
+
+Status SubstraitExecutor::Finalize() {
+  ARROW_RETURN_NOT_OK(plan_->finished().status());
+  return Status::OK();
+}
+
+Result<std::shared_ptr<RecordBatchReader>> SubstraitExecutor::GetRecordBatchReader(
+    std::string& substrait_json, std::shared_ptr<arrow::Schema> schema) {
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make());
+
+  arrow::engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, schema,
+                                            exec_context);
+  RETURN_NOT_OK(executor.MakePlan());
+
+  ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
+
+  RETURN_NOT_OK(executor.Finalize());

Review comment:
       I would frankly expect that the `RecordBatchReader`'s `Close` gets wired up to the executor's `Close`, so that you don't have to manage their lifetimes independently. That would also let you truly stream data, right?
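
   The ownership idea can be sketched in plain Python. This is only an illustration of the design, not Arrow code: `FakeExecutor` and `OwningReader` are hypothetical stand-ins, and the integers stand in for record batches.

```python
class FakeExecutor:
    """Hypothetical stand-in for the executor; not an Arrow class."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.closed = False

    def batches(self):
        # Yield lazily so the consumer pulls batches one at a time.
        yield from self._batches

    def close(self):
        self.closed = True


class OwningReader:
    """Hypothetical reader that owns its executor and closes it with itself."""

    def __init__(self, executor):
        self._executor = executor
        self._iter = executor.batches()

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iter)

    def close(self):
        # Closing the reader also closes the executor it owns.
        self._executor.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()


executor = FakeExecutor([1, 2, 3])
with OwningReader(executor) as reader:
    seen = list(reader)
```

   Because the reader owns the executor, the caller only has to manage one object, and the executor stays alive for as long as batches are still being pulled.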

##########
File path: cpp/src/arrow/engine/substrait/serde_test.cc
##########
@@ -724,5 +728,103 @@ TEST(Substrait, ExtensionSetFromPlan) {
   EXPECT_EQ(decoded_add_func.name, "add");
 }
 
+TEST(Substrait, GetRecordBatchIterator) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+  AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen;
+  cp::ExecContext exec_context(default_memory_pool(),
+                               arrow::internal::GetCpuThreadPool());
+  ASSERT_OK_AND_ASSIGN(auto plan, cp::ExecPlan::Make());
+  engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, in_schema,
+                                     exec_context);
+  auto status = executor.MakePlan();
+  ASSERT_OK(status);
+  ASSERT_OK_AND_ASSIGN(auto reader, executor.Execute());
+  auto finish = executor.Finalize();
+  ASSERT_OK(finish);
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
+  EXPECT_GT(table->num_rows(), 0);
+}
+
+TEST(Substrait, GetRecordBatchIteratorUtil) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+
+  ASSERT_OK_AND_ASSIGN(auto reader, engine::SubstraitExecutor::GetRecordBatchReader(
+                                        substrait_json, in_schema));
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
+  EXPECT_GT(table->num_rows(), 0);

Review comment:
       Shouldn't we know the number of expected rows?

##########
File path: python/pyarrow/engine.py
##########
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow._engine import (  # noqa
+    run_query

Review comment:
       Just a nit: if we're going to indent like this, add a trailing comma.

##########
File path: python/CMakeLists.txt
##########
@@ -534,6 +539,20 @@ if(PYARROW_BUILD_FLIGHT)
   set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight)
 endif()
 
+# Engine
+
+if(PYARROW_BUILD_ENGINE)
+find_package(ArrowEngine REQUIRED)

Review comment:
       looks like the CMake files need formatting

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref

Review comment:
       Unused imports?

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+import pyarrow.lib as lib
+
+import numpy as np

Review comment:
       Unused imports?

##########
File path: python/pyarrow/engine.py
##########
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow._engine import (  # noqa
+    run_query

Review comment:
       ```suggestion
       run_query,
   ```

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)
+      : substrait_json_(substrait_json),
+        generator_(generator),
+        plan_(std::move(plan)),
+        schema_(schema),
+        exec_context_(exec_context) {}
+
+  Status MakePlan();
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute();
+
+  Status Finalize();
+
+  static Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader(
+      std::string& substrait_json, std::shared_ptr<arrow::Schema> schema);
+
+ private:
+  std::string substrait_json_;
+  AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator_;
+  std::vector<cp::Declaration> declerations_;
+  std::shared_ptr<cp::ExecPlan> plan_;
+  std::shared_ptr<Schema> schema_;

Review comment:
       We should `#include` vector, memory, and string

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {

Review comment:
       nit: docstrings?

##########
File path: python/CMakeLists.txt
##########
@@ -534,6 +539,20 @@ if(PYARROW_BUILD_FLIGHT)
   set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight)
 endif()
 
+# Engine
+
+if(PYARROW_BUILD_ENGINE)
+find_package(ArrowEngine REQUIRED)
+  if(PYARROW_BUNDLE_ARROW_CPP)
+    message("ARROW_ENGINE_SHARED_LIB")
+    message(""${ARROW_ENGINE_SHARED_LIB})

Review comment:
       Are these log lines necessary?

##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/engine/api.h>
+#include <arrow/engine/substrait/util.h>
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"

Review comment:
       inconsistent include styles here

##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.

Review comment:
       Should this be made a cookbook example?

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)

Review comment:
       Does it make sense to take anything besides the JSON and the ExecContext as arguments? I would expect this class to manage the details of executing Substrait internally.

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)
+      : substrait_json_(substrait_json),
+        generator_(generator),
+        plan_(std::move(plan)),
+        schema_(schema),
+        exec_context_(exec_context) {}
+
+  Status MakePlan();
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute();
+
+  Status Finalize();

Review comment:
       We usually call this `Close()`

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:

Review comment:
       What does this mean? It certainly doesn't seem like we're ignoring the data.

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)
+      : substrait_json_(substrait_json),
+        generator_(generator),
+        plan_(std::move(plan)),
+        schema_(schema),
+        exec_context_(exec_context) {}
+
+  Status MakePlan();

Review comment:
       Maybe this should be private?
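
       For illustration, a minimal sketch of the shape this suggests: plan construction becomes a private step that the public entry point runs on first use. `ExecutorSketch` and its `bool` return values are hypothetical stand-ins, not the real `SubstraitExecutor`, `Status`, or `Result` types.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for SubstraitExecutor; bool replaces arrow::Status.
class ExecutorSketch {
 public:
  explicit ExecutorSketch(std::string substrait_json)
      : substrait_json_(std::move(substrait_json)) {}

  // The only public entry point: builds the plan on first use, then runs it.
  bool Execute() {
    if (!plan_made_ && !MakePlan()) return false;
    assert(plan_made_);  // the plan always exists past this point
    return true;
  }

  bool plan_made() const { return plan_made_; }

 private:
  // Callers can no longer forget this step or invoke it twice.
  bool MakePlan() {
    if (substrait_json_.empty()) return false;  // stands in for a parse error
    plan_made_ = true;
    return true;
  }

  std::string substrait_json_;
  bool plan_made_ = false;
};
```

       With this layout the caller only ever writes `executor.Execute()`, and a failure in plan construction surfaces through the same return value.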

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());

Review comment:
       Does this have any effect? And why not just `RETURN_NOT_OK(engine::internal::SubstraitToJSON());`?
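
       A minimal sketch of the two spellings, assuming a status-checking macro in the style of `RETURN_NOT_OK`. The `Status`, `Result`, `ToJson`, and `RETURN_NOT_OK_SKETCH` names below are hypothetical stand-ins written for this example, not the Arrow definitions.

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Status.
class Status {
 public:
  Status() = default;
  explicit Status(std::string error) : ok_(false), message_(std::move(error)) {}
  bool ok() const { return ok_; }
  const std::string& message() const { return message_; }

 private:
  bool ok_ = true;
  std::string message_;
};

// Hypothetical stand-in for arrow::Result<T>.
template <typename T>
class Result {
 public:
  Result(T value) : value_(std::move(value)) {}
  Result(Status status) : status_(std::move(status)) {}
  const Status& status() const { return status_; }

 private:
  Status status_;
  T value_{};
};

// Returns early from the enclosing function when the expression failed.
#define RETURN_NOT_OK_SKETCH(expr) \
  do {                             \
    Status st_ = (expr);           \
    if (!st_.ok()) return st_;     \
  } while (false)

// Stand-in for a serialization call that may fail.
Result<std::string> ToJson(bool succeed) {
  if (!succeed) return Status("serialization failed");
  return std::string("{}");
}

// Spelling under review: a named temporary that is only checked, never read.
Status WithTemporary(bool succeed) {
  auto maybe_json = ToJson(succeed);
  RETURN_NOT_OK_SKETCH(maybe_json.status());
  return Status();
}

// Suggested spelling: check the call expression directly.
Status WithoutTemporary(bool succeed) {
  RETURN_NOT_OK_SKETCH(ToJson(succeed).status());
  return Status();
}
```

       Both functions behave identically; the second just avoids naming a value that is never used afterwards.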

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)
+      : substrait_json_(substrait_json),
+        generator_(generator),
+        plan_(std::move(plan)),
+        schema_(schema),
+        exec_context_(exec_context) {}
+
+  Status MakePlan();
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute();
+
+  Status Finalize();
+
+  static Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader(
+      std::string& substrait_json, std::shared_ptr<arrow::Schema> schema);

Review comment:
       IMO it's quite weird that you have to know the schema ahead of time. Is there not a way to extract the schema from the Substrait plan?

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {

Review comment:
       Nit: is it not possible to place this in the `.cc` file too?
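
       A minimal single-file sketch of the split, with the header part holding only declarations and the `.cc` part holding the bodies. `SinkConsumerSketch` and the use of `std::string` in place of the generator and producer types are assumptions made to keep the example self-contained.

```cpp
#include <cassert>
#include <string>
#include <utility>

// ---- what would stay in the header: declarations only ----
class SinkConsumerSketch {
 public:
  explicit SinkConsumerSketch(std::string* out_gen);

  // Declared here, defined out of line below.
  static std::string MakeProducer(std::string* out_gen, std::string label);

  const std::string& producer() const { return producer_; }

 private:
  std::string producer_;
};

// ---- what would move to the .cc file: the definitions ----
SinkConsumerSketch::SinkConsumerSketch(std::string* out_gen)
    : producer_(MakeProducer(out_gen, "default")) {}

std::string SinkConsumerSketch::MakeProducer(std::string* out_gen,
                                             std::string label) {
  assert(out_gen != nullptr);
  // Fill the output parameter, then hand back the matching producer.
  *out_gen = "generator:" + label;
  return "producer:" + label;
}
```

       The constructor moves out of line together with the static helper, so the header no longer needs the full definitions of whatever the helper touches.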

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,
+                        engine::DeserializePlan(*serialized_plan, consumer_factory));
+
+  // It's safe to drop the serialized plan; we don't leave references to its memory
+  serialized_plan.reset();

Review comment:
       Won't this go out of scope automatically anyway?
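
       A small sketch of the lifetime question, using a hypothetical `Buffer` type with an instance counter in place of the real serialized plan buffer. A `std::shared_ptr` local releases its object when the function returns; an explicit `reset()` only moves that release earlier, which matters only if the rest of the function should run without the buffer alive.

```cpp
#include <cassert>
#include <memory>

// Counts live Buffer objects so destruction can be observed.
static int g_live_buffers = 0;

// Hypothetical stand-in for the serialized plan buffer.
struct Buffer {
  Buffer() { ++g_live_buffers; }
  ~Buffer() { --g_live_buffers; }
};

// Explicit reset(): the buffer is gone before the function returns.
int LiveAfterExplicitReset() {
  auto serialized_plan = std::make_shared<Buffer>();
  serialized_plan.reset();
  return g_live_buffers;
}

// No reset(): the buffer lives until the closing brace, then is destroyed.
void LetItFallOutOfScope() {
  auto serialized_plan = std::make_shared<Buffer>();
  assert(g_live_buffers == 1);  // still alive inside the function
}

int LiveBuffers() { return g_live_buffers; }
```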

##########
File path: cpp/src/arrow/engine/substrait/serde_test.cc
##########
@@ -724,5 +728,103 @@ TEST(Substrait, ExtensionSetFromPlan) {
   EXPECT_EQ(decoded_add_func.name, "add");
 }
 
+TEST(Substrait, GetRecordBatchIterator) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+  AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen;
+  cp::ExecContext exec_context(default_memory_pool(),
+                               arrow::internal::GetCpuThreadPool());
+  ASSERT_OK_AND_ASSIGN(auto plan, cp::ExecPlan::Make());
+  engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, in_schema,
+                                     exec_context);
+  auto status = executor.MakePlan();
+  ASSERT_OK(status);
+  ASSERT_OK_AND_ASSIGN(auto reader, executor.Execute());
+  auto finish = executor.Finalize();
+  ASSERT_OK(finish);

Review comment:
       Why not just `ASSERT_OK(executor.Finalize())`? In general, why all the temporary variables?

##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/engine/api.h>
+#include <arrow/engine/substrait/util.h>
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <vector>
+
+namespace eng = arrow::engine;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetSubstraitPlanFromServer(const std::string& filename) {
+  // Emulate server interaction by parsing hard coded JSON
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"i64": {}},
+                         {"bool": {}}
+                       ]
+            },
+            "names": [
+                      "i",
+                       "b"
+                     ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), filename);
+  return substrait_json;
+}
+
+int main(int argc, char** argv) {
+  if (argc < 2) {
+    std::cout << "Please specify a parquet file to scan" << std::endl;
+    // Fake pass for CI
+    return EXIT_SUCCESS;
+  }
+  auto substrait_json = GetSubstraitPlanFromServer(argv[1]);
+
+  auto schema = arrow::schema(
+      {arrow::field("i", arrow::int64()), arrow::field("b", arrow::boolean())});
+
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());

Review comment:
       Hmm, should we be using the `internal` namespace in an example? Is there not a default constructor?

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+import pyarrow.lib as lib
+
+import numpy as np
+
+
+def run_query(plan, output_schema):
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        shared_ptr[CSchema] c_schema
+        c_string c_plan
+        RecordBatchReader reader
+
+    c_plan = plan.encode()

Review comment:
       Ah, it's the Protobuf JSON.

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       Isn't there already a sink that outputs to a reader? Why do we need a custom implementation here?

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,
+                        engine::DeserializePlan(*serialized_plan, consumer_factory));
+
+  // It's safe to drop the serialized plan; we don't leave references to its memory
+  serialized_plan.reset();
+
+  // Construct an empty plan (note: configure Function registry and ThreadPool here)

Review comment:
       Is this comment still relevant?

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,

Review comment:
       typo: `declarations`

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"

Review comment:
       If we can use `type_fwd.h` headers or manually forward-declare just the things we need, that would be best.
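
To illustrate the forward-declaration suggestion, here is a minimal, self-contained sketch. It does not use the Arrow headers: the `demo` namespace and the `Plan`, `Schema`, and `Executor` names are hypothetical stand-ins. The idea is only that a header can hold `std::shared_ptr` members to types it has merely declared, as long as the members that need the full definitions are defined in the source file.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// ---- What a slimmed-down header could contain (hypothetical names) ----
namespace demo {

class Plan;    // forward declaration: the full definition is not needed here
class Schema;  // forward declaration

class Executor {
 public:
  Executor(std::shared_ptr<Plan> plan, std::shared_ptr<Schema> schema);
  ~Executor();

  std::string Describe() const;

 private:
  // shared_ptr members are allowed to point at incomplete types.
  std::shared_ptr<Plan> plan_;
  std::shared_ptr<Schema> schema_;
};

}  // namespace demo

// ---- What the matching source file would contain ----
namespace demo {

// Full definitions, visible only to the implementation.
class Plan {
 public:
  std::string name = "plan";
};

class Schema {
 public:
  int num_fields = 2;
};

Executor::Executor(std::shared_ptr<Plan> plan, std::shared_ptr<Schema> schema)
    : plan_(std::move(plan)), schema_(std::move(schema)) {}

Executor::~Executor() = default;

// Dereferencing the pointers requires the complete types, so this lives here.
std::string Executor::Describe() const {
  return plan_->name + ":" + std::to_string(schema_->num_fields);
}

}  // namespace demo
```

In the sketch, anything that includes the "header" half pays only for `<memory>` and `<string>`; the heavier definitions stay in the implementation half.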

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)

Review comment:
       Ah, I guess `GetRecordBatchReader` is a factory function of sorts. The C++ API would make a little more sense IMO if `SubstraitExecutor::GetRecordBatchReader` were renamed `Make` and returned `SubstraitExecutor`.
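
A rough sketch of the shape being suggested, using hypothetical names rather than the PR's actual classes: a static `Make` validates its input and returns the constructed object, and the constructor is private so callers have to go through the factory. Returning `nullptr` on failure is an assumption made to keep the sketch dependency-free; Arrow code would more likely report the error through `arrow::Result`.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

namespace demo {

// Hypothetical stand-in for the executor discussed above.
class Executor {
 public:
  // Named factory: checks the input, then returns a ready-to-use object.
  // Returns nullptr for an empty plan (stand-in for an error status).
  static std::unique_ptr<Executor> Make(std::string plan_json) {
    if (plan_json.empty()) {
      return nullptr;
    }
    return std::unique_ptr<Executor>(new Executor(std::move(plan_json)));
  }

  const std::string& plan_json() const { return plan_json_; }

 private:
  // Private constructor: construction only happens through Make().
  explicit Executor(std::string plan_json) : plan_json_(std::move(plan_json)) {}

  std::string plan_json_;
};

}  // namespace demo
```

With this shape, a call site reads `auto executor = demo::Executor::Make(json);` followed by a null check, and any reader accessor becomes an ordinary method on the returned object.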



