Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/19 13:34:47 UTC

[GitHub] [arrow] vibhatha opened a new pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer [WIP]

vibhatha opened a new pull request #12672:
URL: https://github.com/apache/arrow/pull/12672


   This PR is a work in progress.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836434947



##########
File path: python/CMakeLists.txt
##########
@@ -69,6 +69,7 @@ endif()
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
   option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
+  option(PYARROW_BUILD_ENGINE "Build the PyArrow Substrait integration" OFF)

Review comment:
       @lidavidm Good question. Here I followed the convention used when integrating the Substrait work into the CMake build, where `ARROW_ENGINE` is used to check whether `engine` is enabled. I assume there could be many engines, and Substrait could be one of them. Is there a better standard we can follow here?
   cc @westonpace 







[GitHub] [arrow] westonpace commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835704502



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       Please create a ticket. It can be our place for open discussion.







[GitHub] [arrow] vibhatha commented on pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer [WIP]

Posted by GitBox <gi...@apache.org>.
vibhatha commented on pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#issuecomment-1073295410


   @lidavidm No, I mistakenly checked in files from the testing submodule. I just wanted to remove them.





[GitHub] [arrow] lidavidm commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836438385



##########
File path: python/CMakeLists.txt
##########
@@ -69,6 +69,7 @@ endif()
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
   option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
+  option(PYARROW_BUILD_ENGINE "Build the PyArrow Substrait integration" OFF)

Review comment:
       Yeah, we probably can't avoid that, unfortunately.







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836429903



##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan, output_schema):
+    """
+    executes a substrait plan and returns a RecordBatchReader
+
+    Paramters
+    ---------
+    plan : bytes
+    output_schema: expected output schema

Review comment:
       Again, this is an intermediate step, and I 100% agree that it is not intuitive or useful. This PR won't be merged until we finalize the PR that enables us to extract the input_schema to the sink node. I closed the minor-fix PR I created to resolve this (https://github.com/apache/arrow/pull/12715) in favor of @westonpace's PR, which takes a much more solid approach (https://github.com/apache/arrow/pull/12721).







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835697513



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       @westonpace Good point. In this PR, should we continue with the `ConsumingSink` approach and think about supporting a factory approach later, in another PR?







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835706173



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       @westonpace I created a ticket for open discussion here: https://issues.apache.org/jira/browse/ARROW-16036
   
   I would like to work on this one. I think the usability of this PR can be further improved with this integration. 
   
   







[GitHub] [arrow] lidavidm commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836434371



##########
File path: python/CMakeLists.txt
##########
@@ -69,6 +69,7 @@ endif()
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
   option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
+  option(PYARROW_BUILD_ENGINE "Build the PyArrow Substrait integration" OFF)

Review comment:
       IMO we should move examples to the Cookbook going forward. Also, hardcoding a bunch of Substrait plans may not be a wise idea right now anyways as there may be spec changes still?







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836432458



##########
File path: python/CMakeLists.txt
##########
@@ -69,6 +69,7 @@ endif()
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
   option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
+  option(PYARROW_BUILD_ENGINE "Build the PyArrow Substrait integration" OFF)

Review comment:
       @lidavidm I just added the example as a reference for users. But are we focusing on moving all the PyArrow examples to Arrow-cookbook?







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835037176



##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.

Review comment:
       I can write a more detailed example in the cookbook than the one presented here. Or I can remove it from this PR and add it to the cookbook later on. What do you think?
   







[GitHub] [arrow] westonpace commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835620641



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       My guess is that you are thinking of `SinkNode` which is very similar to this class.
   
   Right now the Substrait consumer always uses a `ConsumingSinkNode` and thus it needs a "consumer factory".
   
   Another potential implementation would be for the Substrait consumer to take in a "sink node factory" instead (or we could have both implementations).  That might be more flexible in the long term.  In that case we could reuse `SinkNode` here.
   
   So we have `SinkNode` which is a "node that shoves batches into a push generator" and we have `SubstraitSinkConsumer` which is a "consumer that shoves batches into a push generator".
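
The "consumer that shoves batches into a push generator" idea can be illustrated with a minimal Python sketch. This is not Arrow code: `PushQueue` and `QueueSinkConsumer` are hypothetical names invented for illustration, the queue stands in for the push generator, and having `finish()` close the producer is an assumption, since the body of `Finish()` is not shown in the diff above.

```python
import queue


class PushQueue:
    """Hypothetical stand-in for the producer side of a push generator."""

    _DONE = object()  # sentinel marking the end of the stream

    def __init__(self):
        self._q = queue.Queue()
        self._closed = False

    def push(self, item) -> bool:
        # Mirrors the bool returned by producer_.Push(batch) in the diff:
        # False means the producer was already closed.
        if self._closed:
            return False
        self._q.put(item)
        return True

    def close(self) -> None:
        if not self._closed:
            self._closed = True
            self._q.put(self._DONE)

    def __iter__(self):
        # Blocks until items arrive; stops once the sentinel is seen.
        while True:
            item = self._q.get()
            if item is self._DONE:
                return
            yield item


class QueueSinkConsumer:
    """Hypothetical consumer that forwards every batch to a PushQueue."""

    def __init__(self, producer: PushQueue):
        self._producer = producer

    def consume(self, batch) -> None:
        if not self._producer.push(batch):
            raise RuntimeError("Producer closed already")

    def finish(self) -> None:
        # Assumption: finishing the consumer ends the stream for readers.
        self._producer.close()
```

In this sketch the reader side simply iterates over the `PushQueue`; a "sink node factory" variant would move the same push logic into the node itself rather than into a consumer object handed to it.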







[GitHub] [arrow] lidavidm commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834484186



##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.

Review comment:
       I guess the issue is the Cookbook works against the released version of Arrow (IIRC), so it wouldn't work quite yet, but once we have 8.0.0 we should port the example so it's more visible







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834270121



##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {

Review comment:
       Yes, indeed. 







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834269567



##########
File path: python/CMakeLists.txt
##########
@@ -534,6 +539,20 @@ if(PYARROW_BUILD_FLIGHT)
   set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight)
 endif()
 
+# Engine
+
+if(PYARROW_BUILD_ENGINE)
+find_package(ArrowEngine REQUIRED)
+  if(PYARROW_BUNDLE_ARROW_CPP)
+    message("ARROW_ENGINE_SHARED_LIB")
+    message(""${ARROW_ENGINE_SHARED_LIB})

Review comment:
       Oh, this was a typo. I was checking the CMake build.







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835029109



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       I know for sure that there is a sink that outputs a `std::shared_ptr<arrow::Table>`. Could you please point me to this implementation? I might have missed it.







[GitHub] [arrow] westonpace commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835621347



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       Just to be clear, that last comment was about potential modifications to the Substrait consumer (e.g. we might want the Substrait consumer to support both a consumer factory API and a sink node factory API).







[GitHub] [arrow] lidavidm commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836415184



##########
File path: python/pyarrow/tests/test_substrait.py
##########
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pathlib
+import pyarrow as pa
+from pyarrow.lib import tobytes
+import pyarrow.parquet as pq
+
+try:
+    from pyarrow import engine
+    from pyarrow.engine import (
+        run_query,
+    )
+except ImportError:
+    engine = None
+
+
+def test_import():
+    # So we see the ImportError somewhere
+    import pyarrow.engine  # noqa

Review comment:
       I'm not sure we need this? What we should do is add a case for `engine` to `pyarrow/tests/conftest.py`.
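
   A minimal sketch of the kind of case meant here, assuming the usual pattern of probing each optional module at import time and recording whether its tests should run by default. The helper name `detect_optional_groups` and the group-to-module mapping are hypothetical; this is not the actual contents of `pyarrow/tests/conftest.py`.

```python
import importlib


def detect_optional_groups(group_modules):
    """Return {group: bool}, True when the group's module imports cleanly."""
    defaults = {}
    for group, module_name in group_modules.items():
        try:
            importlib.import_module(module_name)
        except ImportError:
            # Optional component was not built or installed: skip its tests.
            defaults[group] = False
        else:
            defaults[group] = True
    return defaults
```

   Calling `detect_optional_groups({"engine": "pyarrow.engine"})` would then report whether the `engine` tests can run, which would make a separate `test_import` unnecessary.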

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan, output_schema):
+    """
+    executes a substrait plan and returns a RecordBatchReader
+
+    Paramters
+    ---------

Review comment:
       ```suggestion
       Parameters
       ----------
   ```

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan, output_schema):
+    """
+    executes a substrait plan and returns a RecordBatchReader

Review comment:
       ```suggestion
       Execute a Substrait plan and return results as a RecordBatchReader.
   ```

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan, output_schema):
+    """
+    executes a substrait plan and returns a RecordBatchReader
+
+    Paramters
+    ---------
+    plan : bytes
+    output_schema: expected output schema

Review comment:
       ```suggestion
       output_schema: Schema
           The expected output schema.
   ```

##########
File path: python/examples/substrait/query_execution_example.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

Review comment:
       So I'm not sure it makes sense to have this file at all - again, nothing runs these examples.

##########
File path: python/CMakeLists.txt
##########
@@ -69,6 +69,7 @@ endif()
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
   option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
+  option(PYARROW_BUILD_ENGINE "Build the PyArrow Substrait integration" OFF)

Review comment:
       Is `engine` meant to be specific to Substrait, or are we going to use it for the query engine as a whole?

##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.

Review comment:
       I'm not sure the example adds much to this PR; can we save it for the cookbook? (We could move the code into the issue comments so that it isn't lost.)

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan, output_schema):
+    """
+    executes a substrait plan and returns a RecordBatchReader
+
+    Paramters
+    ---------
+    plan : bytes
+    output_schema: expected output schema

Review comment:
       Though, again, IMO it is really user-unfriendly to require the user to know the result schema.

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys

Review comment:
       Unused import?

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan, output_schema):
+    """
+    executes a substrait plan and returns a RecordBatchReader
+
+    Paramters
+    ---------
+    plan : bytes
+    output_schema: expected output schema
+    """
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        shared_ptr[CSchema] c_schema
+        c_string c_plan
+        RecordBatchReader reader
+
+    c_plan = plan

Review comment:
       Use `tobytes(plan)`? Also, IIRC it should work to just call `GetRecordBatchReader(tobytes(plan), ...)` without explicitly allocating a `c_string`.

##########
File path: python/pyarrow/tests/test_substrait.py
##########
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pathlib
+import pyarrow as pa
+from pyarrow.lib import tobytes
+import pyarrow.parquet as pq
+
+try:
+    from pyarrow import engine
+    from pyarrow.engine import (
+        run_query,
+    )
+except ImportError:
+    engine = None
+
+
+def test_import():
+    # So we see the ImportError somewhere
+    import pyarrow.engine  # noqa
+
+
+def resource_root():
+    """Get the path to the test resources directory."""
+    if not os.environ.get("PARQUET_TEST_DATA"):
+        raise RuntimeError("Test resources not found; set "
+                           "PARQUET_TEST_DATA to "
+                           "<repo root>/cpp/submodules/parquet-testing/data")
+    return pathlib.Path(os.environ["PARQUET_TEST_DATA"])
+
+
+def test_run_query():
+    filename = str(resource_root() / "binary.parquet")
+
+    query = """
+    {
+        "relations": [
+        {"rel": {
+            "read": {
+            "base_schema": {
+                "struct": {
+                "types": [
+                            {"binary": {}}
+                        ]
+                },
+                "names": [
+                        "foo"
+                        ]
+            },
+            "local_files": {
+                "items": [
+                {
+                    "uri_file": "file://FILENAME_PLACEHOLDER",
+                    "format": "FILE_FORMAT_PARQUET"
+                }
+                ]
+            }
+            }
+        }}
+        ]
+    }
+    """
+
+    query = tobytes(query.replace("FILENAME_PLACEHOLDER", filename))

Review comment:
       Just use an f-string or `str.replace` to be idiomatic in Python.
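
   One way to sidestep placeholder substitution altogether, sketched under the assumption that the plan stays JSON: build it as a Python dict and serialize it with `json.dumps`. An f-string over the raw JSON text would also work, but every literal `{` and `}` would have to be doubled. The helper name `make_read_plan` is hypothetical; the plan layout is copied from the test above.

```python
import json


def make_read_plan(filename):
    """Build the single-read Substrait plan from the test as UTF-8 JSON bytes."""
    plan = {
        "relations": [
            {"rel": {
                "read": {
                    "base_schema": {
                        "struct": {"types": [{"binary": {}}]},
                        "names": ["foo"],
                    },
                    "local_files": {
                        "items": [
                            {
                                # Only this small f-string needs the filename.
                                "uri_file": f"file://{filename}",
                                "format": "FILE_FORMAT_PARQUET",
                            }
                        ]
                    },
                }
            }}
        ]
    }
    return json.dumps(plan).encode("utf-8")
```

   The result is already `bytes`, so it could be passed to `run_query` without a further `tobytes` call.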

##########
File path: python/pyarrow/tests/test_substrait.py
##########
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pathlib
+import pyarrow as pa
+from pyarrow.lib import tobytes
+import pyarrow.parquet as pq
+
+try:
+    from pyarrow import engine
+    from pyarrow.engine import (
+        run_query,
+    )
+except ImportError:
+    engine = None
+
+
+def test_import():
+    # So we see the ImportError somewhere
+    import pyarrow.engine  # noqa
+
+
+def resource_root():
+    """Get the path to the test resources directory."""
+    if not os.environ.get("PARQUET_TEST_DATA"):
+        raise RuntimeError("Test resources not found; set "
+                           "PARQUET_TEST_DATA to "
+                           "<repo root>/cpp/submodules/parquet-testing/data")
+    return pathlib.Path(os.environ["PARQUET_TEST_DATA"])
+
+
+def test_run_query():
+    filename = str(resource_root() / "binary.parquet")
+
+    query = """
+    {
+        "relations": [
+        {"rel": {
+            "read": {
+            "base_schema": {
+                "struct": {
+                "types": [
+                            {"binary": {}}
+                        ]
+                },
+                "names": [
+                        "foo"
+                        ]
+            },
+            "local_files": {
+                "items": [
+                {
+                    "uri_file": "file://FILENAME_PLACEHOLDER",
+                    "format": "FILE_FORMAT_PARQUET"
+                }
+                ]
+            }
+            }
+        }}
+        ]
+    }
+    """
+
+    query = tobytes(query.replace("FILENAME_PLACEHOLDER", filename))
+
+    schema = pa.schema({"foo": pa.binary()})
+
+    reader = run_query(query, schema)
+
+    res = reader.read_all()
+
+    assert res.schema == schema
+    assert res.num_rows > 0

Review comment:
       I think this assertion is redundant given the one below.

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+def run_query(plan, output_schema):
+    """
+    executes a substrait plan and returns a RecordBatchReader
+
+    Paramters
+    ---------
+    plan : bytes

Review comment:
       Please describe the argument. Is this JSON? Serialized Protobuf?
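
   A sketch of what a fuller docstring could look like in numpydoc style. It assumes the plan is passed as JSON-encoded bytes, which is what the test in this PR does; the wording and the stub name `run_query_documented` are illustrative only, and the stub does not execute anything.

```python
def run_query_documented(plan, output_schema):
    """
    Execute a Substrait plan and return results as a RecordBatchReader.

    Parameters
    ----------
    plan : bytes
        The Substrait plan, JSON-encoded as UTF-8 bytes (assumed here).
    output_schema : Schema
        The expected output schema.

    Returns
    -------
    RecordBatchReader
        A reader over the batches produced by the plan.
    """
    # Illustrative stub: it only checks the argument type described above.
    if not isinstance(plan, bytes):
        raise TypeError("plan must be bytes")
    raise NotImplementedError("documentation sketch only")
```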







[GitHub] [arrow] lidavidm commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836413387



##########
File path: cpp/src/arrow/engine/substrait/serde_test.cc
##########
@@ -724,5 +728,103 @@ TEST(Substrait, ExtensionSetFromPlan) {
   EXPECT_EQ(decoded_add_func.name, "add");
 }
 
+TEST(Substrait, GetRecordBatchIterator) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+  AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen;
+  cp::ExecContext exec_context(default_memory_pool(),
+                               arrow::internal::GetCpuThreadPool());
+  ASSERT_OK_AND_ASSIGN(auto plan, cp::ExecPlan::Make());
+  engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, in_schema,
+                                     exec_context);
+  auto status = executor.MakePlan();
+  ASSERT_OK(status);
+  ASSERT_OK_AND_ASSIGN(auto reader, executor.Execute());
+  auto finish = executor.Finalize();
+  ASSERT_OK(finish);
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
+  EXPECT_GT(table->num_rows(), 0);
+}
+
+TEST(Substrait, GetRecordBatchIteratorUtil) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+
+  ASSERT_OK_AND_ASSIGN(auto reader, engine::SubstraitExecutor::GetRecordBatchReader(
+                                        substrait_json, in_schema));
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
+  EXPECT_GT(table->num_rows(), 0);

Review comment:
       We can probably just assume the number of rows is static.







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834278074



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:

Review comment:
       Oh, this is not relevant. It's a typo.







[GitHub] [arrow] lidavidm commented on pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer [WIP]

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#issuecomment-1073287970


   Hmm, did you mean to delete the `testing` submodule?





[GitHub] [arrow] github-actions[bot] commented on pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer [WIP]

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#issuecomment-1073011316









[GitHub] [arrow] lidavidm commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834250238



##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+import pyarrow.lib as lib
+
+import numpy as np
+
+
+def run_query(plan, output_schema):

Review comment:
       Add a docstring?

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+import pyarrow.lib as lib
+
+import numpy as np
+
+
+def run_query(plan, output_schema):
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        shared_ptr[CSchema] c_schema
+        c_string c_plan
+        RecordBatchReader reader
+
+    c_plan = plan.encode()

Review comment:
       I think we generally use `tobytes`. Though: if `plan` is supposed to be a serialized Protobuf, shouldn't it be `bytes` in the first place?
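
   To illustrate the difference: `plan.encode()` only works when `plan` is a `str`, whereas a `tobytes`-style helper accepts both `str` and `bytes`. The function below is a pure-Python stand-in written for this illustration, named `tobytes_sketch` to make clear it is not the `pyarrow.lib` implementation.

```python
def tobytes_sketch(value):
    """Encode str as UTF-8; pass anything else (e.g. bytes) through unchanged."""
    if isinstance(value, str):
        return value.encode("utf-8")
    return value


json_plan = '{"relations": []}'
assert tobytes_sketch(json_plan) == b'{"relations": []}'
# Already-serialized input is returned as-is; bytes objects have no .encode().
assert tobytes_sketch(b"\x0a\x00") == b"\x0a\x00"
assert not hasattr(b"", "encode")
```

   With a helper like this, callers holding a serialized plan as `bytes` would not hit an `AttributeError` from `.encode()`.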

##########
File path: python/examples/substrait/query_execution_example.py
##########
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

Review comment:
       IIRC nothing runs the Python examples (unlike C++). Maybe make this a cookbook example instead?
   
   Also, add a unit test.

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"

Review comment:
       Nit: try not to use the `api.h` headers; they're expensive to include.

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;

Review comment:
       We should `#include` the optional header

##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/engine/api.h>
+#include <arrow/engine/substrait/util.h>
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <vector>
+
+namespace eng = arrow::engine;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetSubstraitPlanFromServer(const std::string& filename) {
+  // Emulate server interaction by parsing hard coded JSON
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"i64": {}},
+                         {"bool": {}}
+                       ]
+            },
+            "names": [
+                      "i",
+                       "b"
+                     ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), filename);
+  return substrait_json;
+}
+
+int main(int argc, char** argv) {
+  if (argc < 2) {
+    std::cout << "Please specify a parquet file to scan" << std::endl;
+    // Fake pass for CI
+    return EXIT_SUCCESS;
+  }
+  auto substrait_json = GetSubstraitPlanFromServer(argv[1]);
+
+  auto schema = arrow::schema(
+      {arrow::field("i", arrow::int64()), arrow::field("b", arrow::boolean())});
+
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  auto maybe_plan = cp::ExecPlan::Make();
+  if (!maybe_plan.status().ok()) {
+    return EXIT_FAILURE;
+  }

Review comment:
       Create something like `Status Main();` instead so we can use the usual Arrow macros, and then just `ABORT_NOT_OK(Main())` inside `main`
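
   A minimal sketch of that pattern, for illustration only. The `Status` class and the `SKETCH_RETURN_NOT_OK` macro below are stand-ins so the snippet compiles without Arrow; in the real example they would presumably be `arrow::Status` and `ARROW_RETURN_NOT_OK`. The names `CheckPath`, `RunExample`, and `ExitCodeFor` are hypothetical, and treating a missing path as an error (rather than the "fake pass for CI") is an assumption made for the sketch.

```cpp
#include <cstdlib>
#include <iostream>
#include <string>
#include <utility>

// Stand-in for arrow::Status so the sketch compiles without Arrow (assumption).
class Status {
 public:
  static Status OK() { return Status(); }
  static Status Invalid(std::string msg) { return Status(std::move(msg)); }
  bool ok() const { return ok_; }
  const std::string& message() const { return message_; }

 private:
  Status() : ok_(true) {}
  explicit Status(std::string msg) : ok_(false), message_(std::move(msg)) {}
  bool ok_;
  std::string message_;
};

// Hypothetical equivalent of ARROW_RETURN_NOT_OK: propagate the first failure.
#define SKETCH_RETURN_NOT_OK(expr)     \
  do {                                 \
    Status status_ = (expr);           \
    if (!status_.ok()) return status_; \
  } while (0)

Status CheckPath(const std::string& parquet_path) {
  if (parquet_path.empty()) {
    return Status::Invalid("Please specify a parquet file to scan");
  }
  return Status::OK();
}

// All fallible work lives in a function that returns Status, so every step can
// use the early-return macro instead of hand-written status checks.
Status RunExample(const std::string& parquet_path) {
  SKETCH_RETURN_NOT_OK(CheckPath(parquet_path));
  // ... build the plan, execute it, and read the results here ...
  return Status::OK();
}

// The only thing left for `main`: report the error and pick an exit code.
int ExitCodeFor(const Status& status) {
  if (!status.ok()) {
    std::cerr << status.message() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}
```

   With this shape, `main` reduces to a single line such as `return ExitCodeFor(RunExample(argc > 1 ? argv[1] : ""));`.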

##########
File path: cpp/src/arrow/engine/substrait/serde_test.cc
##########
@@ -724,5 +728,103 @@ TEST(Substrait, ExtensionSetFromPlan) {
   EXPECT_EQ(decoded_add_func.name, "add");
 }
 
+TEST(Substrait, GetRecordBatchIterator) {

Review comment:
       nit, but `GetRecordBatchReader`?

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,
+                        engine::DeserializePlan(*serialized_plan, consumer_factory));
+
+  // It's safe to drop the serialized plan; we don't leave references to its memory
+  serialized_plan.reset();
+
+  // Construct an empty plan (note: configure Function registry and ThreadPool here)
+  return Status::OK();
+}
+
+Result<std::shared_ptr<RecordBatchReader>> SubstraitExecutor::Execute() {
+  for (const cp::Declaration& decl : declerations_) {
+    RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+  }
+
+  ARROW_RETURN_NOT_OK(plan_->Validate());
+
+  ARROW_RETURN_NOT_OK(plan_->StartProducing());
+
+  std::shared_ptr<RecordBatchReader> sink_reader = cp::MakeGeneratorReader(
+      schema_, std::move(*generator_), exec_context_.memory_pool());
+  return sink_reader;
+}
+
+Status SubstraitExecutor::Finalize() {
+  ARROW_RETURN_NOT_OK(plan_->finished().status());
+  return Status::OK();
+}
+
+Result<std::shared_ptr<RecordBatchReader>> SubstraitExecutor::GetRecordBatchReader(
+    std::string& substrait_json, std::shared_ptr<arrow::Schema> schema) {
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make());
+
+  arrow::engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, schema,
+                                            exec_context);
+  RETURN_NOT_OK(executor.MakePlan());
+
+  ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
+
+  RETURN_NOT_OK(executor.Finalize());

Review comment:
       I would expect the `RecordBatchReader`'s `Close` to be wired up to the executor's `Close`, so that you don't have to manage their lifetimes independently. That would also let you truly stream data, right?
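
   A rough sketch of the ownership idea, not Arrow's actual API. `StreamingPlan` stands in for the running `cp::ExecPlan`, integers stand in for record batches, and `PlanOwningReader` and `MakeReader` are hypothetical names. The point is only that the reader co-owns the plan and forwards `Close`, so the caller manages a single object.

```cpp
#include <memory>
#include <queue>
#include <utility>

// Stand-in for the running plan (assumption: not Arrow's real ExecPlan).
class StreamingPlan {
 public:
  void Push(int batch) { pending_.push(batch); }

  // Returns false once the plan is closed or has no more batches.
  bool Next(int* out) {
    if (closed_ || pending_.empty()) return false;
    *out = pending_.front();
    pending_.pop();
    return true;
  }

  void Close() { closed_ = true; }
  bool closed() const { return closed_; }

 private:
  std::queue<int> pending_;
  bool closed_ = false;
};

// Hypothetical reader that co-owns the plan: the plan stays alive for as long
// as the reader does, and closing the reader closes the plan.
class PlanOwningReader {
 public:
  explicit PlanOwningReader(std::shared_ptr<StreamingPlan> plan)
      : plan_(std::move(plan)) {}
  ~PlanOwningReader() { Close(); }

  // Pull one batch at a time instead of draining the plan up front.
  bool ReadNext(int* out) { return plan_->Next(out); }

  void Close() { plan_->Close(); }

 private:
  std::shared_ptr<StreamingPlan> plan_;
};

// Mirrors the shape of GetRecordBatchReader: the caller receives only a reader.
std::unique_ptr<PlanOwningReader> MakeReader() {
  auto plan = std::make_shared<StreamingPlan>();
  plan->Push(1);
  plan->Push(2);
  return std::make_unique<PlanOwningReader>(std::move(plan));
}
```

   Because nothing waits for the plan to finish before the reader is returned, batches can be consumed as they are produced rather than after the whole plan has run.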

##########
File path: cpp/src/arrow/engine/substrait/serde_test.cc
##########
@@ -724,5 +728,103 @@ TEST(Substrait, ExtensionSetFromPlan) {
   EXPECT_EQ(decoded_add_func.name, "add");
 }
 
+TEST(Substrait, GetRecordBatchIterator) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+  AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen;
+  cp::ExecContext exec_context(default_memory_pool(),
+                               arrow::internal::GetCpuThreadPool());
+  ASSERT_OK_AND_ASSIGN(auto plan, cp::ExecPlan::Make());
+  engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, in_schema,
+                                     exec_context);
+  auto status = executor.MakePlan();
+  ASSERT_OK(status);
+  ASSERT_OK_AND_ASSIGN(auto reader, executor.Execute());
+  auto finish = executor.Finalize();
+  ASSERT_OK(finish);
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
+  EXPECT_GT(table->num_rows(), 0);
+}
+
+TEST(Substrait, GetRecordBatchIteratorUtil) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+
+  ASSERT_OK_AND_ASSIGN(auto reader, engine::SubstraitExecutor::GetRecordBatchReader(
+                                        substrait_json, in_schema));
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
+  EXPECT_GT(table->num_rows(), 0);

Review comment:
       Shouldn't we know the number of expected rows?

##########
File path: python/pyarrow/engine.py
##########
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow._engine import (  # noqa
+    run_query

Review comment:
       just a nit: if we're going to indent like this, add a trailing comma

##########
File path: python/CMakeLists.txt
##########
@@ -534,6 +539,20 @@ if(PYARROW_BUILD_FLIGHT)
   set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight)
 endif()
 
+# Engine
+
+if(PYARROW_BUILD_ENGINE)
+find_package(ArrowEngine REQUIRED)

Review comment:
       looks like the CMake files need formatting

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref

Review comment:
       Unused imports?

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+import pyarrow.lib as lib
+
+import numpy as np

Review comment:
       Unused imports?

##########
File path: python/pyarrow/engine.py
##########
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow._engine import (  # noqa
+    run_query

Review comment:
       ```suggestion
       run_query,
   ```

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)
+      : substrait_json_(substrait_json),
+        generator_(generator),
+        plan_(std::move(plan)),
+        schema_(schema),
+        exec_context_(exec_context) {}
+
+  Status MakePlan();
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute();
+
+  Status Finalize();
+
+  static Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader(
+      std::string& substrait_json, std::shared_ptr<arrow::Schema> schema);
+
+ private:
+  std::string substrait_json_;
+  AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator_;
+  std::vector<cp::Declaration> declerations_;
+  std::shared_ptr<cp::ExecPlan> plan_;
+  std::shared_ptr<Schema> schema_;

Review comment:
       We should `#include` `<vector>`, `<memory>`, and `<string>`

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {

Review comment:
       nit: docstrings?

##########
File path: python/CMakeLists.txt
##########
@@ -534,6 +539,20 @@ if(PYARROW_BUILD_FLIGHT)
   set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight)
 endif()
 
+# Engine
+
+if(PYARROW_BUILD_ENGINE)
+find_package(ArrowEngine REQUIRED)
+  if(PYARROW_BUNDLE_ARROW_CPP)
+    message("ARROW_ENGINE_SHARED_LIB")
+    message(""${ARROW_ENGINE_SHARED_LIB})

Review comment:
       Are these log lines necessary?

##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/engine/api.h>
+#include <arrow/engine/substrait/util.h>
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"

Review comment:
       inconsistent include styles here

##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.

Review comment:
       Should this be made a cookbook example?

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)

Review comment:
       Does it make sense to take anything besides the JSON and the `ExecContext` as arguments? I would expect this class to manage the details of executing Substrait internally.
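
   One possible shape for such an interface, as a sketch under stated assumptions. `SketchExecContext` and `SketchPlan` are stand-ins so the snippet compiles without Arrow, `NarrowExecutor` is a hypothetical name, and the "plan" here simply records the JSON string instead of deserializing it.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-ins so the sketch compiles without Arrow (assumptions, not Arrow's API).
struct SketchExecContext {
  int num_threads = 1;
};
struct SketchPlan {
  std::vector<std::string> declarations;
};

class NarrowExecutor {
 public:
  // Callers supply only the serialized plan and the execution context.
  NarrowExecutor(std::string substrait_json, SketchExecContext exec_context)
      : substrait_json_(std::move(substrait_json)),
        exec_context_(exec_context) {}

  // Builds the plan on first use; returns false when the input is unusable.
  bool Execute() {
    if (plan_ == nullptr && !MakePlan()) return false;
    return true;
  }

  std::size_t num_declarations() const {
    return plan_ ? plan_->declarations.size() : 0;
  }

  int num_threads() const { return exec_context_.num_threads; }

 private:
  // Plan construction is an implementation detail, so it is not public.
  bool MakePlan() {
    if (substrait_json_.empty()) return false;
    plan_ = std::make_shared<SketchPlan>();
    plan_->declarations.push_back(substrait_json_);
    return true;
  }

  std::string substrait_json_;
  SketchExecContext exec_context_;
  std::shared_ptr<SketchPlan> plan_;  // created internally, never passed in
};
```

   The generator, plan, and schema no longer appear in the constructor, so callers cannot hand the executor mismatched pieces.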

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)
+      : substrait_json_(substrait_json),
+        generator_(generator),
+        plan_(std::move(plan)),
+        schema_(schema),
+        exec_context_(exec_context) {}
+
+  Status MakePlan();
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute();
+
+  Status Finalize();

Review comment:
       We usually call this `Close()`

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:

Review comment:
       What does this mean? It certainly doesn't seem like we're ignoring the data.

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)
+      : substrait_json_(substrait_json),
+        generator_(generator),
+        plan_(std::move(plan)),
+        schema_(schema),
+        exec_context_(exec_context) {}
+
+  Status MakePlan();

Review comment:
       Maybe this should be private?

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());

Review comment:
       Does this have any effect? And why not just `RETURN_NOT_OK(engine::internal::SubstraitToJSON());`?
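
       As a minimal sketch of the simplification suggested above: the code below uses hypothetical stand-ins (`Status`, `Result`, `ToStatus`, `RETURN_NOT_OK_SKETCH`, `ToJsonSketch`) rather than the real Arrow types and macros, and only illustrates that checking a named temporary and checking the call expression directly give the same outcome.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Status (not the real class).
class Status {
 public:
  Status() = default;
  explicit Status(std::string message) : ok_(false), message_(std::move(message)) {
    assert(!message_.empty());  // a failure should say what went wrong
  }
  static Status OK() { return Status(); }
  bool ok() const { return ok_; }
  const std::string& message() const { return message_; }

 private:
  bool ok_ = true;
  std::string message_;
};

// Hypothetical stand-in for arrow::Result<T>: holds a value or a failed Status.
template <typename T>
class Result {
 public:
  Result(T value) : value_(std::move(value)) {}
  Result(Status status) : status_(std::move(status)) {}
  const Status& status() const { return status_; }

 private:
  Status status_;
  T value_{};
};

// Let the sketch macro accept either a Status or a Result<T>.
inline Status ToStatus(const Status& s) { return s; }
template <typename T>
Status ToStatus(const Result<T>& r) { return r.status(); }

#define RETURN_NOT_OK_SKETCH(expr)   \
  do {                               \
    Status st_ = ToStatus(expr);     \
    if (!st_.ok()) return st_;       \
  } while (0)

// Hypothetical serializer that either succeeds or reports an error.
Result<std::string> ToJsonSketch(bool succeed) {
  if (!succeed) return Status("cannot serialize plan");
  return std::string("{}");
}

// Shape used in the diff: named temporary, then check its status.
Status WithTemporary(bool succeed) {
  auto maybe_json = ToJsonSketch(succeed);
  RETURN_NOT_OK_SKETCH(maybe_json.status());
  return Status::OK();
}

// Shape suggested in the review: check the call expression directly.
Status Folded(bool succeed) {
  RETURN_NOT_OK_SKETCH(ToJsonSketch(succeed));
  return Status::OK();
}
```

       Both functions return the same status for the same input; the folded form just avoids a variable that is never read again.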

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)
+      : substrait_json_(substrait_json),
+        generator_(generator),
+        plan_(std::move(plan)),
+        schema_(schema),
+        exec_context_(exec_context) {}
+
+  Status MakePlan();
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute();
+
+  Status Finalize();
+
+  static Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader(
+      std::string& substrait_json, std::shared_ptr<arrow::Schema> schema);

Review comment:
       IMO it's quite weird that you have to know the schema ahead of time. Is there not a way to extract the schema from the Substrait plan?

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {

Review comment:
       nit, but is it not possible to place this in the `.cc` file too?

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,
+                        engine::DeserializePlan(*serialized_plan, consumer_factory));
+
+  // It's safe to drop the serialized plan; we don't leave references to its memory
+  serialized_plan.reset();

Review comment:
       Won't this go out of scope automatically anyways?
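
       To illustrate the point about scope, here is a small sketch using only the standard library. `PlanBuffer` is a hypothetical stand-in for the deserialized plan buffer, and the `live` counter exists only to make the object's lifetime observable.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the deserialized plan buffer.
// It bumps a counter while alive so its lifetime can be observed.
struct PlanBuffer {
  explicit PlanBuffer(int* live) : live_(live) {
    assert(live_ != nullptr);
    ++*live_;
  }
  ~PlanBuffer() { --*live_; }
  PlanBuffer(const PlanBuffer&) = delete;
  PlanBuffer& operator=(const PlanBuffer&) = delete;

  int* live_;
};

// Explicit reset(): the buffer is released before the function continues.
int LiveAfterExplicitReset(int* live) {
  auto plan = std::make_shared<PlanBuffer>(live);
  plan.reset();
  return *live;  // the buffer is already gone at this point
}

// No reset(): the buffer stays alive until the function returns,
// and is then released automatically when `plan` goes out of scope.
int LiveInsideScope(int* live) {
  auto plan = std::make_shared<PlanBuffer>(live);
  return *live;  // the buffer is still alive at this point
}
```

       In both cases nothing is left alive once the function has returned. An explicit `reset()` only changes when the memory is released, which may matter if a long-running step follows in the same scope.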

##########
File path: cpp/src/arrow/engine/substrait/serde_test.cc
##########
@@ -724,5 +728,103 @@ TEST(Substrait, ExtensionSetFromPlan) {
   EXPECT_EQ(decoded_add_func.name, "add");
 }
 
+TEST(Substrait, GetRecordBatchIterator) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+  AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen;
+  cp::ExecContext exec_context(default_memory_pool(),
+                               arrow::internal::GetCpuThreadPool());
+  ASSERT_OK_AND_ASSIGN(auto plan, cp::ExecPlan::Make());
+  engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, in_schema,
+                                     exec_context);
+  auto status = executor.MakePlan();
+  ASSERT_OK(status);
+  ASSERT_OK_AND_ASSIGN(auto reader, executor.Execute());
+  auto finish = executor.Finalize();
+  ASSERT_OK(finish);

Review comment:
       Why not just `ASSERT_OK(executor.Finalize())`? In general, why all the temporary variables?

##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/engine/api.h>
+#include <arrow/engine/substrait/util.h>
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <vector>
+
+namespace eng = arrow::engine;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+std::string GetSubstraitPlanFromServer(const std::string& filename) {
+  // Emulate server interaction by parsing hard coded JSON
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"i64": {}},
+                         {"bool": {}}
+                       ]
+            },
+            "names": [
+                      "i",
+                       "b"
+                     ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), filename);
+  return substrait_json;
+}
+
+int main(int argc, char** argv) {
+  if (argc < 2) {
+    std::cout << "Please specify a parquet file to scan" << std::endl;
+    // Fake pass for CI
+    return EXIT_SUCCESS;
+  }
+  auto substrait_json = GetSubstraitPlanFromServer(argv[1]);
+
+  auto schema = arrow::schema(
+      {arrow::field("i", arrow::int64()), arrow::field("b", arrow::boolean())});
+
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());

Review comment:
       Hmm, should we be using the `internal` namespace in an example? Is there not a default constructor?

##########
File path: python/pyarrow/_engine.pyx
##########
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+import sys
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.lib cimport *
+from pyarrow.includes.libarrow cimport *
+import pyarrow.lib as lib
+
+import numpy as np
+
+
+def run_query(plan, output_schema):
+
+    cdef:
+        CResult[shared_ptr[CRecordBatchReader]] c_res_reader
+        shared_ptr[CRecordBatchReader] c_reader
+        shared_ptr[CSchema] c_schema
+        c_string c_plan
+        RecordBatchReader reader
+
+    c_plan = plan.encode()

Review comment:
       Ah, it's the Protobuf JSON.

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       Isn't there already a sink that outputs to a reader? Why do we need a custom implementation here?

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,
+                        engine::DeserializePlan(*serialized_plan, consumer_factory));
+
+  // It's safe to drop the serialized plan; we don't leave references to its memory
+  serialized_plan.reset();
+
+  // Construct an empty plan (note: configure Function registry and ThreadPool here)

Review comment:
       Is this comment still relevant?

##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,

Review comment:
       typo: `declarations`

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"

Review comment:
       If we can use `type_fwd.h` headers, or manually forward-declare just the things we need, that would be best.
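
       A rough single-file sketch of what a forward declaration buys: the "header" part only names the type, and the full definition is needed only in the "source" part. The `sketch` namespace and the `ExecPlan`/`Executor` classes here are hypothetical stand-ins, not the Arrow classes.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// ---- what would live in the header ----
namespace sketch {

class ExecPlan;  // forward declaration: no heavy include needed here

class Executor {
 public:
  explicit Executor(std::shared_ptr<ExecPlan> plan);
  int NodeCount() const;

 private:
  // A shared_ptr member works with an incomplete type.
  std::shared_ptr<ExecPlan> plan_;
};

}  // namespace sketch

// ---- what would live in the .cc file ----
namespace sketch {

// The full definition is only required where members are accessed.
class ExecPlan {
 public:
  int node_count = 3;
};

Executor::Executor(std::shared_ptr<ExecPlan> plan) : plan_(std::move(plan)) {}

int Executor::NodeCount() const {
  assert(plan_ != nullptr);
  return plan_->node_count;
}

}  // namespace sketch
```

       Note that members held by value still need the complete type in the header, so this only helps for types used through pointers, references, or declarations.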

##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)

Review comment:
       Ah, I guess `GetRecordBatchReader` is a factory function of sorts. The C++ API would make a little more sense IMO if `SubstraitExecutor::GetRecordBatchReader` were renamed `Make` and returned `SubstraitExecutor`.
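
       One possible shape for such a factory, as a sketch only. `ExecutorSketch` and `Schema` are hypothetical stand-ins, and a null `std::unique_ptr` stands in for the error case that the real API would presumably report through `Result<...>`.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Schema.
struct Schema {
  std::string name;
};

class ExecutorSketch {
 public:
  // Factory: validates the inputs, then hands back a ready-to-use executor.
  // Returns nullptr on invalid input (a stand-in for returning an error).
  static std::unique_ptr<ExecutorSketch> Make(std::string plan_json,
                                              std::shared_ptr<Schema> schema) {
    if (plan_json.empty() || schema == nullptr) return nullptr;
    return std::unique_ptr<ExecutorSketch>(
        new ExecutorSketch(std::move(plan_json), std::move(schema)));
  }

  const std::string& plan_json() const { return plan_json_; }

 private:
  // The constructor is private so callers must go through Make().
  ExecutorSketch(std::string plan_json, std::shared_ptr<Schema> schema)
      : plan_json_(std::move(plan_json)), schema_(std::move(schema)) {
    assert(schema_ != nullptr);
  }

  std::string plan_json_;
  std::shared_ptr<Schema> schema_;
};
```

       With this layout the caller obtains the executor from `Make` and then asks it for a reader, rather than calling a static method that hides the executor entirely.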







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834280102



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,
+                        engine::DeserializePlan(*serialized_plan, consumer_factory));
+
+  // It's safe to drop the serialized plan; we don't leave references to its memory
+  serialized_plan.reset();
+
+  // Construct an empty plan (note: configure Function registry and ThreadPool here)

Review comment:
       Sorry, this PR needs another round of cleanup on comments and formatting.







[GitHub] [arrow] westonpace commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835697672



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       Yes, let's keep this PR focused on the `ConsumingSink` approach.  We can worry about other changes later.







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836437676



##########
File path: python/CMakeLists.txt
##########
@@ -69,6 +69,7 @@ endif()
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
   option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
+  option(PYARROW_BUILD_ENGINE "Build the PyArrow Substrait integration" OFF)

Review comment:
       @lidavidm Yes, it is better to move it to the Cookbook then. I merely created a 1:1 counterpart of the C++ Substrait consumer example, but I see your point. Even if we create a Substrait example in the Cookbook, can we skip the raw JSON?







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834271979



##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.

Review comment:
       We definitely can. I was planning on creating a more comprehensive one for the cookbook,
   with a few filters and functions. I am waiting for https://github.com/apache/arrow/pull/12664 to be finalised.
   What do you think?







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835104064



##########
File path: cpp/src/arrow/engine/substrait/serde_test.cc
##########
@@ -724,5 +728,103 @@ TEST(Substrait, ExtensionSetFromPlan) {
   EXPECT_EQ(decoded_add_func.name, "add");
 }
 
+TEST(Substrait, GetRecordBatchIterator) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+  AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen;
+  cp::ExecContext exec_context(default_memory_pool(),
+                               arrow::internal::GetCpuThreadPool());
+  ASSERT_OK_AND_ASSIGN(auto plan, cp::ExecPlan::Make());
+  engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, in_schema,
+                                     exec_context);
+  auto status = executor.MakePlan();
+  ASSERT_OK(status);
+  ASSERT_OK_AND_ASSIGN(auto reader, executor.Execute());
+  auto finish = executor.Finalize();
+  ASSERT_OK(finish);
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
+  EXPECT_GT(table->num_rows(), 0);
+}
+
+TEST(Substrait, GetRecordBatchIteratorUtil) {
+  const auto parquet_root = std::getenv("PARQUET_TEST_DATA");
+  std::string dir_string(parquet_root);
+  std::stringstream ss;
+  ss << dir_string << "/binary.parquet";
+  auto file_path = ss.str();
+
+  std::string substrait_json = R"({
+    "relations": [
+      {"rel": {
+        "read": {
+          "base_schema": {
+            "struct": {
+              "types": [ 
+                         {"binary": {}}
+                       ]
+            },
+            "names": [
+                      "foo"
+                      ]
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": "file://FILENAME_PLACEHOLDER",
+                "format": "FILE_FORMAT_PARQUET"
+              }
+            ]
+          }
+        }
+      }}
+    ]
+  })";
+
+  std::string filename_placeholder = "FILENAME_PLACEHOLDER";
+  substrait_json.replace(substrait_json.find(filename_placeholder),
+                         filename_placeholder.size(), file_path);
+  auto in_schema = schema({field("foo", binary())});
+
+  ASSERT_OK_AND_ASSIGN(auto reader, engine::SubstraitExecutor::GetRecordBatchReader(
+                                        substrait_json, in_schema));
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatchReader(reader.get()));
+  EXPECT_GT(table->num_rows(), 0);

Review comment:
       An alternative is to read the file directly using the Parquet API and check the values.







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836439607



##########
File path: python/CMakeLists.txt
##########
@@ -69,6 +69,7 @@ endif()
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
   option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
+  option(PYARROW_BUILD_ENGINE "Build the PyArrow Substrait integration" OFF)

Review comment:
       @lidavidm Yes, the description is misleading. You're correct. We can probably say "Build PyArrow Engine integration".







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834277072



##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)
+      : substrait_json_(substrait_json),
+        generator_(generator),
+        plan_(std::move(plan)),
+        schema_(schema),
+        exec_context_(exec_context) {}
+
+  Status MakePlan();
+
+  Result<std::shared_ptr<RecordBatchReader>> Execute();
+
+  Status Finalize();
+
+  static Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader(
+      std::string& substrait_json, std::shared_ptr<arrow::Schema> schema);

Review comment:
       I agree with you. I am still trying to work that out. It is not 100% clear to me how to infer the output schema from the plan. I am working on those details. 







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834275132



##########
File path: cpp/src/arrow/engine/substrait/util.h
##########
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/engine/api.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/iterator.h"
+
+namespace arrow {
+
+namespace cp = arrow::compute;
+
+namespace engine {
+
+class ARROW_ENGINE_EXPORT SubstraitSinkConsumer : public cp::SinkNodeConsumer {
+ public:
+  explicit SubstraitSinkConsumer(
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      arrow::util::BackpressureOptions backpressure = {})
+      : producer_(MakeProducer(generator, std::move(backpressure))) {}
+
+  Status Consume(cp::ExecBatch batch) override;
+
+  static arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer
+  MakeProducer(AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* out_gen,
+               arrow::util::BackpressureOptions backpressure) {
+    arrow::PushGenerator<arrow::util::optional<cp::ExecBatch>> push_gen(
+        std::move(backpressure));
+    auto out = push_gen.producer();
+    *out_gen = std::move(push_gen);
+    return out;
+  }
+
+  Future<> Finish() override;
+
+ private:
+  PushGenerator<arrow::util::optional<cp::ExecBatch>>::Producer producer_;
+};
+
+class ARROW_ENGINE_EXPORT SubstraitExecutor {
+ public:
+  explicit SubstraitExecutor(
+      std::string substrait_json,
+      AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator,
+      std::shared_ptr<cp::ExecPlan> plan, std::shared_ptr<Schema> schema,
+      cp::ExecContext exec_context)

Review comment:
       Yes, we can keep the generator inside. I am still thinking about how to extract the output schema from the plan.
   That's a component I haven't addressed in the PR yet.
   
   How can we infer the schema from the plan? It is not 100% clear to me yet.
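
One possible way to think about the question, as a rough sketch only: walk the relation tree from the root and derive the output fields at each step. Everything below is hypothetical and uses only the standard library. `Field`, `Rel`, and `InferOutputSchema` are made-up names, not Arrow or Substrait types, and the "project" here is a simplification that merely selects input columns by index.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for a schema field (name plus a type label).
struct Field {
  std::string name;
  std::string type;
};

// Hypothetical, heavily simplified relation tree; not the Substrait message types.
struct Rel {
  enum class Kind { kRead, kFilter, kProject };
  Kind kind = Kind::kRead;
  std::vector<Field> base_schema;      // used by kRead
  std::vector<int> projected_columns;  // used by kProject: indices into the input
  std::shared_ptr<Rel> input;          // used by kFilter and kProject
};

// Derives the output fields of a relation by recursing into its input.
std::vector<Field> InferOutputSchema(const Rel& rel) {
  switch (rel.kind) {
    case Rel::Kind::kRead:
      // A read emits exactly the fields declared in its base schema.
      return rel.base_schema;
    case Rel::Kind::kFilter:
      // A filter drops rows, not columns, so the schema passes through.
      assert(rel.input != nullptr);
      return InferOutputSchema(*rel.input);
    case Rel::Kind::kProject: {
      // This simplified project keeps only the selected input columns.
      assert(rel.input != nullptr);
      std::vector<Field> in = InferOutputSchema(*rel.input);
      std::vector<Field> out;
      for (int index : rel.projected_columns) {
        out.push_back(in.at(static_cast<std::size_t>(index)));
      }
      return out;
    }
  }
  return {};
}
```

For the read-only plan used in the tests, this would simply return the single `foo` field from `base_schema`, which is what the caller currently has to pass in by hand.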
   







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r834490322



##########
File path: cpp/examples/arrow/engine_substrait_example.cc
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.

Review comment:
       Yes, let's do that. I created two issues for it:
   
   https://github.com/apache/arrow-cookbook/issues/168
   https://github.com/apache/arrow-cookbook/issues/169







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835098733



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {
+  producer_.Push(IterationEnd<arrow::util::optional<cp::ExecBatch>>());
+  if (producer_.Close()) {
+    return Future<>::MakeFinished();
+  }
+  return Future<>::MakeFinished(
+      Status::ExecutionError("Error occurred in closing the batch producer"));
+}
+
+Status SubstraitExecutor::MakePlan() {
+  ARROW_ASSIGN_OR_RAISE(auto serialized_plan,
+                        engine::internal::SubstraitFromJSON("Plan", substrait_json_));
+
+  auto maybe_plan_json = engine::internal::SubstraitToJSON("Plan", *serialized_plan);
+  RETURN_NOT_OK(maybe_plan_json.status());
+
+  std::vector<std::shared_ptr<cp::SinkNodeConsumer>> consumers;
+  std::function<std::shared_ptr<cp::SinkNodeConsumer>()> consumer_factory = [&] {
+    // All batches produced by the plan will be fed into IgnoringConsumers:
+    consumers.emplace_back(new SubstraitSinkConsumer{generator_});
+    return consumers.back();
+  };
+
+  // Deserialize each relation tree in the substrait plan to an Arrow compute Declaration
+  ARROW_ASSIGN_OR_RAISE(declerations_,
+                        engine::DeserializePlan(*serialized_plan, consumer_factory));
+
+  // It's safe to drop the serialized plan; we don't leave references to its memory
+  serialized_plan.reset();
+
+  // Construct an empty plan (note: configure Function registry and ThreadPool here)
+  return Status::OK();
+}
+
+Result<std::shared_ptr<RecordBatchReader>> SubstraitExecutor::Execute() {
+  for (const cp::Declaration& decl : declerations_) {
+    RETURN_NOT_OK(decl.AddToPlan(plan_.get()).status());
+  }
+
+  ARROW_RETURN_NOT_OK(plan_->Validate());
+
+  ARROW_RETURN_NOT_OK(plan_->StartProducing());
+
+  std::shared_ptr<RecordBatchReader> sink_reader = cp::MakeGeneratorReader(
+      schema_, std::move(*generator_), exec_context_.memory_pool());
+  return sink_reader;
+}
+
+Status SubstraitExecutor::Finalize() {
+  ARROW_RETURN_NOT_OK(plan_->finished().status());
+  return Status::OK();
+}
+
+Result<std::shared_ptr<RecordBatchReader>> SubstraitExecutor::GetRecordBatchReader(
+    std::string& substrait_json, std::shared_ptr<arrow::Schema> schema) {
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make());
+
+  arrow::engine::SubstraitExecutor executor(substrait_json, &sink_gen, plan, schema,
+                                            exec_context);
+  RETURN_NOT_OK(executor.MakePlan());
+
+  ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
+
+  RETURN_NOT_OK(executor.Finalize());

Review comment:
       What do you mean by the `RecordBatchReader`'s Close?







[GitHub] [arrow] westonpace commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835621084



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       We might want to support both as well, since a consumer is much easier to implement than a node.
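
To make the comparison concrete, here is a minimal standard-library sketch of the consumer shape, not the Arrow API. `Batch`, `BatchQueue`, `SinkConsumer`, and `QueueingConsumer` are hypothetical stand-ins for `cp::ExecBatch`, the push generator, and `cp::SinkNodeConsumer`. The point is that a consumer only needs two callbacks, while a full node would also have to manage its inputs and lifecycle.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-in for cp::ExecBatch; a real batch holds Arrow arrays.
using Batch = std::string;

// Hypothetical stand-in for a sink consumer interface: two callbacks only.
class SinkConsumer {
 public:
  virtual ~SinkConsumer() = default;
  virtual bool Consume(Batch batch) = 0;  // false means the batch was rejected
  virtual bool Finish() = 0;              // false means finishing failed
};

// Queue shared between the consumer (writer) and whoever reads the results.
struct BatchQueue {
  std::deque<Batch> batches;
  bool closed = false;

  // Moves the next batch into *out; returns false when nothing is buffered.
  bool Pop(Batch* out) {
    if (batches.empty()) return false;
    *out = std::move(batches.front());
    batches.pop_front();
    return true;
  }
};

// Forwards every consumed batch to the queue and closes it on Finish().
class QueueingConsumer : public SinkConsumer {
 public:
  explicit QueueingConsumer(BatchQueue* queue) : queue_(queue) {
    assert(queue_ != nullptr);
  }

  bool Consume(Batch batch) override {
    if (queue_->closed) return false;  // mirrors "Producer closed already"
    queue_->batches.push_back(std::move(batch));
    return true;
  }

  bool Finish() override {
    if (queue_->closed) return false;  // closing twice is reported as an error
    queue_->closed = true;
    return true;
  }

 private:
  BatchQueue* queue_;
};
```

A caller would create a `BatchQueue`, hand its address to a `QueueingConsumer`, and drain the queue with `Pop` once batches have been consumed.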







[GitHub] [arrow] vibhatha commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r835697914



##########
File path: cpp/src/arrow/engine/substrait/util.cc
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/engine/substrait/util.h"
+
+namespace arrow {
+
+namespace engine {
+
+Status SubstraitSinkConsumer::Consume(cp::ExecBatch batch) {
+  // Consume a batch of data
+  bool did_push = producer_.Push(batch);
+  if (!did_push) return Status::ExecutionError("Producer closed already");
+  return Status::OK();
+}
+
+Future<> SubstraitSinkConsumer::Finish() {

Review comment:
       Should we create a JIRA for that one, or should it be an open discussion before it becomes a ticket? I am interested in that piece :)







[GitHub] [arrow] lidavidm commented on a change in pull request #12672: ARROW-15779: [Python] Create python bindings for Substrait consumer

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12672:
URL: https://github.com/apache/arrow/pull/12672#discussion_r836436336



##########
File path: python/CMakeLists.txt
##########
@@ -69,6 +69,7 @@ endif()
 if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
   option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
+  option(PYARROW_BUILD_ENGINE "Build the PyArrow Substrait integration" OFF)

Review comment:
       I'm mostly talking about the description; the flag itself is fine. Substrait is not a query engine.



